You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/06/25 04:56:03 UTC
[pulsar] 06/13: Fix incorrect port of advertisedListener (#10961)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit d7bff1e5aa3e4086fcb2d5c9b65507d597b00246
Author: feynmanlin <fe...@tencent.com>
AuthorDate: Tue Jun 22 10:40:47 2021 +0800
Fix incorrect port of advertisedListener (#10961)
Fixes #10951
### Motivation
The advertisedListener has its own port, and now we have no way to obtain the port of TLS and non-TLS advertisedListener except by setting the listenerName through the client.
Therefore, brokerServiceUrl and webServiceUrl do not return the address and port of the advertisedListener
(cherry picked from commit 99c84c4cf029c4fa3cd3e6dfa47ed0cb5272bedc)
---
.../pulsar/broker/ServiceConfigurationUtils.java | 8 ++++++--
.../validator/MultipleListenerValidatorTest.java | 15 ++++++++++++---
.../java/org/apache/pulsar/broker/PulsarService.java | 12 ++++++------
.../loadbalance/impl/ModularLoadManagerImpl.java | 2 +-
.../org/apache/pulsar/compaction/CompactorTool.java | 8 ++++----
.../org/apache/pulsar/broker/PulsarServiceTest.java | 19 ++++++++++++-------
6 files changed, 41 insertions(+), 23 deletions(-)
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfigurationUtils.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfigurationUtils.java
index fe23e6b..6ce69c5 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfigurationUtils.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfigurationUtils.java
@@ -54,9 +54,13 @@ public class ServiceConfigurationUtils {
* Get the address of Broker, first try to get it from AdvertisedAddress.
* If it is not set, try to get the address set by advertisedListener.
* If it is still not set, get it through InetAddress.getLocalHost().
+ * @param configuration
+ * @param ignoreAdvertisedListener Sometimes we can’t use the default key of AdvertisedListener,
+ * setting it to true can ignore AdvertisedListener.
* @return
*/
- public static String getAppliedAdvertisedAddress(ServiceConfiguration configuration) {
+ public static String getAppliedAdvertisedAddress(ServiceConfiguration configuration,
+ boolean ignoreAdvertisedListener) {
Map<String, AdvertisedListener> result = MultipleListenerValidator
.validateAndAnalysisAdvertisedListener(configuration);
@@ -66,7 +70,7 @@ public class ServiceConfigurationUtils {
}
AdvertisedListener advertisedListener = result.get(configuration.getInternalListenerName());
- if (advertisedListener != null) {
+ if (advertisedListener != null && !ignoreAdvertisedListener) {
String address = advertisedListener.getBrokerServiceUrl().getHost();
if (address != null) {
return address;
diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/validator/MultipleListenerValidatorTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/validator/MultipleListenerValidatorTest.java
index 8928e82..f41ed92 100644
--- a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/validator/MultipleListenerValidatorTest.java
+++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/validator/MultipleListenerValidatorTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.validator;
+import java.net.InetAddress;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
import org.testng.annotations.Test;
@@ -46,15 +47,23 @@ public class MultipleListenerValidatorTest {
config.setBrokerServicePortTls(Optional.of(6651));
config.setAdvertisedListeners("internal:pulsar://192.0.0.1:6660, internal:pulsar+ssl://192.0.0.1:6651");
config.setInternalListenerName("internal");
- assertEquals(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config), "192.0.0.1");
+ assertEquals(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config, false),
+ "192.0.0.1");
+ assertEquals(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config, true),
+ InetAddress.getLocalHost().getCanonicalHostName());
config = new ServiceConfiguration();
config.setBrokerServicePortTls(Optional.of(6651));
config.setAdvertisedAddress("192.0.0.2");
- assertEquals(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config), "192.0.0.2");
+ assertEquals(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config, false),
+ "192.0.0.2");
+ assertEquals(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config, true),
+ "192.0.0.2");
config.setAdvertisedAddress(null);
- assertEquals(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config),
+ assertEquals(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config, false),
+ ServiceConfigurationUtils.getDefaultOrConfiguredAddress(null));
+ assertEquals(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config, true),
ServiceConfigurationUtils.getDefaultOrConfiguredAddress(null));
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index be85f54..cbd3eb3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -280,7 +280,7 @@ public class PulsarService implements AutoCloseable {
PulsarConfigurationLoader.isComplete(config);
// validate `advertisedAddress`, `advertisedListeners`, `internalListenerName`
this.advertisedListeners = MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);
- this.advertisedAddress = ServiceConfigurationUtils.getAppliedAdvertisedAddress(config);
+ this.advertisedAddress = ServiceConfigurationUtils.getAppliedAdvertisedAddress(config, false);
state = State.Init;
// use `internalListenerName` listener as `advertisedAddress`
this.bindAddress = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getBindAddress());
@@ -1361,7 +1361,7 @@ public class PulsarService implements AutoCloseable {
protected String brokerUrl(ServiceConfiguration config) {
if (config.getBrokerServicePort().isPresent()) {
- return brokerUrl(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config),
+ return brokerUrl(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config, true),
getBrokerListenPort().get());
} else {
return null;
@@ -1374,7 +1374,7 @@ public class PulsarService implements AutoCloseable {
public String brokerUrlTls(ServiceConfiguration config) {
if (config.getBrokerServicePortTls().isPresent()) {
- return brokerUrlTls(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config),
+ return brokerUrlTls(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config, true),
getBrokerListenPortTls().get());
} else {
return null;
@@ -1387,7 +1387,7 @@ public class PulsarService implements AutoCloseable {
public String webAddress(ServiceConfiguration config) {
if (config.getWebServicePort().isPresent()) {
- return webAddress(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config),
+ return webAddress(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config, true),
getListenPortHTTP().get());
} else {
return null;
@@ -1400,7 +1400,7 @@ public class PulsarService implements AutoCloseable {
public String webAddressTls(ServiceConfiguration config) {
if (config.getWebServicePortTls().isPresent()) {
- return webAddressTls(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config),
+ return webAddressTls(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config, true),
getListenPortHTTPS().get());
} else {
return null;
@@ -1546,7 +1546,7 @@ public class PulsarService implements AutoCloseable {
// worker talks to local broker
String hostname = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(
- ServiceConfigurationUtils.getAppliedAdvertisedAddress(brokerConfig));
+ brokerConfig.getAdvertisedAddress());
workerConfig.setWorkerHostname(hostname);
workerConfig.setPulsarFunctionsCluster(brokerConfig.getClusterName());
// inherit broker authorization setting
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 6fd2884..5b7867e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -931,7 +931,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager, Consumer<Noti
List<Metrics> metrics = Lists.newArrayList();
Map<String, String> dimensions = new HashMap<>();
- dimensions.put("broker", ServiceConfigurationUtils.getAppliedAdvertisedAddress(conf));
+ dimensions.put("broker", ServiceConfigurationUtils.getAppliedAdvertisedAddress(conf, true));
dimensions.put("metric", "loadBalancing");
Metrics m = Metrics.create(dimensions);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
index fe7a5b8..ee2b374 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
@@ -104,15 +104,15 @@ public class CompactorTool {
log.info("Found `brokerServicePortTls` in configuration file. \n"
+ "Will connect pulsar use TLS.");
clientBuilder
- .serviceUrl(PulsarService.brokerUrlTls(ServiceConfigurationUtils
- .getAppliedAdvertisedAddress(brokerConfig),
+ .serviceUrl(PulsarService.brokerUrlTls(
+ ServiceConfigurationUtils.getAppliedAdvertisedAddress(brokerConfig, true),
brokerConfig.getBrokerServicePortTls().get()))
.allowTlsInsecureConnection(brokerConfig.isTlsAllowInsecureConnection())
.tlsTrustCertsFilePath(brokerConfig.getTlsCertificateFilePath());
} else {
- clientBuilder.serviceUrl(PulsarService.brokerUrl(ServiceConfigurationUtils
- .getAppliedAdvertisedAddress(brokerConfig),
+ clientBuilder.serviceUrl(PulsarService.brokerUrl(
+ ServiceConfigurationUtils.getAppliedAdvertisedAddress(brokerConfig, true),
brokerConfig.getBrokerServicePort().get()));
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
index 9da4aa7..1f80221 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
@@ -106,17 +106,22 @@ public class PulsarServiceTest extends MockedPulsarServiceBaseTest {
@Test
public void testAppliedAdvertised() throws Exception {
useListenerName = true;
- conf.setAdvertisedListeners("internal:pulsar://127.0.0.1, internal:pulsar+ssl://127.0.0.1");
+ conf.setAdvertisedListeners("internal:pulsar://127.0.0.1:6650, internal:pulsar+ssl://127.0.0.1:6651");
conf.setInternalListenerName("internal");
setup();
-
- AssertJUnit.assertEquals(pulsar.getAdvertisedAddress(), "127.0.0.1");
+ assertEquals(pulsar.getAdvertisedAddress(), "127.0.0.1");
assertNull(pulsar.getConfiguration().getAdvertisedAddress());
assertEquals(conf, pulsar.getConfiguration());
- assertEquals(pulsar.brokerUrlTls(conf), "pulsar+ssl://127.0.0.1:6651");
- assertEquals(pulsar.brokerUrl(conf), "pulsar://127.0.0.1:6660");
- assertEquals(pulsar.webAddress(conf), "http://127.0.0.1:8081");
- assertEquals(pulsar.webAddressTls(conf), "https://127.0.0.1:8082");
+
+ cleanup();
+ resetConfig();
+ setup();
+ assertEquals(pulsar.getAdvertisedAddress(), "localhost");
+ assertEquals(conf, pulsar.getConfiguration());
+ assertEquals(pulsar.brokerUrlTls(conf), "pulsar+ssl://localhost:" + pulsar.getBrokerListenPortTls().get());
+ assertEquals(pulsar.brokerUrl(conf), "pulsar://localhost:" + pulsar.getBrokerListenPort().get());
+ assertEquals(pulsar.webAddress(conf), "http://localhost:" + pulsar.getWebService().getListenPortHTTP().get());
+ assertEquals(pulsar.webAddressTls(conf), "https://localhost:" + pulsar.getWebService().getListenPortHTTPS().get());
}
}