You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/06/03 11:28:56 UTC
[pulsar] branch master updated: PIP-61: Advertise multiple
addresses (#6903)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8e40bd9 PIP-61: Advertise multiple addresses (#6903)
8e40bd9 is described below
commit 8e40bd977fc85eb2d4f54146f2876ee85b17fdfd
Author: luceneReader <54...@qq.com>
AuthorDate: Wed Jun 3 19:28:45 2020 +0800
PIP-61: Advertise multiple addresses (#6903)
* PIP-61:
1. resolve broker.conf, validate `advertisedListeners` and `internalListenerName`
2. register the `advertisedListeners` to zookeeper
3. client find the target broker with listenerName
4. add test case PulsarMultiListenersTest
5. add test case MultipleListenerValidatorTest
---
conf/broker.conf | 12 ++
.../apache/pulsar/broker/ServiceConfiguration.java | 14 +++
.../validator/MultipleListenerValidator.java | 137 +++++++++++++++++++++
.../validator/MultipleListenerValidatorTest.java | 100 +++++++++++++++
.../org/apache/pulsar/broker/PulsarService.java | 20 ++-
.../loadbalance/impl/ModularLoadManagerImpl.java | 4 +-
.../impl/ModularLoadManagerWrapper.java | 1 -
.../apache/pulsar/broker/lookup/LookupResult.java | 13 +-
.../pulsar/broker/lookup/TopicLookupBase.java | 71 +++++++----
.../broker/namespace/NamespaceEphemeralData.java | 27 +++-
.../pulsar/broker/namespace/NamespaceService.java | 69 +++++++++--
.../pulsar/broker/namespace/OwnershipCache.java | 6 +-
.../apache/pulsar/broker/service/ServerCnx.java | 3 +-
.../broker/auth/MockedPulsarServiceBaseTest.java | 5 +-
...MultiListenersWithInternalListenerNameTest.java | 108 ++++++++++++++++
...tiListenersWithoutInternalListenerNameTest.java | 107 ++++++++++++++++
.../apache/pulsar/client/api/ClientBuilder.java | 8 ++
pulsar-client-messagecrypto-bc/pom.xml | 1 +
.../client/impl/BinaryProtoLookupService.java | 9 +-
.../pulsar/client/impl/ClientBuilderImpl.java | 9 ++
.../pulsar/client/impl/PulsarClientImpl.java | 5 +-
.../client/impl/conf/ClientConfigurationData.java | 2 +
.../apache/pulsar/common/api/proto/PulsarApi.java | 94 ++++++++++++++
.../apache/pulsar/common/protocol/Commands.java | 8 ++
.../data/loadbalancer/AdvertisedListener.java | 47 +++++++
.../data/loadbalancer/LocalBrokerData.java | 17 +++
pulsar-common/src/main/proto/PulsarApi.proto | 2 +
27 files changed, 855 insertions(+), 44 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index 2a6ef62..8d50aeb 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -43,6 +43,18 @@ bindAddress=0.0.0.0
# Hostname or IP address the service advertises to the outside world. If not set, the value of InetAddress.getLocalHost().getHostName() is used.
advertisedAddress=
+# Used to specify multiple advertised listeners for the broker.
+# The value must format as <listener_name>:pulsar://<host>:<port>,
+# multiple listeners should separate with commas.
+# Do not use this configuration with advertisedAddress and brokerServicePort.
+# The Default value is absent means use advertisedAddress and brokerServicePort.
+# advertisedListeners=
+
+# Used to specify the internal listener name for the broker.
+# The listener name must contain in the advertisedListeners.
+# The Default value is absent, the broker uses the first listener as the internal listener.
+# internalListenerName=
+
# Number of threads to use for Netty IO. Default is set to 2 * Runtime.getRuntime().availableProcessors()
numIOThreads=
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 6238ff6..f90d9d9 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -149,6 +149,20 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private String advertisedAddress;
+ @FieldContext(category=CATEGORY_SERVER,
+ doc = "Used to specify multiple advertised listeners for the broker."
+ + " The value must format as <listener_name>:pulsar://<host>:<port>,"
+ + "multiple listeners should separate with commas."
+ + "Do not use this configuration with advertisedAddress and brokerServicePort."
+ + "The Default value is absent means use advertisedAddress and brokerServicePort.")
+ private String advertisedListeners;
+
+ @FieldContext(category=CATEGORY_SERVER,
+ doc = "Used to specify the internal listener name for the broker."
+ + "The listener name must contain in the advertisedListeners."
+ + "The Default value is absent, the broker uses the first listener as the internal listener.")
+ private String internalListenerName;
+
@FieldContext(
category = CATEGORY_SERVER,
doc = "Number of threads to use for Netty IO."
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/validator/MultipleListenerValidator.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/validator/MultipleListenerValidator.java
new file mode 100644
index 0000000..9f44da8
--- /dev/null
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/validator/MultipleListenerValidator.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.validator;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
+
+import java.net.URI;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * the validator for pulsar multiple listeners.
+ */
+public final class MultipleListenerValidator {
+
+ /**
+ * validate the configure of `advertisedListeners`, `internalListenerName`, `advertisedAddress`.
+ * 1. `advertisedListeners` and `advertisedAddress` must not appear together.
+ * 2. the listener name in `advertisedListeners` must not duplicate.
+ * 3. user can not assign same 'host:port' to different listener.
+ * 4. if `internalListenerName` is absent, the first `listener` in the `advertisedListeners` will be the `internalListenerName`.
+ * 5. if pulsar do not specify `brokerServicePortTls`, should only contain one entry of `pulsar://` per listener name.
+ * @param config the pulsar broker configure.
+ * @return
+ */
+ public static Map<String, AdvertisedListener> validateAndAnalysisAdvertisedListener(ServiceConfiguration config) {
+ if (StringUtils.isNotBlank(config.getAdvertisedListeners()) && StringUtils.isNotBlank(config.getAdvertisedAddress())) {
+ throw new IllegalArgumentException("`advertisedListeners` and `advertisedAddress` must not appear together");
+ }
+ if (StringUtils.isBlank(config.getAdvertisedListeners())) {
+ return Collections.EMPTY_MAP;
+ }
+ Optional<String> firstListenerName = Optional.empty();
+ Map<String, List<String>> listeners = Maps.newHashMap();
+ for (final String str : StringUtils.split(config.getAdvertisedListeners(), ",")) {
+ int index = str.indexOf(":");
+ if (index <= 0) {
+ throw new IllegalArgumentException("the configure entry `advertisedListeners` is invalid. because " +
+ str + " do not contain listener name");
+ }
+ String listenerName = StringUtils.trim(str.substring(0, index));
+ if (!firstListenerName.isPresent()) {
+ firstListenerName = Optional.of(listenerName);
+ }
+ String value = StringUtils.trim(str.substring(index + 1));
+ listeners.computeIfAbsent(listenerName, k -> Lists.newArrayListWithCapacity(2));
+ listeners.get(listenerName).add(value);
+ }
+ if (StringUtils.isBlank(config.getInternalListenerName())) {
+ config.setInternalListenerName(firstListenerName.get());
+ }
+ if (!listeners.containsKey(config.getInternalListenerName())) {
+ throw new IllegalArgumentException("the `advertisedListeners` configure do not contain `internalListenerName` entry");
+ }
+ final Map<String, AdvertisedListener> result = Maps.newHashMap();
+ final Map<String, Set<String>> reverseMappings = Maps.newHashMap();
+ for (final Map.Entry<String, List<String>> entry : listeners.entrySet()) {
+ if (entry.getValue().size() > 2) {
+ throw new IllegalArgumentException("there are redundant configure for listener `" + entry.getKey() + "`");
+ }
+ URI pulsarAddress = null, pulsarSslAddress = null;
+ for (final String strUri : entry.getValue()) {
+ try {
+ URI uri = URI.create(strUri);
+ if (StringUtils.equalsIgnoreCase(uri.getScheme(), "pulsar")) {
+ if (pulsarAddress == null) {
+ pulsarAddress = uri;
+ } else {
+ throw new IllegalArgumentException("there are redundant configure for listener `" + entry.getKey() + "`");
+ }
+ } else if (StringUtils.equalsIgnoreCase(uri.getScheme(), "pulsar+ssl")) {
+ if (pulsarSslAddress == null) {
+ pulsarSslAddress = uri;
+ } else {
+ throw new IllegalArgumentException("there are redundant configure for listener `" + entry.getKey() + "`");
+ }
+ }
+ String hostPort = String.format("%s:%d", uri.getHost(), uri.getPort());
+ reverseMappings.computeIfAbsent(hostPort, k -> Sets.newTreeSet());
+ Set<String> sets = reverseMappings.get(hostPort);
+ if (sets == null) {
+ sets = Sets.newTreeSet();
+ reverseMappings.put(hostPort, sets);
+ }
+ sets.add(entry.getKey());
+ if (sets.size() > 1) {
+ throw new IllegalArgumentException("must not specify `" + hostPort + "` to different listener.");
+ }
+ } catch (Throwable cause) {
+ throw new IllegalArgumentException("the value " + strUri + " in the `advertisedListeners` configure is invalid");
+ }
+ }
+ if (!config.getBrokerServicePortTls().isPresent()) {
+ if (pulsarSslAddress != null) {
+ throw new IllegalArgumentException("If pulsar do not start ssl port, there is no need to configure " +
+ " `pulsar+ssl` in `" + entry.getKey() + "` listener.");
+ }
+ } else {
+ if (pulsarSslAddress == null) {
+ throw new IllegalArgumentException("the `" + entry.getKey() + "` listener in the `advertisedListeners` "
+ + " do not specify `pulsar+ssl` address.");
+ }
+ }
+ if (pulsarAddress == null) {
+ throw new IllegalArgumentException("the `" + entry.getKey() + "` listener in the `advertisedListeners` "
+ + " do not specify `pulsar` address.");
+ }
+ result.put(entry.getKey(), AdvertisedListener.builder().brokerServiceUrl(pulsarAddress).brokerServiceUrlTls(pulsarSslAddress).build());
+ }
+ return result;
+ }
+
+}
diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/validator/MultipleListenerValidatorTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/validator/MultipleListenerValidatorTest.java
new file mode 100644
index 0000000..89b7217
--- /dev/null
+++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/validator/MultipleListenerValidatorTest.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.validator;
+
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.testng.annotations.Test;
+
+import java.util.Optional;
+
+/**
+ * testcase for MultipleListenerValidator.
+ */
+public class MultipleListenerValidatorTest {
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testAppearTogether() {
+ ServiceConfiguration config = new ServiceConfiguration();
+ config.setAdvertisedAddress("127.0.0.1");
+ config.setAdvertisedListeners("internal:pulsar://192.168.1.11:6660,internal:pulsar+ssl://192.168.1.11:6651");
+ config.setInternalListenerName("internal");
+ MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testListenerDuplicate_1() {
+ ServiceConfiguration config = new ServiceConfiguration();
+ config.setAdvertisedListeners(" internal:pulsar://127.0.0.1:6660, internal:pulsar+ssl://127.0.0.1:6651,"
+ + " internal:pulsar://192.168.1.11:6660, internal:pulsar+ssl://192.168.1.11:6651");
+ config.setInternalListenerName("internal");
+ MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testListenerDuplicate_2() {
+ ServiceConfiguration config = new ServiceConfiguration();
+ config.setAdvertisedListeners(" internal:pulsar://127.0.0.1:6660," + " internal:pulsar://192.168.1.11:6660");
+ config.setInternalListenerName("internal");
+ MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testDifferentListenerWithSameHostPort() {
+ ServiceConfiguration config = new ServiceConfiguration();
+ config.setAdvertisedListeners(" internal:pulsar://127.0.0.1:6660," + " external:pulsar://127.0.0.1:6660");
+ config.setInternalListenerName("internal");
+ MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testListenerWithoutTLSPort() {
+ ServiceConfiguration config = new ServiceConfiguration();
+ config.setAdvertisedListeners(" internal:pulsar://127.0.0.1:6660, internal:pulsar+ssl://127.0.0.1:6651");
+ config.setInternalListenerName("internal");
+ MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);
+ }
+
+ @Test
+ public void testListenerWithTLSPort() {
+ ServiceConfiguration config = new ServiceConfiguration();
+ config.setBrokerServicePortTls(Optional.of(6651));
+ config.setAdvertisedListeners(" internal:pulsar://127.0.0.1:6660, internal:pulsar+ssl://127.0.0.1:6651");
+ config.setInternalListenerName("internal");
+ MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testListenerWithoutNonTLSAddress() {
+ ServiceConfiguration config = new ServiceConfiguration();
+ config.setBrokerServicePortTls(Optional.of(6651));
+ config.setAdvertisedListeners(" internal:pulsar+ssl://127.0.0.1:6651");
+ config.setInternalListenerName("internal");
+ MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testWithoutListenerNameInAdvertisedListeners() {
+ ServiceConfiguration config = new ServiceConfiguration();
+ config.setBrokerServicePortTls(Optional.of(6651));
+ config.setAdvertisedListeners(" internal:pulsar://127.0.0.1:6660, internal:pulsar+ssl://127.0.0.1:6651");
+ config.setInternalListenerName("external");
+ MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);
+ }
+
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 35b6a18..689b0a4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -36,6 +36,7 @@ import java.io.IOException;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.URI;
+
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -92,6 +93,7 @@ import org.apache.pulsar.broker.service.TopicPoliciesService;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.stats.MetricsGenerator;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
+import org.apache.pulsar.broker.validator.MultipleListenerValidator;
import org.apache.pulsar.broker.web.WebService;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
@@ -117,6 +119,7 @@ import org.apache.pulsar.compaction.TwoPhaseCompactor;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.WorkerUtils;
+import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvider;
import org.apache.pulsar.websocket.WebSocketConsumerServlet;
import org.apache.pulsar.websocket.WebSocketProducerServlet;
@@ -205,6 +208,8 @@ public class PulsarService implements AutoCloseable {
private final ReentrantLock mutex = new ReentrantLock();
private final Condition isClosedCondition = mutex.newCondition();
+ // key is listener name , value is pulsar address and pulsar ssl address
+ private Map<String, AdvertisedListener> advertisedListeners;
public PulsarService(ServiceConfiguration config) {
this(config, Optional.empty(), (exitCode) -> {
@@ -217,10 +222,21 @@ public class PulsarService implements AutoCloseable {
Consumer<Integer> processTerminator) {
// Validate correctness of configuration
PulsarConfigurationLoader.isComplete(config);
-
+ // validate `advertisedAddress`, `advertisedListeners`, `internalListenerName`
+ Map<String, AdvertisedListener> result = MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);
+ if (result != null) {
+ this.advertisedListeners = Collections.unmodifiableMap(result);
+ } else {
+ this.advertisedListeners = Collections.unmodifiableMap(Collections.emptyMap());
+ }
state = State.Init;
+ // use `internalListenerName` listener as `advertisedAddress`
this.bindAddress = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getBindAddress());
- this.advertisedAddress = advertisedAddress(config);
+ if (!this.advertisedListeners.isEmpty()) {
+ this.advertisedAddress = this.advertisedListeners.get(config.getInternalListenerName()).getBrokerServiceUrl().getHost();
+ } else {
+ this.advertisedAddress = advertisedAddress(config);
+ }
this.brokerVersion = PulsarVersion.getVersion();
this.config = config;
this.shutdownService = new MessagingServiceShutdownHook(this, processTerminator);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index c59e61c..ba5f3ab 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -852,14 +852,14 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
Map<String, String> protocolData = pulsar.getProtocolDataToAdvertise();
lastData = new LocalBrokerData(pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(),
- pulsar.getSafeBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls());
+ pulsar.getSafeBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls(), pulsar.getAdvertisedListeners());
lastData.setProtocols(protocolData);
// configure broker-topic mode
lastData.setPersistentTopicsEnabled(pulsar.getConfiguration().isEnablePersistentTopics());
lastData.setNonPersistentTopicsEnabled(pulsar.getConfiguration().isEnableNonPersistentTopics());
localData = new LocalBrokerData(pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(),
- pulsar.getSafeBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls());
+ pulsar.getSafeBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls(), pulsar.getAdvertisedListeners());
localData.setProtocols(protocolData);
localData.setBrokerVersionString(pulsar.getBrokerVersion());
// configure broker-topic mode
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
index 701a217..3a4a7a6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.broker.loadbalance.impl;
-import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/LookupResult.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/LookupResult.java
index 965db1c..c84cb11 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/LookupResult.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/LookupResult.java
@@ -28,7 +28,7 @@ import org.apache.pulsar.common.lookup.data.LookupData;
* to redirect the client to try again.
*/
public class LookupResult {
- enum Type {
+ public enum Type {
BrokerUrl, RedirectUrl
}
@@ -49,6 +49,17 @@ public class LookupResult {
this.lookupData = new LookupData(brokerServiceUrl, brokerServiceUrlTls, httpUrl, httpUrlTls);
}
+ public LookupResult(String httpUrl, String httpUrlTls, String nativeUrl, String nativeUrlTls, Type type) {
+ this.type = type;
+ this.lookupData = new LookupData(nativeUrl, nativeUrlTls, httpUrl, httpUrlTls);
+ }
+
+ public LookupResult(NamespaceEphemeralData namespaceEphemeralData, String nativeUrl, String nativeUrlTls) {
+ this.type = Type.BrokerUrl;
+ this.lookupData = new LookupData(nativeUrl, nativeUrlTls, namespaceEphemeralData.getHttpUrl(),
+ namespaceEphemeralData.getHttpUrlTls());
+ }
+
public boolean isBrokerUrl() {
return type == Type.BrokerUrl;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
index bac10d4..9738e20 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
@@ -141,11 +141,13 @@ public class TopicLookupBase extends PulsarWebResource {
*
* Lookup broker-service address for a given namespace-bundle which contains given topic.
*
- * a. Returns broker-address if namespace-bundle is already owned by any broker b. If current-broker receives
- * lookup-request and if it's not a leader then current broker redirects request to leader by returning
- * leader-service address. c. If current-broker is leader then it finds out least-loaded broker to own namespace
- * bundle and redirects request by returning least-loaded broker. d. If current-broker receives request to own the
- * namespace-bundle then it owns a bundle and returns success(connect) response to client.
+ * a. Returns broker-address if namespace-bundle is already owned by any broker
+ * b. If current-broker receives lookup-request and if it's not a leader then current broker redirects request
+ * to leader by returning leader-service address.
+ * c. If current-broker is leader then it finds out least-loaded broker to own namespace bundle and redirects request
+ * by returning least-loaded broker.
+ * d. If current-broker receives request to own the namespace-bundle then it owns a bundle and returns success(connect)
+ * response to client.
*
* @param pulsarService
* @param topicName
@@ -156,6 +158,33 @@ public class TopicLookupBase extends PulsarWebResource {
*/
public static CompletableFuture<ByteBuf> lookupTopicAsync(PulsarService pulsarService, TopicName topicName,
boolean authoritative, String clientAppId, AuthenticationDataSource authenticationData, long requestId) {
+ return lookupTopicAsync(pulsarService, topicName, authoritative, clientAppId, authenticationData, requestId, null);
+ }
+
+ /**
+ *
+ * Lookup broker-service address for a given namespace-bundle which contains given topic.
+ *
+ * a. Returns broker-address if namespace-bundle is already owned by any broker
+ * b. If current-broker receives lookup-request and if it's not a leader then current broker redirects request
+ * to leader by returning leader-service address.
+ * c. If current-broker is leader then it finds out least-loaded broker to own namespace bundle and redirects request
+ * by returning least-loaded broker.
+ * d. If current-broker receives request to own the namespace-bundle then it owns a bundle and returns success(connect)
+ * response to client.
+ *
+ * @param pulsarService
+ * @param topicName
+ * @param authoritative
+ * @param clientAppId
+ * @param requestId
+ * @param advertisedListenerName
+ * @return
+ */
+ public static CompletableFuture<ByteBuf> lookupTopicAsync(PulsarService pulsarService, TopicName topicName,
+ boolean authoritative, String clientAppId,
+ AuthenticationDataSource authenticationData, long requestId,
+ final String advertisedListenerName) {
final CompletableFuture<ByteBuf> validationFuture = new CompletableFuture<>();
final CompletableFuture<ByteBuf> lookupfuture = new CompletableFuture<>();
@@ -207,10 +236,10 @@ public class TopicLookupBase extends PulsarWebResource {
false));
}).exceptionally(ex -> {
- validationFuture.complete(
- newLookupErrorResponse(ServerError.MetadataError, ex.getMessage(), requestId));
- return null;
- });
+ validationFuture.complete(
+ newLookupErrorResponse(ServerError.MetadataError, ex.getMessage(), requestId));
+ return null;
+ });
}
}).exceptionally(ex -> {
validationFuture.completeExceptionally(ex);
@@ -222,7 +251,7 @@ public class TopicLookupBase extends PulsarWebResource {
if (validationFailureResponse != null) {
lookupfuture.complete(validationFailureResponse);
} else {
- pulsarService.getNamespaceService().getBrokerServiceUrlAsync(topicName, authoritative)
+ pulsarService.getNamespaceService().getBrokerServiceUrlAsync(topicName, authoritative, advertisedListenerName)
.thenAccept(lookupResult -> {
if (log.isDebugEnabled()) {
@@ -252,17 +281,17 @@ public class TopicLookupBase extends PulsarWebResource {
requestId, redirectThroughServiceUrl));
}
}).exceptionally(ex -> {
- if (ex instanceof CompletionException && ex.getCause() instanceof IllegalStateException) {
- log.info("Failed to lookup {} for topic {} with error {}", clientAppId,
- topicName.toString(), ex.getCause().getMessage());
- } else {
- log.warn("Failed to lookup {} for topic {} with error {}", clientAppId,
- topicName.toString(), ex.getMessage(), ex);
- }
- lookupfuture.complete(
- newLookupErrorResponse(ServerError.ServiceNotReady, ex.getMessage(), requestId));
- return null;
- });
+ if (ex instanceof CompletionException && ex.getCause() instanceof IllegalStateException) {
+ log.info("Failed to lookup {} for topic {} with error {}", clientAppId,
+ topicName.toString(), ex.getCause().getMessage());
+ } else {
+ log.warn("Failed to lookup {} for topic {} with error {}", clientAppId,
+ topicName.toString(), ex.getMessage(), ex);
+ }
+ lookupfuture.complete(
+ newLookupErrorResponse(ServerError.ServiceNotReady, ex.getMessage(), requestId));
+ return null;
+ });
}
}).exceptionally(ex -> {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceEphemeralData.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceEphemeralData.java
index 6716d6f..30e3567 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceEphemeralData.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceEphemeralData.java
@@ -19,6 +19,12 @@
package org.apache.pulsar.broker.namespace;
import com.google.common.base.MoreObjects;
+import com.google.common.collect.Maps;
+import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
+
+import javax.validation.constraints.NotNull;
+import java.util.Collections;
+import java.util.Map;
public class NamespaceEphemeralData {
private String nativeUrl;
@@ -26,17 +32,28 @@ public class NamespaceEphemeralData {
private String httpUrl;
private String httpUrlTls;
private boolean disabled;
+ private Map<String, AdvertisedListener> advertisedListeners;
public NamespaceEphemeralData() {
}
public NamespaceEphemeralData(String brokerUrl, String brokerUrlTls, String httpUrl, String httpUrlTls,
boolean disabled) {
+ this(brokerUrl, brokerUrlTls, httpUrl, httpUrlTls, disabled, null);
+ }
+
+ public NamespaceEphemeralData(String brokerUrl, String brokerUrlTls, String httpUrl, String httpUrlTls,
+ boolean disabled, Map<String, AdvertisedListener> advertisedListeners) {
this.nativeUrl = brokerUrl;
this.nativeUrlTls = brokerUrlTls;
this.httpUrl = httpUrl;
this.httpUrlTls = httpUrlTls;
this.disabled = disabled;
+ if (advertisedListeners == null) {
+ this.advertisedListeners = Collections.EMPTY_MAP;
+ } else {
+ this.advertisedListeners = Maps.newHashMap(advertisedListeners);
+ }
}
public String getNativeUrl() {
@@ -63,9 +80,17 @@ public class NamespaceEphemeralData {
this.disabled = flag;
}
+ @NotNull
+ public Map<String, AdvertisedListener> getAdvertisedListeners() {
+ if (this.advertisedListeners == null) {
+ return Collections.EMPTY_MAP;
+ }
+ return Collections.unmodifiableMap(this.advertisedListeners);
+ }
+
@Override
public String toString() {
return MoreObjects.toStringHelper(this).add("nativeUrl", nativeUrl).add("httpUrl", httpUrl)
- .add("disabled", disabled).toString();
+ .add("disabled", disabled).add("advertisedListeners", getAdvertisedListeners()).toString();
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 74d69c4..943761e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -64,6 +64,7 @@ import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.KeeperException;
@@ -113,7 +114,7 @@ import static org.apache.pulsar.common.util.Codec.decode;
*/
public class NamespaceService {
- public enum AddressType {
+ public static enum AddressType {
BROKER_URL, LOOKUP_URL
}
@@ -172,8 +173,13 @@ public class NamespaceService {
public CompletableFuture<Optional<LookupResult>> getBrokerServiceUrlAsync(TopicName topic,
boolean authoritative) {
+ return getBrokerServiceUrlAsync(topic, authoritative, null);
+ }
+
+ public CompletableFuture<Optional<LookupResult>> getBrokerServiceUrlAsync(TopicName topic, boolean authoritative,
+ final String advertisedListenerName) {
return getBundleAsync(topic)
- .thenCompose(bundle -> findBrokerServiceUrl(bundle, authoritative, false /* read-only */));
+ .thenCompose(bundle -> findBrokerServiceUrl(bundle, authoritative, false /* read-only */, advertisedListenerName));
}
public CompletableFuture<NamespaceBundle> getBundleAsync(TopicName topic) {
@@ -322,16 +328,30 @@ public class NamespaceService {
= new ConcurrentOpenHashMap<>();
/**
+ * Main internal method to lookup and setup ownership of service unit to a broker.
+ *
+ * @param bundle
+ * @param authoritative
+ * @param readOnly
+ * @return
+ */
+ private CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(NamespaceBundle bundle, boolean authoritative,
+ boolean readOnly) {
+ return findBrokerServiceUrl(bundle, authoritative, readOnly, null);
+ }
+
+ /**
* Main internal method to lookup and setup ownership of service unit to a broker
*
* @param bundle
* @param authoritative
* @param readOnly
+ * @param advertisedListenerName
* @return
* @throws PulsarServerException
*/
private CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(NamespaceBundle bundle, boolean authoritative,
- boolean readOnly) {
+ boolean readOnly, final String advertisedListenerName) {
if (LOG.isDebugEnabled()) {
LOG.debug("findBrokerServiceUrl: {} - read-only: {}", bundle, readOnly);
}
@@ -357,7 +377,7 @@ public class NamespaceService {
} else {
// Now, no one owns the namespace yet. Hence, we will try to dynamically assign it
pulsar.getExecutor().execute(() -> {
- searchForCandidateBroker(bundle, future, authoritative);
+ searchForCandidateBroker(bundle, future, authoritative, advertisedListenerName);
});
}
} else if (nsData.get().isDisabled()) {
@@ -367,7 +387,20 @@ public class NamespaceService {
if (LOG.isDebugEnabled()) {
LOG.debug("Namespace bundle {} already owned by {} ", bundle, nsData);
}
- future.complete(Optional.of(new LookupResult(nsData.get())));
+ // find the target
+ if (StringUtils.isNotBlank(advertisedListenerName)) {
+ AdvertisedListener listener = nsData.get().getAdvertisedListeners().get(advertisedListenerName);
+ if (listener == null) {
+ future.completeExceptionally(
+ new PulsarServerException("the broker do not have " + advertisedListenerName + " listener"));
+ } else {
+ future.complete(Optional.of(new LookupResult(nsData.get(),
+ listener.getBrokerServiceUrl().toString(), listener.getBrokerServiceUrlTls().toString())));
+ }
+ return;
+ } else {
+ future.complete(Optional.of(new LookupResult(nsData.get())));
+ }
}
}).exceptionally(exception -> {
LOG.warn("Failed to check owner for bundle {}: {}", bundle, exception.getMessage(), exception);
@@ -385,6 +418,12 @@ public class NamespaceService {
private void searchForCandidateBroker(NamespaceBundle bundle,
CompletableFuture<Optional<LookupResult>> lookupFuture, boolean authoritative) {
+ searchForCandidateBroker(bundle, lookupFuture, authoritative, null);
+ }
+
+ private void searchForCandidateBroker(NamespaceBundle bundle,
+ CompletableFuture<Optional<LookupResult>> lookupFuture, boolean authoritative,
+ final String advertisedListenerName) {
String candidateBroker = null;
try {
// check if this is Heartbeat or SLAMonitor namespace
@@ -403,7 +442,7 @@ public class NamespaceService {
// If leader is not active, fallback to pick the least loaded from current broker loadmanager
|| !isBrokerActive(pulsar.getLeaderElectionService().getCurrentLeader().getServiceUrl())
- ) {
+ ) {
Optional<String> availableBroker = getLeastLoadedFromLoadManager(bundle);
if (!availableBroker.isPresent()) {
lookupFuture.complete(Optional.empty());
@@ -447,8 +486,22 @@ public class NamespaceService {
// Schedule the task to pre-load topics
pulsar.loadNamespaceTopics(bundle);
-
- lookupFuture.complete(Optional.of(new LookupResult(ownerInfo)));
+ // find the target
+ if (StringUtils.isNotBlank(advertisedListenerName)) {
+ AdvertisedListener listener = ownerInfo.getAdvertisedListeners().get(advertisedListenerName);
+ if (listener == null) {
+ lookupFuture.completeExceptionally(
+ new PulsarServerException("the broker do not have " + advertisedListenerName + " listener"));
+ return;
+ } else {
+ lookupFuture.complete(Optional.of(new LookupResult(ownerInfo, listener.getBrokerServiceUrl().toString(),
+ listener.getBrokerServiceUrlTls().toString())));
+ return;
+ }
+ } else {
+ lookupFuture.complete(Optional.of(new LookupResult(ownerInfo)));
+ return;
+ }
}
}).exceptionally(exception -> {
LOG.warn("Failed to acquire ownership for namespace bundle {}: {}", bundle, exception);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
index 50e96fa..7acd5f8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
@@ -162,9 +162,9 @@ public class OwnershipCache {
this.ownerBrokerUrl = pulsar.getSafeBrokerServiceUrl();
this.ownerBrokerUrlTls = pulsar.getBrokerServiceUrlTls();
this.selfOwnerInfo = new NamespaceEphemeralData(ownerBrokerUrl, ownerBrokerUrlTls,
- pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(), false);
+ pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(), false, pulsar.getAdvertisedListeners());
this.selfOwnerInfoDisabled = new NamespaceEphemeralData(ownerBrokerUrl, ownerBrokerUrlTls,
- pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(), true);
+ pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(), true, pulsar.getAdvertisedListeners());
this.bundleFactory = bundleFactory;
this.localZkCache = pulsar.getLocalZkCache();
this.ownershipReadOnlyCache = pulsar.getLocalZkCacheService().ownerInfoCache();
@@ -379,7 +379,7 @@ public class OwnershipCache {
public synchronized boolean refreshSelfOwnerInfo() {
if (selfOwnerInfo.getNativeUrl() == null) {
this.selfOwnerInfo = new NamespaceEphemeralData(pulsar.getSafeBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls(),
- pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(), false);
+ pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(), false, pulsar.getAdvertisedListeners());
}
return selfOwnerInfo.getNativeUrl() != null;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 0416594..d2fa9c4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -278,6 +278,7 @@ public class ServerCnx extends PulsarHandler {
protected void handleLookup(CommandLookupTopic lookup) {
final long requestId = lookup.getRequestId();
final boolean authoritative = lookup.getAuthoritative();
+ final String advertisedListenerName = lookup.getAdvertisedListenerName();
if (log.isDebugEnabled()) {
log.debug("[{}] Received Lookup from {} for {}", lookup.getTopic(), remoteAddress, requestId);
}
@@ -309,7 +310,7 @@ public class ServerCnx extends PulsarHandler {
if (isProxyAuthorized) {
lookupTopicAsync(getBrokerService().pulsar(), topicName, authoritative,
finalOriginalPrincipal != null ? finalOriginalPrincipal : authRole, authenticationData,
- requestId).handle((lookupResponse, ex) -> {
+ requestId, advertisedListenerName).handle((lookupResponse, ex) -> {
if (ex == null) {
ctx.writeAndFlush(lookupResponse);
} else {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index 7bc4b40..c1e39b4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -139,13 +139,16 @@ public abstract class MockedPulsarServiceBaseTest {
pulsarClient = newPulsarClient(lookupUrl, 1);
}
- protected final void init() throws Exception {
+ protected void doInitConf() throws Exception {
this.conf.setBrokerServicePort(Optional.of(0));
this.conf.setBrokerServicePortTls(Optional.of(0));
this.conf.setAdvertisedAddress("localhost");
this.conf.setWebServicePort(Optional.of(0));
this.conf.setWebServicePortTls(Optional.of(0));
+ }
+ protected final void init() throws Exception {
+ doInitConf();
sameThreadOrderedSafeExecutor = new SameThreadOrderedSafeExecutor();
bkExecutor = Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setNameFormat("mock-pulsar-bk")
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PulsarMultiListenersWithInternalListenerNameTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PulsarMultiListenersWithInternalListenerNameTest.java
new file mode 100644
index 0000000..dae9f7d
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PulsarMultiListenersWithInternalListenerNameTest.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import com.google.common.collect.Sets;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.impl.BinaryProtoLookupService;
+import org.apache.pulsar.client.impl.LookupService;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+public class PulsarMultiListenersWithInternalListenerNameTest extends MockedPulsarServiceBaseTest {
+
+ private ExecutorService executorService;
+ //
+ private LookupService lookupService;
+ //
+ private String host;
+
+ @BeforeMethod
+ @Override
+ protected void setup() throws Exception {
+ this.executorService = Executors.newFixedThreadPool(1);
+ this.isTcpLookup = true;
+ super.internalSetup();
+ }
+
+ protected void doInitConf() throws Exception {
+ this.host = InetAddress.getLocalHost().getHostAddress();
+ super.doInitConf();
+ this.conf.setClusterName("localhost");
+ this.conf.setAdvertisedAddress(null);
+ this.conf.setAdvertisedListeners(String.format("internal:pulsar://%s:6650,internal:pulsar+ssl://%s:6651", host, host));
+ this.conf.setInternalListenerName("internal");
+ }
+
+ protected PulsarClient newPulsarClient(String url, int intervalInSecs) throws PulsarClientException {
+ return PulsarClient.builder().serviceUrl(url).listenerName("internal").statsInterval(intervalInSecs, TimeUnit.SECONDS).build();
+ }
+
+ @Test
+ public void testFindBrokerWithListenerName() throws Throwable {
+ admin.clusters().createCluster("localhost", new ClusterData(pulsar.getWebServiceAddress()));
+ TenantInfo tenantInfo = new TenantInfo();
+ tenantInfo.setAllowedClusters(Sets.newHashSet("localhost"));
+ this.admin.tenants().createTenant("public", tenantInfo);
+ this.admin.namespaces().createNamespace("public/default");
+ this.lookupService = new BinaryProtoLookupService((PulsarClientImpl) this.pulsarClient, lookupUrl.toString(),
+ "internal", false, this.executorService);
+ // test request 1
+ {
+ CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> future = lookupService.getBroker(TopicName.get("persistent://public/default/test"));
+ Pair<InetSocketAddress, InetSocketAddress> result = future.get(10, TimeUnit.SECONDS);
+ Assert.assertEquals(result.getKey().toString(), String.format("%s:6650", this.host));
+ Assert.assertEquals(result.getValue().toString(), String.format("%s:6650", this.host));
+ }
+ // test request 2
+ {
+ CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> future = lookupService.getBroker(TopicName.get("persistent://public/default/test"));
+ Pair<InetSocketAddress, InetSocketAddress> result = future.get(10, TimeUnit.SECONDS);
+ Assert.assertEquals(result.getKey().toString(), String.format("%s:6650", this.host));
+ Assert.assertEquals(result.getValue().toString(), String.format("%s:6650", this.host));
+ }
+ }
+
+ @AfterMethod
+ @Override
+ protected void cleanup() throws Exception {
+ if (this.executorService != null) {
+ this.lookupService.close();
+ }
+ if (this.executorService != null) {
+ this.executorService.shutdown();
+ }
+ super.internalCleanup();
+ }
+
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PulsarMultiListenersWithoutInternalListenerNameTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PulsarMultiListenersWithoutInternalListenerNameTest.java
new file mode 100644
index 0000000..c96ae27
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PulsarMultiListenersWithoutInternalListenerNameTest.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import com.google.common.collect.Sets;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.impl.BinaryProtoLookupService;
+import org.apache.pulsar.client.impl.LookupService;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+public class PulsarMultiListenersWithoutInternalListenerNameTest extends MockedPulsarServiceBaseTest {
+
+ private ExecutorService executorService;
+ //
+ private LookupService lookupService;
+ //
+ private String host;
+
+ @BeforeMethod
+ @Override
+ protected void setup() throws Exception {
+ this.executorService = Executors.newFixedThreadPool(1);
+ this.isTcpLookup = true;
+ super.internalSetup();
+ }
+
+ protected void doInitConf() throws Exception {
+ this.host = InetAddress.getLocalHost().getHostAddress();
+ super.doInitConf();
+ this.conf.setClusterName("localhost");
+ this.conf.setAdvertisedAddress(null);
+ this.conf.setAdvertisedListeners(String.format("internal:pulsar://%s:6650,internal:pulsar+ssl://%s:6651", host, host));
+ }
+
+ protected PulsarClient newPulsarClient(String url, int intervalInSecs) throws PulsarClientException {
+ return PulsarClient.builder().serviceUrl(url).listenerName("internal").statsInterval(intervalInSecs, TimeUnit.SECONDS).build();
+ }
+
+ @Test
+ public void testFindBrokerWithListenerName() throws Throwable {
+ admin.clusters().createCluster("localhost", new ClusterData(pulsar.getWebServiceAddress()));
+ TenantInfo tenantInfo = new TenantInfo();
+ tenantInfo.setAllowedClusters(Sets.newHashSet("localhost"));
+ this.admin.tenants().createTenant("public", tenantInfo);
+ this.admin.namespaces().createNamespace("public/default");
+ this.lookupService = new BinaryProtoLookupService((PulsarClientImpl) this.pulsarClient, lookupUrl.toString(),
+ "internal", false, this.executorService);
+ // test request 1
+ {
+ CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> future = lookupService.getBroker(TopicName.get("persistent://public/default/test"));
+ Pair<InetSocketAddress, InetSocketAddress> result = future.get(10, TimeUnit.SECONDS);
+ Assert.assertEquals(result.getKey().toString(), String.format("%s:6650", this.host));
+ Assert.assertEquals(result.getValue().toString(), String.format("%s:6650", this.host));
+ }
+ // test request 2
+ {
+ CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> future = lookupService.getBroker(TopicName.get("persistent://public/default/test"));
+ Pair<InetSocketAddress, InetSocketAddress> result = future.get(10, TimeUnit.SECONDS);
+ Assert.assertEquals(result.getKey().toString(), String.format("%s:6650", this.host));
+ Assert.assertEquals(result.getValue().toString(), String.format("%s:6650", this.host));
+ }
+ }
+
+ @AfterMethod
+ @Override
+ protected void cleanup() throws Exception {
+ if (this.executorService != null) {
+ this.lookupService.close();
+ }
+ if (this.executorService != null) {
+ this.executorService.shutdown();
+ }
+ super.internalCleanup();
+ }
+
+}
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
index c12e9a1..b3aee5b 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
@@ -113,6 +113,14 @@ public interface ClientBuilder extends Cloneable {
ClientBuilder serviceUrlProvider(ServiceUrlProvider serviceUrlProvider);
/**
+ * Configure the listenerName that the broker will return the corresponding `advertisedListener`.
+ *
+ * @param name the listener name
+ * @return the client builder instance
+ */
+ ClientBuilder listenerName(String name);
+
+ /**
* Set the authentication provider to use in the Pulsar client instance.
*
* <p>Example:
diff --git a/pulsar-client-messagecrypto-bc/pom.xml b/pulsar-client-messagecrypto-bc/pom.xml
index 0f7b190..73e1f31 100644
--- a/pulsar-client-messagecrypto-bc/pom.xml
+++ b/pulsar-client-messagecrypto-bc/pom.xml
@@ -46,5 +46,6 @@
<artifactId>bouncy-castle-bc-shaded</artifactId>
<version>${project.parent.version}</version>
</dependency>
+
</dependencies>
</project>
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
index d3a7df5..e64e161 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
@@ -54,15 +54,22 @@ public class BinaryProtoLookupService implements LookupService {
private final ServiceNameResolver serviceNameResolver;
private final boolean useTls;
private final ExecutorService executor;
+ private final String listenerName;
private final int maxLookupRedirects;
public BinaryProtoLookupService(PulsarClientImpl client, String serviceUrl, boolean useTls, ExecutorService executor)
throws PulsarClientException {
+ this(client, serviceUrl, null, useTls, executor);
+ }
+
+ public BinaryProtoLookupService(PulsarClientImpl client, String serviceUrl, String listenerName, boolean useTls, ExecutorService executor)
+ throws PulsarClientException {
this.client = client;
this.useTls = useTls;
this.executor = executor;
this.maxLookupRedirects = client.getConfiguration().getMaxLookupRedirects();
this.serviceNameResolver = new PulsarServiceNameResolver();
+ this.listenerName = listenerName;
updateServiceUrl(serviceUrl);
}
@@ -102,7 +109,7 @@ public class BinaryProtoLookupService implements LookupService {
client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> {
long requestId = client.newRequestId();
- ByteBuf request = Commands.newLookup(topicName.toString(), authoritative, requestId);
+ ByteBuf request = Commands.newLookup(topicName.toString(), this.listenerName, authoritative, requestId);
clientCnx.newLookup(request, requestId).thenAccept(lookupDataResult -> {
URI uri = null;
try {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
index 7a56e42..2c920e2 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
@@ -101,6 +101,15 @@ public class ClientBuilderImpl implements ClientBuilder {
}
@Override
+ public ClientBuilder listenerName(String listenerName) {
+ if (StringUtils.isBlank(listenerName)) {
+ throw new IllegalArgumentException("Param listenerName must not be blank.");
+ }
+ conf.setListenerName(StringUtils.trim(listenerName));
+ return this;
+ }
+
+ @Override
public ClientBuilder authentication(Authentication authentication) {
conf.setAuthentication(authentication);
return this;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index bcb0f51..42cd7b2 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -142,7 +142,7 @@ public class PulsarClientImpl implements PulsarClient {
if (conf.getServiceUrl().startsWith("http")) {
lookup = new HttpLookupService(conf, eventLoopGroup);
} else {
- lookup = new BinaryProtoLookupService(this, conf.getServiceUrl(), conf.isUseTls(), externalExecutorProvider.getExecutor());
+ lookup = new BinaryProtoLookupService(this, conf.getServiceUrl(), conf.getListenerName(), conf.isUseTls(), externalExecutorProvider.getExecutor());
}
timer = new HashedWheelTimer(getThreadFactory("pulsar-timer"), 1, TimeUnit.MILLISECONDS);
producers = Maps.newIdentityHashMap();
@@ -650,7 +650,8 @@ public class PulsarClientImpl implements PulsarClient {
if (conf.getServiceUrl().startsWith("http")) {
lookup = new HttpLookupService(conf, eventLoopGroup);
} else {
- lookup = new BinaryProtoLookupService(this, conf.getServiceUrl(), conf.isUseTls(), externalExecutorProvider.getExecutor());
+ lookup = new BinaryProtoLookupService(this, conf.getServiceUrl(), conf.getListenerName(), conf.isUseTls(),
+ externalExecutorProvider.getExecutor());
}
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
index ce4cfe3..ac08553 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
@@ -74,6 +74,8 @@ public class ClientConfigurationData implements Serializable, Cloneable {
private int requestTimeoutMs = 60000;
private long initialBackoffIntervalNanos = TimeUnit.MILLISECONDS.toNanos(100);
private long maxBackoffIntervalNanos = TimeUnit.SECONDS.toNanos(60);
+ //
+ private String listenerName;
// set TLS using KeyStore way.
private boolean useKeyStoreTls = false;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index 9691cc9..a3d7ff0 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -13710,6 +13710,10 @@ public final class PulsarApi {
// optional string original_auth_method = 6;
boolean hasOriginalAuthMethod();
String getOriginalAuthMethod();
+
+ // optional string advertised_listener_name = 7;
+ boolean hasAdvertisedListenerName();
+ String getAdvertisedListenerName();
}
public static final class CommandLookupTopic extends
org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite
@@ -13894,6 +13898,38 @@ public final class PulsarApi {
}
}
+ // optional string advertised_listener_name = 7;
+ public static final int ADVERTISED_LISTENER_NAME_FIELD_NUMBER = 7;
+ private java.lang.Object advertisedListenerName_;
+ public boolean hasAdvertisedListenerName() {
+ return ((bitField0_ & 0x00000040) == 0x00000040);
+ }
+ public String getAdvertisedListenerName() {
+ java.lang.Object ref = advertisedListenerName_;
+ if (ref instanceof String) {
+ return (String) ref;
+ } else {
+ org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString bs =
+ (org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString) ref;
+ String s = bs.toStringUtf8();
+ if (org.apache.pulsar.shaded.com.google.protobuf.v241.Internal.isValidUtf8(bs)) {
+ advertisedListenerName_ = s;
+ }
+ return s;
+ }
+ }
+ private org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString getAdvertisedListenerNameBytes() {
+ java.lang.Object ref = advertisedListenerName_;
+ if (ref instanceof String) {
+ org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString b =
+ org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.copyFromUtf8((String) ref);
+ advertisedListenerName_ = b;
+ return b;
+ } else {
+ return (org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString) ref;
+ }
+ }
+
private void initFields() {
topic_ = "";
requestId_ = 0L;
@@ -13901,6 +13937,7 @@ public final class PulsarApi {
originalPrincipal_ = "";
originalAuthData_ = "";
originalAuthMethod_ = "";
+ advertisedListenerName_ = "";
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -13945,6 +13982,9 @@ public final class PulsarApi {
if (((bitField0_ & 0x00000020) == 0x00000020)) {
output.writeBytes(6, getOriginalAuthMethodBytes());
}
+ if (((bitField0_ & 0x00000040) == 0x00000040)) {
+ output.writeBytes(7, getAdvertisedListenerNameBytes());
+ }
}
private int memoizedSerializedSize = -1;
@@ -13977,6 +14017,10 @@ public final class PulsarApi {
size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
.computeBytesSize(6, getOriginalAuthMethodBytes());
}
+ if (((bitField0_ & 0x00000040) == 0x00000040)) {
+ size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+ .computeBytesSize(7, getAdvertisedListenerNameBytes());
+ }
memoizedSerializedSize = size;
return size;
}
@@ -14102,6 +14146,8 @@ public final class PulsarApi {
bitField0_ = (bitField0_ & ~0x00000010);
originalAuthMethod_ = "";
bitField0_ = (bitField0_ & ~0x00000020);
+ advertisedListenerName_ = "";
+ bitField0_ = (bitField0_ & ~0x00000040);
return this;
}
@@ -14159,6 +14205,10 @@ public final class PulsarApi {
to_bitField0_ |= 0x00000020;
}
result.originalAuthMethod_ = originalAuthMethod_;
+ if (((from_bitField0_ & 0x00000040) == 0x00000040)) {
+ to_bitField0_ |= 0x00000040;
+ }
+ result.advertisedListenerName_ = advertisedListenerName_;
result.bitField0_ = to_bitField0_;
return result;
}
@@ -14183,6 +14233,9 @@ public final class PulsarApi {
if (other.hasOriginalAuthMethod()) {
setOriginalAuthMethod(other.getOriginalAuthMethod());
}
+ if (other.hasAdvertisedListenerName()) {
+ setAdvertisedListenerName(other.getAdvertisedListenerName());
+ }
return this;
}
@@ -14250,6 +14303,11 @@ public final class PulsarApi {
originalAuthMethod_ = input.readBytes();
break;
}
+ case 58: {
+ bitField0_ |= 0x00000040;
+ advertisedListenerName_ = input.readBytes();
+ break;
+ }
}
}
}
@@ -14442,6 +14500,42 @@ public final class PulsarApi {
}
+ // optional string advertised_listener_name = 7;
+ private java.lang.Object advertisedListenerName_ = "";
+ public boolean hasAdvertisedListenerName() {
+ return ((bitField0_ & 0x00000040) == 0x00000040);
+ }
+ public String getAdvertisedListenerName() {
+ java.lang.Object ref = advertisedListenerName_;
+ if (!(ref instanceof String)) {
+ String s = ((org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString) ref).toStringUtf8();
+ advertisedListenerName_ = s;
+ return s;
+ } else {
+ return (String) ref;
+ }
+ }
+ public Builder setAdvertisedListenerName(String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000040;
+ advertisedListenerName_ = value;
+
+ return this;
+ }
+ public Builder clearAdvertisedListenerName() {
+ bitField0_ = (bitField0_ & ~0x00000040);
+ advertisedListenerName_ = getDefaultInstance().getAdvertisedListenerName();
+
+ return this;
+ }
+ void setAdvertisedListenerName(org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString value) {
+ bitField0_ |= 0x00000040;
+ advertisedListenerName_ = value;
+
+ }
+
// @@protoc_insertion_point(builder_scope:pulsar.proto.CommandLookupTopic)
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index d5e772a..51d0a11 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -39,6 +39,7 @@ import java.util.stream.Collectors;
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.pulsar.client.api.KeySharedPolicy;
import org.apache.pulsar.client.api.Range;
@@ -823,10 +824,17 @@ public class Commands {
}
public static ByteBuf newLookup(String topic, boolean authoritative, long requestId) {
+ return newLookup(topic, null, authoritative, requestId);
+ }
+
+ public static ByteBuf newLookup(String topic, String listenerName, boolean authoritative, long requestId) {
CommandLookupTopic.Builder lookupTopicBuilder = CommandLookupTopic.newBuilder();
lookupTopicBuilder.setTopic(topic);
lookupTopicBuilder.setRequestId(requestId);
lookupTopicBuilder.setAuthoritative(authoritative);
+ if (StringUtils.isNotBlank(listenerName)) {
+ lookupTopicBuilder.setAdvertisedListenerName(listenerName);
+ }
CommandLookupTopic lookupBroker = lookupTopicBuilder.build();
ByteBuf res = serializeWithSize(BaseCommand.newBuilder().setType(Type.LOOKUP).setLookupTopic(lookupBroker));
lookupTopicBuilder.recycle();
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/AdvertisedListener.java b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/AdvertisedListener.java
new file mode 100644
index 0000000..522509d
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/AdvertisedListener.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.policies.data.loadbalancer;
+
+import java.net.URI;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+/**
+ * The advertisedListener for broker with brokerServiceUrl and brokerServiceUrlTls.
+ */
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class AdvertisedListener {
+ //
+ @Getter
+ @Setter
+ // the broker service uri without ssl
+ private URI brokerServiceUrl;
+ //
+ @Getter
+ @Setter
+ // the broker service uri with ssl
+ private URI brokerServiceUrlTls;
+
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
index 6b4ea64..46aaa68 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.policies.data.loadbalancer;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.google.common.collect.Maps;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -83,6 +84,8 @@ public class LocalBrokerData extends JSONWritable implements LoadManagerReport {
// the external protocol data advertised by protocol handlers.
private Map<String, String> protocols;
+ //
+ private Map<String, AdvertisedListener> advertisedListeners;
// For JSON only.
public LocalBrokerData() {
@@ -94,6 +97,12 @@ public class LocalBrokerData extends JSONWritable implements LoadManagerReport {
*/
public LocalBrokerData(final String webServiceUrl, final String webServiceUrlTls, final String pulsarServiceUrl,
final String pulsarServiceUrlTls) {
+ this(webServiceUrl, webServiceUrlTls, pulsarServiceUrl, pulsarServiceUrlTls,
+ Collections.unmodifiableMap(Collections.emptyMap()));
+ }
+
+ public LocalBrokerData(final String webServiceUrl, final String webServiceUrlTls, final String pulsarServiceUrl,
+ final String pulsarServiceUrlTls, Map<String, AdvertisedListener> advertisedListeners) {
this.webServiceUrl = webServiceUrl;
this.webServiceUrlTls = webServiceUrlTls;
this.pulsarServiceUrl = pulsarServiceUrl;
@@ -109,6 +118,7 @@ public class LocalBrokerData extends JSONWritable implements LoadManagerReport {
lastBundleGains = new HashSet<>();
lastBundleLosses = new HashSet<>();
protocols = new HashMap<>();
+ this.advertisedListeners = Collections.unmodifiableMap(Maps.newHashMap(advertisedListeners));
}
/**
@@ -472,4 +482,11 @@ public class LocalBrokerData extends JSONWritable implements LoadManagerReport {
return Optional.ofNullable(protocols.get(protocol));
}
+ public Map<String, AdvertisedListener> getAdvertisedListeners() {
+ return advertisedListeners;
+ }
+
+ public void setAdvertisedListeners(Map<String, AdvertisedListener> advertisedListeners) {
+ this.advertisedListeners = advertisedListeners;
+ }
}
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index cd7b760..d43617b 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -385,6 +385,8 @@ message CommandLookupTopic {
// to the proxy.
optional string original_auth_data = 5;
optional string original_auth_method = 6;
+ //
+ optional string advertised_listener_name = 7;
}
message CommandLookupTopicResponse {