You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/06/03 11:28:56 UTC

[pulsar] branch master updated: PIP-61: Advertise multiple addresses (#6903)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e40bd9  PIP-61: Advertise multiple addresses (#6903)
8e40bd9 is described below

commit 8e40bd977fc85eb2d4f54146f2876ee85b17fdfd
Author: luceneReader <54...@qq.com>
AuthorDate: Wed Jun 3 19:28:45 2020 +0800

    PIP-61: Advertise multiple addresses (#6903)
    
    * PIP-61:
    1. resolve broker.conf, validate `advertisedListeners` and `internalListenerName`
    2. register the `advertisedListeners` to zookeeper
    3. client find the target broker with listenerName
    4. add test case PulsarMultiListenersTest
    5. add test case MultipleListenerValidatorTest
---
 conf/broker.conf                                   |  12 ++
 .../apache/pulsar/broker/ServiceConfiguration.java |  14 +++
 .../validator/MultipleListenerValidator.java       | 137 +++++++++++++++++++++
 .../validator/MultipleListenerValidatorTest.java   | 100 +++++++++++++++
 .../org/apache/pulsar/broker/PulsarService.java    |  20 ++-
 .../loadbalance/impl/ModularLoadManagerImpl.java   |   4 +-
 .../impl/ModularLoadManagerWrapper.java            |   1 -
 .../apache/pulsar/broker/lookup/LookupResult.java  |  13 +-
 .../pulsar/broker/lookup/TopicLookupBase.java      |  71 +++++++----
 .../broker/namespace/NamespaceEphemeralData.java   |  27 +++-
 .../pulsar/broker/namespace/NamespaceService.java  |  69 +++++++++--
 .../pulsar/broker/namespace/OwnershipCache.java    |   6 +-
 .../apache/pulsar/broker/service/ServerCnx.java    |   3 +-
 .../broker/auth/MockedPulsarServiceBaseTest.java   |   5 +-
 ...MultiListenersWithInternalListenerNameTest.java | 108 ++++++++++++++++
 ...tiListenersWithoutInternalListenerNameTest.java | 107 ++++++++++++++++
 .../apache/pulsar/client/api/ClientBuilder.java    |   8 ++
 pulsar-client-messagecrypto-bc/pom.xml             |   1 +
 .../client/impl/BinaryProtoLookupService.java      |   9 +-
 .../pulsar/client/impl/ClientBuilderImpl.java      |   9 ++
 .../pulsar/client/impl/PulsarClientImpl.java       |   5 +-
 .../client/impl/conf/ClientConfigurationData.java  |   2 +
 .../apache/pulsar/common/api/proto/PulsarApi.java  |  94 ++++++++++++++
 .../apache/pulsar/common/protocol/Commands.java    |   8 ++
 .../data/loadbalancer/AdvertisedListener.java      |  47 +++++++
 .../data/loadbalancer/LocalBrokerData.java         |  17 +++
 pulsar-common/src/main/proto/PulsarApi.proto       |   2 +
 27 files changed, 855 insertions(+), 44 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 2a6ef62..8d50aeb 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -43,6 +43,18 @@ bindAddress=0.0.0.0
 # Hostname or IP address the service advertises to the outside world. If not set, the value of InetAddress.getLocalHost().getHostName() is used.
 advertisedAddress=
 
+# Used to specify multiple advertised listeners for the broker.
+# The value must be formatted as <listener_name>:pulsar://<host>:<port>;
+# multiple listeners should be separated with commas.
+# Do not use this configuration together with advertisedAddress and brokerServicePort.
+# By default this is unset, which means advertisedAddress and brokerServicePort are used.
+# advertisedListeners=
+
+# Used to specify the internal listener name for the broker.
+# The listener name must be one of the names defined in advertisedListeners.
+# By default this is unset, and the broker uses the first listener as the internal listener.
+# internalListenerName=
+
 # Number of threads to use for Netty IO. Default is set to 2 * Runtime.getRuntime().availableProcessors()
 numIOThreads=
 
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 6238ff6..f90d9d9 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -149,6 +149,20 @@ public class ServiceConfiguration implements PulsarConfiguration {
     )
     private String advertisedAddress;
 
+    @FieldContext(category=CATEGORY_SERVER,
+            doc = "Used to specify multiple advertised listeners for the broker."
+                    + " The value must format as <listener_name>:pulsar://<host>:<port>,"
+                    + " multiple listeners should separate with commas."
+                    + " Do not use this configuration with advertisedAddress and brokerServicePort."
+                    + " The Default value is absent means use advertisedAddress and brokerServicePort.")
+    private String advertisedListeners;
+
+    @FieldContext(category=CATEGORY_SERVER,
+            doc = "Used to specify the internal listener name for the broker."
+                    + " The listener name must contain in the advertisedListeners."
+                    + " The Default value is absent, the broker uses the first listener as the internal listener.")
+    private String internalListenerName;
+
     @FieldContext(
         category = CATEGORY_SERVER,
         doc = "Number of threads to use for Netty IO."
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/validator/MultipleListenerValidator.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/validator/MultipleListenerValidator.java
new file mode 100644
index 0000000..9f44da8
--- /dev/null
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/validator/MultipleListenerValidator.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.validator;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
+
+import java.net.URI;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * Validator for the broker's multiple advertised listeners configuration.
+ */
+public final class MultipleListenerValidator {
+
+    /**
+     * Validates `advertisedListeners`, `internalListenerName` and `advertisedAddress`, then parses the listeners.
+     * 1. `advertisedListeners` and `advertisedAddress` must not appear together.
+     * 2. a listener name may carry at most one `pulsar://` and at most one `pulsar+ssl://` address.
+     * 3. user can not assign same 'host:port' to different listener.
+     * 4. if `internalListenerName` is blank, it is set on the config to the first listener name of `advertisedListeners`.
+     * 5. `pulsar://` is always required; `pulsar+ssl://` is required with `brokerServicePortTls`, forbidden without it.
+     * @param config the pulsar broker configure; its `internalListenerName` may be filled in, see rule 4.
+     * @return listener name mapped to its parsed addresses, or an empty map when `advertisedListeners` is blank.
+     * @throws IllegalArgumentException if a rule above is violated or an address is not a valid URI.
+     */
+    public static Map<String, AdvertisedListener> validateAndAnalysisAdvertisedListener(ServiceConfiguration config) {
+        if (StringUtils.isNotBlank(config.getAdvertisedListeners()) && StringUtils.isNotBlank(config.getAdvertisedAddress())) {
+            throw new IllegalArgumentException("`advertisedListeners` and `advertisedAddress` must not appear together");
+        }
+        if (StringUtils.isBlank(config.getAdvertisedListeners())) {
+            return Collections.emptyMap();
+        }
+        Optional<String> firstListenerName = Optional.empty();
+        Map<String, List<String>> listeners = Maps.newHashMap();
+        for (final String str : StringUtils.split(config.getAdvertisedListeners(), ",")) {
+            int index = str.indexOf(":");
+            if (index <= 0) {
+                throw new IllegalArgumentException("the configure entry `advertisedListeners` is invalid. because " +
+                        str + " do not contain listener name");
+            }
+            String listenerName = StringUtils.trim(str.substring(0, index));
+            if (!firstListenerName.isPresent()) {
+                firstListenerName = Optional.of(listenerName);
+            }
+            String value = StringUtils.trim(str.substring(index + 1));
+            listeners.computeIfAbsent(listenerName, k -> Lists.newArrayListWithCapacity(2)).add(value);
+        }
+        if (StringUtils.isBlank(config.getInternalListenerName())) {
+            // a value made only of separators (e.g. ",") yields no entry: report it as a configuration error
+            config.setInternalListenerName(firstListenerName.orElseThrow(() -> new IllegalArgumentException(
+                    "the configure entry `advertisedListeners` is invalid. because it do not contain any listener")));
+        }
+        if (!listeners.containsKey(config.getInternalListenerName())) {
+            throw new IllegalArgumentException("the `advertisedListeners` configure do not contain `internalListenerName` entry");
+        }
+        final Map<String, AdvertisedListener> result = Maps.newHashMap();
+        final Map<String, Set<String>> reverseMappings = Maps.newHashMap();
+        for (final Map.Entry<String, List<String>> entry : listeners.entrySet()) {
+            if (entry.getValue().size() > 2) {
+                throw new IllegalArgumentException("there are redundant configure for listener `" + entry.getKey() + "`");
+            }
+            URI pulsarAddress = null, pulsarSslAddress = null;
+            for (final String strUri : entry.getValue()) {
+                final URI uri;
+                try {
+                    uri = URI.create(strUri);
+                } catch (IllegalArgumentException cause) {
+                    // only a malformed URI is translated here, so the more specific errors below keep their message
+                    throw new IllegalArgumentException("the value " + strUri + " in the `advertisedListeners` configure is invalid", cause);
+                }
+                if (StringUtils.equalsIgnoreCase(uri.getScheme(), "pulsar")) {
+                    if (pulsarAddress == null) {
+                        pulsarAddress = uri;
+                    } else {
+                        throw new IllegalArgumentException("there are redundant configure for listener `" + entry.getKey() + "`");
+                    }
+                } else if (StringUtils.equalsIgnoreCase(uri.getScheme(), "pulsar+ssl")) {
+                    if (pulsarSslAddress == null) {
+                        pulsarSslAddress = uri;
+                    } else {
+                        throw new IllegalArgumentException("there are redundant configure for listener `" + entry.getKey() + "`");
+                    }
+                }
+                // track which listener names use each host:port so that a pair shared by two listeners is rejected
+                String hostPort = String.format("%s:%d", uri.getHost(), uri.getPort());
+                Set<String> sets = reverseMappings.computeIfAbsent(hostPort, k -> Sets.newTreeSet());
+                sets.add(entry.getKey());
+                if (sets.size() > 1) {
+                    throw new IllegalArgumentException("must not specify `" + hostPort + "` to different listener.");
+                }
+            }
+            if (!config.getBrokerServicePortTls().isPresent()) {
+                if (pulsarSslAddress != null) {
+                    throw new IllegalArgumentException("If pulsar do not start ssl port, there is no need to configure " +
+                            " `pulsar+ssl` in `" + entry.getKey() + "` listener.");
+                }
+            } else {
+                if (pulsarSslAddress == null) {
+                    throw new IllegalArgumentException("the `" + entry.getKey() + "` listener in the `advertisedListeners` "
+                            + " do not specify `pulsar+ssl` address.");
+                }
+            }
+            if (pulsarAddress == null) {
+                throw new IllegalArgumentException("the `" + entry.getKey() + "` listener in the `advertisedListeners` "
+                        + " do not specify `pulsar` address.");
+            }
+            result.put(entry.getKey(), AdvertisedListener.builder().brokerServiceUrl(pulsarAddress).brokerServiceUrlTls(pulsarSslAddress).build());
+        }
+        return result;
+    }
+
+}
diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/validator/MultipleListenerValidatorTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/validator/MultipleListenerValidatorTest.java
new file mode 100644
index 0000000..89b7217
--- /dev/null
+++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/validator/MultipleListenerValidatorTest.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.validator;
+
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.testng.annotations.Test;
+
+import java.util.Optional;
+
+/**
+ * testcase for MultipleListenerValidator.
+ */
+public class MultipleListenerValidatorTest {
+
+    private static final Optional<Integer> TLS_PORT = Optional.of(6651);
+
+    /**
+     * Applies the listener settings to {@code conf} and runs the validator on it.
+     */
+    private static void validate(ServiceConfiguration conf, String advertisedListeners, String internalListenerName) {
+        conf.setAdvertisedListeners(advertisedListeners);
+        conf.setInternalListenerName(internalListenerName);
+        MultipleListenerValidator.validateAndAnalysisAdvertisedListener(conf);
+    }
+
+    // `advertisedAddress` and `advertisedListeners` are both configured.
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testAppearTogether() {
+        ServiceConfiguration conf = new ServiceConfiguration();
+        conf.setAdvertisedAddress("127.0.0.1");
+        validate(conf, "internal:pulsar://192.168.1.11:6660,internal:pulsar+ssl://192.168.1.11:6651", "internal");
+    }
+
+    // one listener name carries four addresses.
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testListenerDuplicate_1() {
+        validate(new ServiceConfiguration(),
+                " internal:pulsar://127.0.0.1:6660, internal:pulsar+ssl://127.0.0.1:6651,"
+                        + " internal:pulsar://192.168.1.11:6660, internal:pulsar+ssl://192.168.1.11:6651", "internal");
+    }
+
+    // one listener name carries two `pulsar://` addresses.
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testListenerDuplicate_2() {
+        validate(new ServiceConfiguration(), " internal:pulsar://127.0.0.1:6660, internal:pulsar://192.168.1.11:6660", "internal");
+    }
+
+    // two listener names share the same host:port.
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testDifferentListenerWithSameHostPort() {
+        validate(new ServiceConfiguration(), " internal:pulsar://127.0.0.1:6660, external:pulsar://127.0.0.1:6660", "internal");
+    }
+
+    // a `pulsar+ssl://` address is listed although `brokerServicePortTls` is not set on the config.
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testListenerWithoutTLSPort() {
+        validate(new ServiceConfiguration(), " internal:pulsar://127.0.0.1:6660, internal:pulsar+ssl://127.0.0.1:6651", "internal");
+    }
+
+    // both addresses are listed and `brokerServicePortTls` is set: no exception expected.
+    @Test
+    public void testListenerWithTLSPort() {
+        ServiceConfiguration conf = new ServiceConfiguration();
+        conf.setBrokerServicePortTls(TLS_PORT);
+        validate(conf, " internal:pulsar://127.0.0.1:6660, internal:pulsar+ssl://127.0.0.1:6651", "internal");
+    }
+
+    // only a `pulsar+ssl://` address is listed for the listener.
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testListenerWithoutNonTLSAddress() {
+        ServiceConfiguration conf = new ServiceConfiguration();
+        conf.setBrokerServicePortTls(TLS_PORT);
+        validate(conf, " internal:pulsar+ssl://127.0.0.1:6651", "internal");
+    }
+
+    // `internalListenerName` names a listener that is not present in `advertisedListeners`.
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testWithoutListenerNameInAdvertisedListeners() {
+        ServiceConfiguration conf = new ServiceConfiguration();
+        conf.setBrokerServicePortTls(TLS_PORT);
+        validate(conf, " internal:pulsar://127.0.0.1:6660, internal:pulsar+ssl://127.0.0.1:6651", "external");
+    }
+
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 35b6a18..689b0a4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -36,6 +36,7 @@ import java.io.IOException;
 import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
 import java.net.URI;
+
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -92,6 +93,7 @@ import org.apache.pulsar.broker.service.TopicPoliciesService;
 import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
 import org.apache.pulsar.broker.stats.MetricsGenerator;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
+import org.apache.pulsar.broker.validator.MultipleListenerValidator;
 import org.apache.pulsar.broker.web.WebService;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminBuilder;
@@ -117,6 +119,7 @@ import org.apache.pulsar.compaction.TwoPhaseCompactor;
 import org.apache.pulsar.functions.worker.WorkerConfig;
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.functions.worker.WorkerUtils;
+import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
 import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvider;
 import org.apache.pulsar.websocket.WebSocketConsumerServlet;
 import org.apache.pulsar.websocket.WebSocketProducerServlet;
@@ -205,6 +208,8 @@ public class PulsarService implements AutoCloseable {
 
     private final ReentrantLock mutex = new ReentrantLock();
     private final Condition isClosedCondition = mutex.newCondition();
+    // key is listener name , value is pulsar address and pulsar ssl address
+    private Map<String, AdvertisedListener> advertisedListeners;
 
     public PulsarService(ServiceConfiguration config) {
         this(config, Optional.empty(), (exitCode) -> {
@@ -217,10 +222,21 @@ public class PulsarService implements AutoCloseable {
                          Consumer<Integer> processTerminator) {
         // Validate correctness of configuration
         PulsarConfigurationLoader.isComplete(config);
-
+        // validate `advertisedAddress`, `advertisedListeners`, `internalListenerName`
+        Map<String, AdvertisedListener> result = MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);
+        if (result != null) {
+            this.advertisedListeners = Collections.unmodifiableMap(result);
+        } else {
+            this.advertisedListeners = Collections.unmodifiableMap(Collections.emptyMap());
+        }
         state = State.Init;
+        // use `internalListenerName` listener as `advertisedAddress`
         this.bindAddress = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getBindAddress());
-        this.advertisedAddress = advertisedAddress(config);
+        if (!this.advertisedListeners.isEmpty()) {
+            this.advertisedAddress = this.advertisedListeners.get(config.getInternalListenerName()).getBrokerServiceUrl().getHost();
+        } else {
+            this.advertisedAddress = advertisedAddress(config);
+        }
         this.brokerVersion = PulsarVersion.getVersion();
         this.config = config;
         this.shutdownService = new MessagingServiceShutdownHook(this, processTerminator);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index c59e61c..ba5f3ab 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -852,14 +852,14 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
             Map<String, String> protocolData = pulsar.getProtocolDataToAdvertise();
 
             lastData = new LocalBrokerData(pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(),
-                    pulsar.getSafeBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls());
+                    pulsar.getSafeBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls(), pulsar.getAdvertisedListeners());
             lastData.setProtocols(protocolData);
             // configure broker-topic mode
             lastData.setPersistentTopicsEnabled(pulsar.getConfiguration().isEnablePersistentTopics());
             lastData.setNonPersistentTopicsEnabled(pulsar.getConfiguration().isEnableNonPersistentTopics());
 
             localData = new LocalBrokerData(pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(),
-                    pulsar.getSafeBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls());
+                    pulsar.getSafeBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls(), pulsar.getAdvertisedListeners());
             localData.setProtocols(protocolData);
             localData.setBrokerVersionString(pulsar.getBrokerVersion());
             // configure broker-topic mode
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
index 701a217..3a4a7a6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.broker.loadbalance.impl;
 
-import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/LookupResult.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/LookupResult.java
index 965db1c..c84cb11 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/LookupResult.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/LookupResult.java
@@ -28,7 +28,7 @@ import org.apache.pulsar.common.lookup.data.LookupData;
  * to redirect the client to try again.
  */
 public class LookupResult {
-    enum Type {
+    public enum Type {
         BrokerUrl, RedirectUrl
     }
 
@@ -49,6 +49,17 @@ public class LookupResult {
         this.lookupData = new LookupData(brokerServiceUrl, brokerServiceUrlTls, httpUrl, httpUrlTls);
     }
 
+    public LookupResult(String httpUrl, String httpUrlTls, String nativeUrl, String nativeUrlTls, Type type) {
+        this.type = type;
+        this.lookupData = new LookupData(nativeUrl, nativeUrlTls, httpUrl, httpUrlTls);
+    }
+
+    public LookupResult(NamespaceEphemeralData namespaceEphemeralData, String nativeUrl, String nativeUrlTls) {
+        this.type = Type.BrokerUrl;
+        this.lookupData = new LookupData(nativeUrl, nativeUrlTls, namespaceEphemeralData.getHttpUrl(),
+                namespaceEphemeralData.getHttpUrlTls());
+    }
+
     public boolean isBrokerUrl() {
         return type == Type.BrokerUrl;
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
index bac10d4..9738e20 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
@@ -141,11 +141,13 @@ public class TopicLookupBase extends PulsarWebResource {
      *
      * Lookup broker-service address for a given namespace-bundle which contains given topic.
      *
-     * a. Returns broker-address if namespace-bundle is already owned by any broker b. If current-broker receives
-     * lookup-request and if it's not a leader then current broker redirects request to leader by returning
-     * leader-service address. c. If current-broker is leader then it finds out least-loaded broker to own namespace
-     * bundle and redirects request by returning least-loaded broker. d. If current-broker receives request to own the
-     * namespace-bundle then it owns a bundle and returns success(connect) response to client.
+     * a. Returns broker-address if namespace-bundle is already owned by any broker
+     * b. If current-broker receives lookup-request and if it's not a leader then current broker redirects request
+     *    to leader by returning leader-service address.
+     * c. If current-broker is leader then it finds out least-loaded broker to own namespace bundle and redirects request
+     *    by returning least-loaded broker.
+     * d. If current-broker receives request to own the namespace-bundle then it owns a bundle and returns success(connect)
+     *    response to client.
      *
      * @param pulsarService
      * @param topicName
@@ -156,6 +158,33 @@ public class TopicLookupBase extends PulsarWebResource {
      */
     public static CompletableFuture<ByteBuf> lookupTopicAsync(PulsarService pulsarService, TopicName topicName,
             boolean authoritative, String clientAppId, AuthenticationDataSource authenticationData, long requestId) {
+        return lookupTopicAsync(pulsarService, topicName, authoritative, clientAppId, authenticationData, requestId, null);
+    }
+
+    /**
+     *
+     * Lookup broker-service address for a given namespace-bundle which contains given topic.
+     *
+     * a. Returns broker-address if namespace-bundle is already owned by any broker
+     * b. If current-broker receives lookup-request and if it's not a leader then current broker redirects request
+     *    to leader by returning leader-service address.
+     * c. If current-broker is leader then it finds out least-loaded broker to own namespace bundle and redirects request
+     *    by returning least-loaded broker.
+     * d. If current-broker receives request to own the namespace-bundle then it owns a bundle and returns success(connect)
+     *    response to client.
+     *
+     * @param pulsarService
+     * @param topicName
+     * @param authoritative
+     * @param clientAppId
+     * @param requestId
+     * @param advertisedListenerName
+     * @return
+     */
+    public static CompletableFuture<ByteBuf> lookupTopicAsync(PulsarService pulsarService, TopicName topicName,
+                                                              boolean authoritative, String clientAppId,
+                                                              AuthenticationDataSource authenticationData, long requestId,
+                                                              final String advertisedListenerName) {
 
         final CompletableFuture<ByteBuf> validationFuture = new CompletableFuture<>();
         final CompletableFuture<ByteBuf> lookupfuture = new CompletableFuture<>();
@@ -207,10 +236,10 @@ public class TopicLookupBase extends PulsarWebResource {
                                     false));
 
                         }).exceptionally(ex -> {
-                            validationFuture.complete(
-                                    newLookupErrorResponse(ServerError.MetadataError, ex.getMessage(), requestId));
-                            return null;
-                        });
+                    validationFuture.complete(
+                            newLookupErrorResponse(ServerError.MetadataError, ex.getMessage(), requestId));
+                    return null;
+                });
             }
         }).exceptionally(ex -> {
             validationFuture.completeExceptionally(ex);
@@ -222,7 +251,7 @@ public class TopicLookupBase extends PulsarWebResource {
             if (validationFailureResponse != null) {
                 lookupfuture.complete(validationFailureResponse);
             } else {
-                pulsarService.getNamespaceService().getBrokerServiceUrlAsync(topicName, authoritative)
+                pulsarService.getNamespaceService().getBrokerServiceUrlAsync(topicName, authoritative, advertisedListenerName)
                         .thenAccept(lookupResult -> {
 
                             if (log.isDebugEnabled()) {
@@ -252,17 +281,17 @@ public class TopicLookupBase extends PulsarWebResource {
                                         requestId, redirectThroughServiceUrl));
                             }
                         }).exceptionally(ex -> {
-                            if (ex instanceof CompletionException && ex.getCause() instanceof IllegalStateException) {
-                                log.info("Failed to lookup {} for topic {} with error {}", clientAppId,
-                                        topicName.toString(), ex.getCause().getMessage());
-                            } else {
-                                log.warn("Failed to lookup {} for topic {} with error {}", clientAppId,
-                                        topicName.toString(), ex.getMessage(), ex);
-                            }
-                            lookupfuture.complete(
-                                    newLookupErrorResponse(ServerError.ServiceNotReady, ex.getMessage(), requestId));
-                            return null;
-                        });
+                    if (ex instanceof CompletionException && ex.getCause() instanceof IllegalStateException) {
+                        log.info("Failed to lookup {} for topic {} with error {}", clientAppId,
+                                topicName.toString(), ex.getCause().getMessage());
+                    } else {
+                        log.warn("Failed to lookup {} for topic {} with error {}", clientAppId,
+                                topicName.toString(), ex.getMessage(), ex);
+                    }
+                    lookupfuture.complete(
+                            newLookupErrorResponse(ServerError.ServiceNotReady, ex.getMessage(), requestId));
+                    return null;
+                });
             }
 
         }).exceptionally(ex -> {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceEphemeralData.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceEphemeralData.java
index 6716d6f..30e3567 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceEphemeralData.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceEphemeralData.java
@@ -19,6 +19,12 @@
 package org.apache.pulsar.broker.namespace;
 
 import com.google.common.base.MoreObjects;
+import com.google.common.collect.Maps;
+import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
+
+import javax.validation.constraints.NotNull;
+import java.util.Collections;
+import java.util.Map;
 
 public class NamespaceEphemeralData {
     private String nativeUrl;
@@ -26,17 +32,28 @@ public class NamespaceEphemeralData {
     private String httpUrl;
     private String httpUrlTls;
     private boolean disabled;
+    private Map<String, AdvertisedListener> advertisedListeners;
 
     public NamespaceEphemeralData() {
     }
 
     public NamespaceEphemeralData(String brokerUrl, String brokerUrlTls, String httpUrl, String httpUrlTls,
             boolean disabled) {
+        this(brokerUrl, brokerUrlTls, httpUrl, httpUrlTls, disabled, null);
+    }
+
+    public NamespaceEphemeralData(String brokerUrl, String brokerUrlTls, String httpUrl, String httpUrlTls,
+                                  boolean disabled, Map<String, AdvertisedListener> advertisedListeners) {
         this.nativeUrl = brokerUrl;
         this.nativeUrlTls = brokerUrlTls;
         this.httpUrl = httpUrl;
         this.httpUrlTls = httpUrlTls;
         this.disabled = disabled;
+        if (advertisedListeners == null) { // the five-argument constructor delegates here with null
+            this.advertisedListeners = Collections.emptyMap(); // typed empty map instead of the raw EMPTY_MAP constant
+        } else {
+            this.advertisedListeners = Maps.newHashMap(advertisedListeners); // copy, so later changes to the caller's map are not seen
+        }
     }
 
     public String getNativeUrl() {
@@ -63,9 +80,17 @@ public class NamespaceEphemeralData {
         this.disabled = flag;
     }
 
+    @NotNull
+    public Map<String, AdvertisedListener> getAdvertisedListeners() {
+        if (this.advertisedListeners == null) { // the no-arg constructor leaves the field unset
+            return Collections.emptyMap(); // typed empty map instead of the raw EMPTY_MAP constant
+        }
+        return Collections.unmodifiableMap(this.advertisedListeners); // read-only view over the stored map
+    }
+
     @Override
     public String toString() {
         return MoreObjects.toStringHelper(this).add("nativeUrl", nativeUrl).add("httpUrl", httpUrl)
-                .add("disabled", disabled).toString();
+                .add("disabled", disabled).add("advertisedListeners", getAdvertisedListeners()).toString();
     }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 74d69c4..943761e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -64,6 +64,7 @@ import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
 import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
 import org.apache.zookeeper.AsyncCallback.StatCallback;
 import org.apache.zookeeper.KeeperException;
@@ -113,7 +114,7 @@ import static org.apache.pulsar.common.util.Codec.decode;
  */
 public class NamespaceService {
 
-    public enum AddressType {
+    public static enum AddressType {
         BROKER_URL, LOOKUP_URL
     }
 
@@ -172,8 +173,13 @@ public class NamespaceService {
 
     public CompletableFuture<Optional<LookupResult>> getBrokerServiceUrlAsync(TopicName topic,
             boolean authoritative) {
+        return getBrokerServiceUrlAsync(topic, authoritative, null); // null listener name: no advertised listener is selected
+    }
+
+    public CompletableFuture<Optional<LookupResult>> getBrokerServiceUrlAsync(TopicName topic, boolean authoritative,
+                                                                              final String advertisedListenerName) {
         return getBundleAsync(topic)
-                .thenCompose(bundle -> findBrokerServiceUrl(bundle, authoritative, false /* read-only */));
+                .thenCompose(bundle -> findBrokerServiceUrl(bundle, authoritative, false /* read-only */, advertisedListenerName));
     }
 
     public CompletableFuture<NamespaceBundle> getBundleAsync(TopicName topic) {
@@ -322,16 +328,30 @@ public class NamespaceService {
         = new ConcurrentOpenHashMap<>();
 
     /**
+     * Convenience overload of the main lookup method that does not select an advertised listener.
+     *
+     * @param bundle the namespace bundle, passed through to the four-argument overload
+     * @param authoritative passed through unchanged
+     * @param readOnly passed through unchanged
+     * @return the future returned by the four-argument overload called with a null advertisedListenerName
+     */
+    private CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(NamespaceBundle bundle, boolean authoritative,
+                                                                           boolean readOnly) {
+        return findBrokerServiceUrl(bundle, authoritative, readOnly, null);
+    }
+
+    /**
      * Main internal method to lookup and setup ownership of service unit to a broker
      *
      * @param bundle
      * @param authoritative
      * @param readOnly
+     * @param advertisedListenerName
      * @return
      * @throws PulsarServerException
      */
     private CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(NamespaceBundle bundle, boolean authoritative,
-            boolean readOnly) {
+            boolean readOnly, final String advertisedListenerName) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("findBrokerServiceUrl: {} - read-only: {}", bundle, readOnly);
         }
@@ -357,7 +377,7 @@ public class NamespaceService {
                     } else {
                         // Now, no one owns the namespace yet. Hence, we will try to dynamically assign it
                         pulsar.getExecutor().execute(() -> {
-                            searchForCandidateBroker(bundle, future, authoritative);
+                            searchForCandidateBroker(bundle, future, authoritative, advertisedListenerName);
                         });
                     }
                 } else if (nsData.get().isDisabled()) {
@@ -367,7 +387,20 @@ public class NamespaceService {
                     if (LOG.isDebugEnabled()) {
                         LOG.debug("Namespace bundle {} already owned by {} ", bundle, nsData);
                     }
-                    future.complete(Optional.of(new LookupResult(nsData.get())));
+                    // find the target
+                    if (StringUtils.isNotBlank(advertisedListenerName)) {
+                        AdvertisedListener listener = nsData.get().getAdvertisedListeners().get(advertisedListenerName);
+                        if (listener == null) {
+                            future.completeExceptionally(
+                                    new PulsarServerException("the broker do not have " + advertisedListenerName + " listener"));
+                        } else {
+                            future.complete(Optional.of(new LookupResult(nsData.get(),
+                                    listener.getBrokerServiceUrl().toString(), listener.getBrokerServiceUrlTls().toString())));
+                        }
+                        return;
+                    } else {
+                        future.complete(Optional.of(new LookupResult(nsData.get())));
+                    }
                 }
             }).exceptionally(exception -> {
                 LOG.warn("Failed to check owner for bundle {}: {}", bundle, exception.getMessage(), exception);
@@ -385,6 +418,12 @@ public class NamespaceService {
 
     private void searchForCandidateBroker(NamespaceBundle bundle,
             CompletableFuture<Optional<LookupResult>> lookupFuture, boolean authoritative) {
+        searchForCandidateBroker(bundle, lookupFuture, authoritative, null); // delegate without an advertised listener name
+    }
+
+    private void searchForCandidateBroker(NamespaceBundle bundle,
+                                          CompletableFuture<Optional<LookupResult>> lookupFuture, boolean authoritative,
+                                          final String advertisedListenerName) {
         String candidateBroker = null;
         try {
             // check if this is Heartbeat or SLAMonitor namespace
@@ -403,7 +442,7 @@ public class NamespaceService {
 
                         // If leader is not active, fallback to pick the least loaded from current broker loadmanager
                         || !isBrokerActive(pulsar.getLeaderElectionService().getCurrentLeader().getServiceUrl())
-                    ) {
+                ) {
                     Optional<String> availableBroker = getLeastLoadedFromLoadManager(bundle);
                     if (!availableBroker.isPresent()) {
                         lookupFuture.complete(Optional.empty());
@@ -447,8 +486,22 @@ public class NamespaceService {
 
                         // Schedule the task to pre-load topics
                         pulsar.loadNamespaceTopics(bundle);
-
-                        lookupFuture.complete(Optional.of(new LookupResult(ownerInfo)));
+                        // find the target
+                        if (StringUtils.isNotBlank(advertisedListenerName)) {
+                            AdvertisedListener listener = ownerInfo.getAdvertisedListeners().get(advertisedListenerName);
+                            if (listener == null) {
+                                lookupFuture.completeExceptionally(
+                                        new PulsarServerException("the broker do not have " + advertisedListenerName + " listener"));
+                                return;
+                            } else {
+                                lookupFuture.complete(Optional.of(new LookupResult(ownerInfo, listener.getBrokerServiceUrl().toString(),
+                                        listener.getBrokerServiceUrlTls().toString())));
+                                return;
+                            }
+                        } else {
+                            lookupFuture.complete(Optional.of(new LookupResult(ownerInfo)));
+                            return;
+                        }
                     }
                 }).exceptionally(exception -> {
                     LOG.warn("Failed to acquire ownership for namespace bundle {}: {}", bundle, exception);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
index 50e96fa..7acd5f8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
@@ -162,9 +162,9 @@ public class OwnershipCache {
         this.ownerBrokerUrl = pulsar.getSafeBrokerServiceUrl();
         this.ownerBrokerUrlTls = pulsar.getBrokerServiceUrlTls();
         this.selfOwnerInfo = new NamespaceEphemeralData(ownerBrokerUrl, ownerBrokerUrlTls,
-                pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(), false);
+                pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(), false, pulsar.getAdvertisedListeners());
         this.selfOwnerInfoDisabled = new NamespaceEphemeralData(ownerBrokerUrl, ownerBrokerUrlTls,
-                pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(), true);
+                pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(), true, pulsar.getAdvertisedListeners());
         this.bundleFactory = bundleFactory;
         this.localZkCache = pulsar.getLocalZkCache();
         this.ownershipReadOnlyCache = pulsar.getLocalZkCacheService().ownerInfoCache();
@@ -379,7 +379,7 @@ public class OwnershipCache {
     public synchronized boolean refreshSelfOwnerInfo() {
         if (selfOwnerInfo.getNativeUrl() == null) {
             this.selfOwnerInfo = new NamespaceEphemeralData(pulsar.getSafeBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls(),
-                    pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(), false);
+                    pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(), false, pulsar.getAdvertisedListeners());
         }
         return selfOwnerInfo.getNativeUrl() != null;
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 0416594..d2fa9c4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -278,6 +278,7 @@ public class ServerCnx extends PulsarHandler {
     protected void handleLookup(CommandLookupTopic lookup) {
         final long requestId = lookup.getRequestId();
         final boolean authoritative = lookup.getAuthoritative();
+        final String advertisedListenerName = lookup.getAdvertisedListenerName();
         if (log.isDebugEnabled()) {
             log.debug("[{}] Received Lookup from {} for {}", lookup.getTopic(), remoteAddress, requestId);
         }
@@ -309,7 +310,7 @@ public class ServerCnx extends PulsarHandler {
                 if (isProxyAuthorized) {
                     lookupTopicAsync(getBrokerService().pulsar(), topicName, authoritative,
                             finalOriginalPrincipal != null ? finalOriginalPrincipal : authRole, authenticationData,
-                            requestId).handle((lookupResponse, ex) -> {
+                            requestId, advertisedListenerName).handle((lookupResponse, ex) -> {
                                 if (ex == null) {
                                     ctx.writeAndFlush(lookupResponse);
                                 } else {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index 7bc4b40..c1e39b4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -139,13 +139,16 @@ public abstract class MockedPulsarServiceBaseTest {
         pulsarClient = newPulsarClient(lookupUrl, 1);
     }
 
-    protected final void init() throws Exception {
+    protected void doInitConf() throws Exception {
         this.conf.setBrokerServicePort(Optional.of(0));
         this.conf.setBrokerServicePortTls(Optional.of(0));
         this.conf.setAdvertisedAddress("localhost");
         this.conf.setWebServicePort(Optional.of(0));
         this.conf.setWebServicePortTls(Optional.of(0));
+    }
 
+    protected final void init() throws Exception {
+        doInitConf();
         sameThreadOrderedSafeExecutor = new SameThreadOrderedSafeExecutor();
         bkExecutor = Executors.newSingleThreadExecutor(
                 new ThreadFactoryBuilder().setNameFormat("mock-pulsar-bk")
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PulsarMultiListenersWithInternalListenerNameTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PulsarMultiListenersWithInternalListenerNameTest.java
new file mode 100644
index 0000000..dae9f7d
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PulsarMultiListenersWithInternalListenerNameTest.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import com.google.common.collect.Sets;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.impl.BinaryProtoLookupService;
+import org.apache.pulsar.client.impl.LookupService;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+public class PulsarMultiListenersWithInternalListenerNameTest extends MockedPulsarServiceBaseTest {
+
+    private ExecutorService executorService;
+    // Lookup service under test; created inside the test method and closed in cleanup()
+    private LookupService lookupService;
+    // Local host address used to build the advertised listener URLs
+    private String host;
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        this.executorService = Executors.newFixedThreadPool(1);
+        this.isTcpLookup = true;
+        super.internalSetup();
+    }
+
+    protected void doInitConf() throws Exception {
+        this.host = InetAddress.getLocalHost().getHostAddress();
+        super.doInitConf();
+        this.conf.setClusterName("localhost");
+        this.conf.setAdvertisedAddress(null);
+        this.conf.setAdvertisedListeners(String.format("internal:pulsar://%s:6650,internal:pulsar+ssl://%s:6651", host, host));
+        this.conf.setInternalListenerName("internal");
+    }
+
+    protected PulsarClient newPulsarClient(String url, int intervalInSecs) throws PulsarClientException {
+        return PulsarClient.builder().serviceUrl(url).listenerName("internal").statsInterval(intervalInSecs, TimeUnit.SECONDS).build();
+    }
+
+    @Test
+    public void testFindBrokerWithListenerName() throws Throwable {
+        admin.clusters().createCluster("localhost", new ClusterData(pulsar.getWebServiceAddress()));
+        TenantInfo tenantInfo = new TenantInfo();
+        tenantInfo.setAllowedClusters(Sets.newHashSet("localhost"));
+        this.admin.tenants().createTenant("public", tenantInfo);
+        this.admin.namespaces().createNamespace("public/default");
+        this.lookupService = new BinaryProtoLookupService((PulsarClientImpl) this.pulsarClient, lookupUrl.toString(),
+                "internal", false, this.executorService);
+        // test request 1
+        {
+            CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> future = lookupService.getBroker(TopicName.get("persistent://public/default/test"));
+            Pair<InetSocketAddress, InetSocketAddress> result = future.get(10, TimeUnit.SECONDS);
+            Assert.assertEquals(result.getKey().toString(), String.format("%s:6650", this.host));
+            Assert.assertEquals(result.getValue().toString(), String.format("%s:6650", this.host));
+        }
+        // test request 2
+        {
+            CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> future = lookupService.getBroker(TopicName.get("persistent://public/default/test"));
+            Pair<InetSocketAddress, InetSocketAddress> result = future.get(10, TimeUnit.SECONDS);
+            Assert.assertEquals(result.getKey().toString(), String.format("%s:6650", this.host));
+            Assert.assertEquals(result.getValue().toString(), String.format("%s:6650", this.host));
+        }
+    }
+
+    @AfterMethod
+    @Override
+    protected void cleanup() throws Exception {
+        if (this.lookupService != null) { // only assigned inside the test method, so it can still be null here
+            this.lookupService.close();
+        }
+        if (this.executorService != null) {
+            this.executorService.shutdown();
+        }
+        super.internalCleanup();
+    }
+
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PulsarMultiListenersWithoutInternalListenerNameTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PulsarMultiListenersWithoutInternalListenerNameTest.java
new file mode 100644
index 0000000..c96ae27
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PulsarMultiListenersWithoutInternalListenerNameTest.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import com.google.common.collect.Sets;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.impl.BinaryProtoLookupService;
+import org.apache.pulsar.client.impl.LookupService;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+public class PulsarMultiListenersWithoutInternalListenerNameTest extends MockedPulsarServiceBaseTest {
+
+    private ExecutorService executorService;
+    // Lookup service under test; created inside the test method and closed in cleanup()
+    private LookupService lookupService;
+    // Local host address used to build the advertised listener URLs
+    private String host;
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        this.executorService = Executors.newFixedThreadPool(1);
+        this.isTcpLookup = true;
+        super.internalSetup();
+    }
+
+    protected void doInitConf() throws Exception {
+        this.host = InetAddress.getLocalHost().getHostAddress();
+        super.doInitConf();
+        this.conf.setClusterName("localhost");
+        this.conf.setAdvertisedAddress(null);
+        this.conf.setAdvertisedListeners(String.format("internal:pulsar://%s:6650,internal:pulsar+ssl://%s:6651", host, host));
+    }
+
+    protected PulsarClient newPulsarClient(String url, int intervalInSecs) throws PulsarClientException {
+        return PulsarClient.builder().serviceUrl(url).listenerName("internal").statsInterval(intervalInSecs, TimeUnit.SECONDS).build();
+    }
+
+    @Test
+    public void testFindBrokerWithListenerName() throws Throwable {
+        admin.clusters().createCluster("localhost", new ClusterData(pulsar.getWebServiceAddress()));
+        TenantInfo tenantInfo = new TenantInfo();
+        tenantInfo.setAllowedClusters(Sets.newHashSet("localhost"));
+        this.admin.tenants().createTenant("public", tenantInfo);
+        this.admin.namespaces().createNamespace("public/default");
+        this.lookupService = new BinaryProtoLookupService((PulsarClientImpl) this.pulsarClient, lookupUrl.toString(),
+                "internal", false, this.executorService);
+        // test request 1
+        {
+            CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> future = lookupService.getBroker(TopicName.get("persistent://public/default/test"));
+            Pair<InetSocketAddress, InetSocketAddress> result = future.get(10, TimeUnit.SECONDS);
+            Assert.assertEquals(result.getKey().toString(), String.format("%s:6650", this.host));
+            Assert.assertEquals(result.getValue().toString(), String.format("%s:6650", this.host));
+        }
+        // test request 2
+        {
+            CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> future = lookupService.getBroker(TopicName.get("persistent://public/default/test"));
+            Pair<InetSocketAddress, InetSocketAddress> result = future.get(10, TimeUnit.SECONDS);
+            Assert.assertEquals(result.getKey().toString(), String.format("%s:6650", this.host));
+            Assert.assertEquals(result.getValue().toString(), String.format("%s:6650", this.host));
+        }
+    }
+
+    @AfterMethod
+    @Override
+    protected void cleanup() throws Exception {
+        if (this.lookupService != null) { // only assigned inside the test method, so it can still be null here
+            this.lookupService.close();
+        }
+        if (this.executorService != null) {
+            this.executorService.shutdown();
+        }
+        super.internalCleanup();
+    }
+
+}
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
index c12e9a1..b3aee5b 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
@@ -113,6 +113,14 @@ public interface ClientBuilder extends Cloneable {
     ClientBuilder serviceUrlProvider(ServiceUrlProvider serviceUrlProvider);
 
     /**
+     * Configure the listener name sent with topic lookups so that the broker returns the matching `advertisedListener` address.
+     *
+     * @param name the listener name
+     * @return the client builder instance
+     */
+    ClientBuilder listenerName(String name);
+
+    /**
      * Set the authentication provider to use in the Pulsar client instance.
      *
      * <p>Example:
diff --git a/pulsar-client-messagecrypto-bc/pom.xml b/pulsar-client-messagecrypto-bc/pom.xml
index 0f7b190..73e1f31 100644
--- a/pulsar-client-messagecrypto-bc/pom.xml
+++ b/pulsar-client-messagecrypto-bc/pom.xml
@@ -46,5 +46,6 @@
       <artifactId>bouncy-castle-bc-shaded</artifactId>
       <version>${project.parent.version}</version>
     </dependency>
+
   </dependencies>
 </project>
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
index d3a7df5..e64e161 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
@@ -54,15 +54,22 @@ public class BinaryProtoLookupService implements LookupService {
     private final ServiceNameResolver serviceNameResolver;
     private final boolean useTls;
     private final ExecutorService executor;
+    private final String listenerName;
     private final int maxLookupRedirects;
 
     public BinaryProtoLookupService(PulsarClientImpl client, String serviceUrl, boolean useTls, ExecutorService executor)
             throws PulsarClientException {
+        this(client, serviceUrl, null, useTls, executor); // no listener name: delegates with a null listenerName
+    }
+
+    public BinaryProtoLookupService(PulsarClientImpl client, String serviceUrl, String listenerName, boolean useTls, ExecutorService executor)
+            throws PulsarClientException {
         this.client = client;
         this.useTls = useTls;
         this.executor = executor;
         this.maxLookupRedirects = client.getConfiguration().getMaxLookupRedirects();
         this.serviceNameResolver = new PulsarServiceNameResolver();
+        this.listenerName = listenerName; // may be null (four-argument constructor); handed to Commands.newLookup on each lookup
         updateServiceUrl(serviceUrl);
     }
 
@@ -102,7 +109,7 @@ public class BinaryProtoLookupService implements LookupService {
 
         client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> {
             long requestId = client.newRequestId();
-            ByteBuf request = Commands.newLookup(topicName.toString(), authoritative, requestId);
+            ByteBuf request = Commands.newLookup(topicName.toString(), this.listenerName, authoritative, requestId);
             clientCnx.newLookup(request, requestId).thenAccept(lookupDataResult -> {
                 URI uri = null;
                 try {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
index 7a56e42..2c920e2 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
@@ -101,6 +101,15 @@ public class ClientBuilderImpl implements ClientBuilder {
     }
 
     @Override
+    public ClientBuilder listenerName(String listenerName) {
+        if (StringUtils.isBlank(listenerName)) { // blank names are rejected before the configuration is touched
+            throw new IllegalArgumentException("Param listenerName must not be blank.");
+        }
+        conf.setListenerName(StringUtils.trim(listenerName)); // the trimmed name is what gets stored
+        return this;
+    }
+
+    @Override
     public ClientBuilder authentication(Authentication authentication) {
         conf.setAuthentication(authentication);
         return this;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index bcb0f51..42cd7b2 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -142,7 +142,7 @@ public class PulsarClientImpl implements PulsarClient {
         if (conf.getServiceUrl().startsWith("http")) {
             lookup = new HttpLookupService(conf, eventLoopGroup);
         } else {
-            lookup = new BinaryProtoLookupService(this, conf.getServiceUrl(), conf.isUseTls(), externalExecutorProvider.getExecutor());
+            lookup = new BinaryProtoLookupService(this, conf.getServiceUrl(), conf.getListenerName(), conf.isUseTls(), externalExecutorProvider.getExecutor());
         }
         timer = new HashedWheelTimer(getThreadFactory("pulsar-timer"), 1, TimeUnit.MILLISECONDS);
         producers = Maps.newIdentityHashMap();
@@ -650,7 +650,8 @@ public class PulsarClientImpl implements PulsarClient {
         if (conf.getServiceUrl().startsWith("http")) {
             lookup = new HttpLookupService(conf, eventLoopGroup);
         } else {
-            lookup = new BinaryProtoLookupService(this, conf.getServiceUrl(), conf.isUseTls(), externalExecutorProvider.getExecutor());
+            lookup = new BinaryProtoLookupService(this, conf.getServiceUrl(), conf.getListenerName(), conf.isUseTls(),
+                    externalExecutorProvider.getExecutor());
         }
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
index ce4cfe3..ac08553 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
@@ -74,6 +74,8 @@ public class ClientConfigurationData implements Serializable, Cloneable {
     private int requestTimeoutMs = 60000;
     private long initialBackoffIntervalNanos = TimeUnit.MILLISECONDS.toNanos(100);
     private long maxBackoffIntervalNanos = TimeUnit.SECONDS.toNanos(60);
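+    // Advertised-listener name passed to BinaryProtoLookupService and sent with its topic lookups; null when not configured.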
+    private String listenerName;
 
     // set TLS using KeyStore way.
     private boolean useKeyStoreTls = false;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index 9691cc9..a3d7ff0 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -13710,6 +13710,10 @@ public final class PulsarApi {
     // optional string original_auth_method = 6;
     boolean hasOriginalAuthMethod();
     String getOriginalAuthMethod();
+    
+    // optional string advertised_listener_name = 7;
+    boolean hasAdvertisedListenerName();
+    String getAdvertisedListenerName();
   }
   public static final class CommandLookupTopic extends
       org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite
@@ -13894,6 +13898,38 @@ public final class PulsarApi {
       }
     }
     
+    // optional string advertised_listener_name = 7;
+    public static final int ADVERTISED_LISTENER_NAME_FIELD_NUMBER = 7;
+    private java.lang.Object advertisedListenerName_;
+    public boolean hasAdvertisedListenerName() {
+      return ((bitField0_ & 0x00000040) == 0x00000040);
+    }
+    public String getAdvertisedListenerName() {
+      java.lang.Object ref = advertisedListenerName_;
+      if (ref instanceof String) {
+        return (String) ref;
+      } else {
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString bs = 
+            (org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString) ref;
+        String s = bs.toStringUtf8();
+        if (org.apache.pulsar.shaded.com.google.protobuf.v241.Internal.isValidUtf8(bs)) {
+          advertisedListenerName_ = s;
+        }
+        return s;
+      }
+    }
+    private org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString getAdvertisedListenerNameBytes() {
+      java.lang.Object ref = advertisedListenerName_;
+      if (ref instanceof String) {
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString b = 
+            org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.copyFromUtf8((String) ref);
+        advertisedListenerName_ = b;
+        return b;
+      } else {
+        return (org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString) ref;
+      }
+    }
+    
     private void initFields() {
       topic_ = "";
       requestId_ = 0L;
@@ -13901,6 +13937,7 @@ public final class PulsarApi {
       originalPrincipal_ = "";
       originalAuthData_ = "";
       originalAuthMethod_ = "";
+      advertisedListenerName_ = "";
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -13945,6 +13982,9 @@ public final class PulsarApi {
       if (((bitField0_ & 0x00000020) == 0x00000020)) {
         output.writeBytes(6, getOriginalAuthMethodBytes());
       }
+      if (((bitField0_ & 0x00000040) == 0x00000040)) {
+        output.writeBytes(7, getAdvertisedListenerNameBytes());
+      }
     }
     
     private int memoizedSerializedSize = -1;
@@ -13977,6 +14017,10 @@ public final class PulsarApi {
         size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
           .computeBytesSize(6, getOriginalAuthMethodBytes());
       }
+      if (((bitField0_ & 0x00000040) == 0x00000040)) {
+        size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeBytesSize(7, getAdvertisedListenerNameBytes());
+      }
       memoizedSerializedSize = size;
       return size;
     }
@@ -14102,6 +14146,8 @@ public final class PulsarApi {
         bitField0_ = (bitField0_ & ~0x00000010);
         originalAuthMethod_ = "";
         bitField0_ = (bitField0_ & ~0x00000020);
+        advertisedListenerName_ = "";
+        bitField0_ = (bitField0_ & ~0x00000040);
         return this;
       }
       
@@ -14159,6 +14205,10 @@ public final class PulsarApi {
           to_bitField0_ |= 0x00000020;
         }
         result.originalAuthMethod_ = originalAuthMethod_;
+        if (((from_bitField0_ & 0x00000040) == 0x00000040)) {
+          to_bitField0_ |= 0x00000040;
+        }
+        result.advertisedListenerName_ = advertisedListenerName_;
         result.bitField0_ = to_bitField0_;
         return result;
       }
@@ -14183,6 +14233,9 @@ public final class PulsarApi {
         if (other.hasOriginalAuthMethod()) {
           setOriginalAuthMethod(other.getOriginalAuthMethod());
         }
+        if (other.hasAdvertisedListenerName()) {
+          setAdvertisedListenerName(other.getAdvertisedListenerName());
+        }
         return this;
       }
       
@@ -14250,6 +14303,11 @@ public final class PulsarApi {
               originalAuthMethod_ = input.readBytes();
               break;
             }
+            case 58: {
+              bitField0_ |= 0x00000040;
+              advertisedListenerName_ = input.readBytes();
+              break;
+            }
           }
         }
       }
@@ -14442,6 +14500,42 @@ public final class PulsarApi {
         
       }
       
+      // optional string advertised_listener_name = 7;
+      private java.lang.Object advertisedListenerName_ = "";
+      public boolean hasAdvertisedListenerName() {
+        return ((bitField0_ & 0x00000040) == 0x00000040);
+      }
+      public String getAdvertisedListenerName() {
+        java.lang.Object ref = advertisedListenerName_;
+        if (!(ref instanceof String)) {
+          String s = ((org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString) ref).toStringUtf8();
+          advertisedListenerName_ = s;
+          return s;
+        } else {
+          return (String) ref;
+        }
+      }
+      public Builder setAdvertisedListenerName(String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000040;
+        advertisedListenerName_ = value;
+        
+        return this;
+      }
+      public Builder clearAdvertisedListenerName() {
+        bitField0_ = (bitField0_ & ~0x00000040);
+        advertisedListenerName_ = getDefaultInstance().getAdvertisedListenerName();
+        
+        return this;
+      }
+      void setAdvertisedListenerName(org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString value) {
+        bitField0_ |= 0x00000040;
+        advertisedListenerName_ = value;
+        
+      }
+      
       // @@protoc_insertion_point(builder_scope:pulsar.proto.CommandLookupTopic)
     }
     
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index d5e772a..51d0a11 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -39,6 +39,7 @@ import java.util.stream.Collectors;
 import lombok.experimental.UtilityClass;
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Triple;
 import org.apache.pulsar.client.api.KeySharedPolicy;
 import org.apache.pulsar.client.api.Range;
@@ -823,10 +824,17 @@ public class Commands {
     }
 
     public static ByteBuf newLookup(String topic, boolean authoritative, long requestId) {
+        return newLookup(topic, null, authoritative, requestId);
+    }
+
+    public static ByteBuf newLookup(String topic, String listenerName, boolean authoritative, long requestId) {
         CommandLookupTopic.Builder lookupTopicBuilder = CommandLookupTopic.newBuilder();
         lookupTopicBuilder.setTopic(topic);
         lookupTopicBuilder.setRequestId(requestId);
         lookupTopicBuilder.setAuthoritative(authoritative);
+        if (StringUtils.isNotBlank(listenerName)) {
+            lookupTopicBuilder.setAdvertisedListenerName(listenerName);
+        }
         CommandLookupTopic lookupBroker = lookupTopicBuilder.build();
         ByteBuf res = serializeWithSize(BaseCommand.newBuilder().setType(Type.LOOKUP).setLookupTopic(lookupBroker));
         lookupTopicBuilder.recycle();
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/AdvertisedListener.java b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/AdvertisedListener.java
new file mode 100644
index 0000000..522509d
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/AdvertisedListener.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.policies.data.loadbalancer;
+
+import java.net.URI;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+/**
+ * An advertised listener of a broker: its plain-text service URL (brokerServiceUrl) and its TLS service URL (brokerServiceUrlTls).
+ */
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class AdvertisedListener {
+    //
+    @Getter
+    @Setter
+    // the broker service uri without ssl
+    private URI brokerServiceUrl;
+    //
+    @Getter
+    @Setter
+    // the broker service uri with ssl
+    private URI brokerServiceUrlTls;
+
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
index 6b4ea64..46aaa68 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.policies.data.loadbalancer;
 
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
 import com.google.common.collect.Maps;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -83,6 +84,8 @@ public class LocalBrokerData extends JSONWritable implements LoadManagerReport {
 
     // the external protocol data advertised by protocol handlers.
     private Map<String, String> protocols;
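+    // the advertised listeners of this broker, keyed by a String (presumably the listener name; TODO confirm).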
+    private Map<String, AdvertisedListener> advertisedListeners;
 
     // For JSON only.
     public LocalBrokerData() {
@@ -94,6 +97,12 @@ public class LocalBrokerData extends JSONWritable implements LoadManagerReport {
      */
     public LocalBrokerData(final String webServiceUrl, final String webServiceUrlTls, final String pulsarServiceUrl,
             final String pulsarServiceUrlTls) {
+        this(webServiceUrl, webServiceUrlTls, pulsarServiceUrl, pulsarServiceUrlTls,
+                Collections.unmodifiableMap(Collections.emptyMap()));
+    }
+
+    public LocalBrokerData(final String webServiceUrl, final String webServiceUrlTls, final String pulsarServiceUrl,
+                           final String pulsarServiceUrlTls, Map<String, AdvertisedListener> advertisedListeners) {
         this.webServiceUrl = webServiceUrl;
         this.webServiceUrlTls = webServiceUrlTls;
         this.pulsarServiceUrl = pulsarServiceUrl;
@@ -109,6 +118,7 @@ public class LocalBrokerData extends JSONWritable implements LoadManagerReport {
         lastBundleGains = new HashSet<>();
         lastBundleLosses = new HashSet<>();
         protocols = new HashMap<>();
+        this.advertisedListeners = Collections.unmodifiableMap(Maps.newHashMap(advertisedListeners));
     }
 
     /**
@@ -472,4 +482,11 @@ public class LocalBrokerData extends JSONWritable implements LoadManagerReport {
         return Optional.ofNullable(protocols.get(protocol));
     }
 
+    public Map<String, AdvertisedListener> getAdvertisedListeners() {
+        return advertisedListeners;
+    }
+
+    public void setAdvertisedListeners(Map<String, AdvertisedListener> advertisedListeners) {
+        this.advertisedListeners = advertisedListeners;
+    }
 }
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index cd7b760..d43617b 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -385,6 +385,8 @@ message CommandLookupTopic {
     // to the proxy.
     optional string original_auth_data = 5;
     optional string original_auth_method = 6;
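+    // Advertised listener name supplied by the client with the lookup; set only when a non-blank name is configured.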
+    optional string advertised_listener_name = 7;
 }
 
 message CommandLookupTopicResponse {