You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/06/25 04:56:03 UTC

[pulsar] 06/13: Fix incorrect port of advertisedListener (#10961)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d7bff1e5aa3e4086fcb2d5c9b65507d597b00246
Author: feynmanlin <fe...@tencent.com>
AuthorDate: Tue Jun 22 10:40:47 2021 +0800

    Fix incorrect port of advertisedListener (#10961)
    
    Fixes #10951
    
    ### Motivation
    The advertisedListener has its own port, and now we have no way to obtain the port of TLS and non-TLS advertisedListener except by setting the listenerName through the client.
    Therefore, brokerServiceUrl and webServiceUrl do not return the address and port of the advertisedListener
    
    
    (cherry picked from commit 99c84c4cf029c4fa3cd3e6dfa47ed0cb5272bedc)
---
 .../pulsar/broker/ServiceConfigurationUtils.java      |  8 ++++++--
 .../validator/MultipleListenerValidatorTest.java      | 15 ++++++++++++---
 .../java/org/apache/pulsar/broker/PulsarService.java  | 12 ++++++------
 .../loadbalance/impl/ModularLoadManagerImpl.java      |  2 +-
 .../org/apache/pulsar/compaction/CompactorTool.java   |  8 ++++----
 .../org/apache/pulsar/broker/PulsarServiceTest.java   | 19 ++++++++++++-------
 6 files changed, 41 insertions(+), 23 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfigurationUtils.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfigurationUtils.java
index fe23e6b..6ce69c5 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfigurationUtils.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfigurationUtils.java
@@ -54,9 +54,13 @@ public class ServiceConfigurationUtils {
      * Get the address of Broker, first try to get it from AdvertisedAddress.
      * If it is not set, try to get the address set by advertisedListener.
      * If it is still not set, get it through InetAddress.getLocalHost().
+     * @param configuration
+     * @param ignoreAdvertisedListener Sometimes we can’t use the default key of AdvertisedListener,
+     *                                 setting it to true can ignore AdvertisedListener.
      * @return
      */
-    public static String getAppliedAdvertisedAddress(ServiceConfiguration configuration) {
+    public static String getAppliedAdvertisedAddress(ServiceConfiguration configuration,
+                                                     boolean ignoreAdvertisedListener) {
         Map<String, AdvertisedListener> result = MultipleListenerValidator
                 .validateAndAnalysisAdvertisedListener(configuration);
 
@@ -66,7 +70,7 @@ public class ServiceConfigurationUtils {
         }
 
         AdvertisedListener advertisedListener = result.get(configuration.getInternalListenerName());
-        if (advertisedListener != null) {
+        if (advertisedListener != null && !ignoreAdvertisedListener) {
             String address = advertisedListener.getBrokerServiceUrl().getHost();
             if (address != null) {
                 return address;
diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/validator/MultipleListenerValidatorTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/validator/MultipleListenerValidatorTest.java
index 8928e82..f41ed92 100644
--- a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/validator/MultipleListenerValidatorTest.java
+++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/validator/MultipleListenerValidatorTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.validator;
 
+import java.net.InetAddress;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.ServiceConfigurationUtils;
 import org.testng.annotations.Test;
@@ -46,15 +47,23 @@ public class MultipleListenerValidatorTest {
         config.setBrokerServicePortTls(Optional.of(6651));
         config.setAdvertisedListeners("internal:pulsar://192.0.0.1:6660, internal:pulsar+ssl://192.0.0.1:6651");
         config.setInternalListenerName("internal");
-        assertEquals(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config), "192.0.0.1");
+        assertEquals(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config, false),
+                "192.0.0.1");
+        assertEquals(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config, true),
+                InetAddress.getLocalHost().getCanonicalHostName());
 
         config = new ServiceConfiguration();
         config.setBrokerServicePortTls(Optional.of(6651));
         config.setAdvertisedAddress("192.0.0.2");
-        assertEquals(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config), "192.0.0.2");
+        assertEquals(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config, false),
+                "192.0.0.2");
+        assertEquals(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config, true),
+                "192.0.0.2");
 
         config.setAdvertisedAddress(null);
-        assertEquals(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config),
+        assertEquals(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config, false),
+                ServiceConfigurationUtils.getDefaultOrConfiguredAddress(null));
+        assertEquals(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config, true),
                 ServiceConfigurationUtils.getDefaultOrConfiguredAddress(null));
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index be85f54..cbd3eb3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -280,7 +280,7 @@ public class PulsarService implements AutoCloseable {
         PulsarConfigurationLoader.isComplete(config);
         // validate `advertisedAddress`, `advertisedListeners`, `internalListenerName`
         this.advertisedListeners = MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);
-        this.advertisedAddress = ServiceConfigurationUtils.getAppliedAdvertisedAddress(config);
+        this.advertisedAddress = ServiceConfigurationUtils.getAppliedAdvertisedAddress(config, false);
         state = State.Init;
         // use `internalListenerName` listener as `advertisedAddress`
         this.bindAddress = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getBindAddress());
@@ -1361,7 +1361,7 @@ public class PulsarService implements AutoCloseable {
 
     protected String brokerUrl(ServiceConfiguration config) {
         if (config.getBrokerServicePort().isPresent()) {
-            return brokerUrl(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config),
+            return brokerUrl(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config, true),
                     getBrokerListenPort().get());
         } else {
             return null;
@@ -1374,7 +1374,7 @@ public class PulsarService implements AutoCloseable {
 
     public String brokerUrlTls(ServiceConfiguration config) {
         if (config.getBrokerServicePortTls().isPresent()) {
-            return brokerUrlTls(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config),
+            return brokerUrlTls(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config, true),
                     getBrokerListenPortTls().get());
         } else {
             return null;
@@ -1387,7 +1387,7 @@ public class PulsarService implements AutoCloseable {
 
     public String webAddress(ServiceConfiguration config) {
         if (config.getWebServicePort().isPresent()) {
-            return webAddress(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config),
+            return webAddress(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config, true),
                     getListenPortHTTP().get());
         } else {
             return null;
@@ -1400,7 +1400,7 @@ public class PulsarService implements AutoCloseable {
 
     public String webAddressTls(ServiceConfiguration config) {
         if (config.getWebServicePortTls().isPresent()) {
-            return webAddressTls(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config),
+            return webAddressTls(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config, true),
                     getListenPortHTTPS().get());
         } else {
             return null;
@@ -1546,7 +1546,7 @@ public class PulsarService implements AutoCloseable {
 
         // worker talks to local broker
         String hostname = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(
-                ServiceConfigurationUtils.getAppliedAdvertisedAddress(brokerConfig));
+                brokerConfig.getAdvertisedAddress());
         workerConfig.setWorkerHostname(hostname);
         workerConfig.setPulsarFunctionsCluster(brokerConfig.getClusterName());
         // inherit broker authorization setting
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 6fd2884..5b7867e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -931,7 +931,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager, Consumer<Noti
         List<Metrics> metrics = Lists.newArrayList();
         Map<String, String> dimensions = new HashMap<>();
 
-        dimensions.put("broker", ServiceConfigurationUtils.getAppliedAdvertisedAddress(conf));
+        dimensions.put("broker", ServiceConfigurationUtils.getAppliedAdvertisedAddress(conf, true));
         dimensions.put("metric", "loadBalancing");
 
         Metrics m = Metrics.create(dimensions);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
index fe7a5b8..ee2b374 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
@@ -104,15 +104,15 @@ public class CompactorTool {
             log.info("Found `brokerServicePortTls` in configuration file. \n"
                     + "Will connect pulsar use TLS.");
             clientBuilder
-                    .serviceUrl(PulsarService.brokerUrlTls(ServiceConfigurationUtils
-                                    .getAppliedAdvertisedAddress(brokerConfig),
+                    .serviceUrl(PulsarService.brokerUrlTls(
+                            ServiceConfigurationUtils.getAppliedAdvertisedAddress(brokerConfig, true),
                             brokerConfig.getBrokerServicePortTls().get()))
                     .allowTlsInsecureConnection(brokerConfig.isTlsAllowInsecureConnection())
                     .tlsTrustCertsFilePath(brokerConfig.getTlsCertificateFilePath());
 
         } else {
-            clientBuilder.serviceUrl(PulsarService.brokerUrl(ServiceConfigurationUtils
-                            .getAppliedAdvertisedAddress(brokerConfig),
+            clientBuilder.serviceUrl(PulsarService.brokerUrl(
+                    ServiceConfigurationUtils.getAppliedAdvertisedAddress(brokerConfig, true),
                     brokerConfig.getBrokerServicePort().get()));
         }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
index 9da4aa7..1f80221 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
@@ -106,17 +106,22 @@ public class PulsarServiceTest extends MockedPulsarServiceBaseTest {
     @Test
     public void testAppliedAdvertised() throws Exception {
         useListenerName = true;
-        conf.setAdvertisedListeners("internal:pulsar://127.0.0.1, internal:pulsar+ssl://127.0.0.1");
+        conf.setAdvertisedListeners("internal:pulsar://127.0.0.1:6650, internal:pulsar+ssl://127.0.0.1:6651");
         conf.setInternalListenerName("internal");
         setup();
-
-        AssertJUnit.assertEquals(pulsar.getAdvertisedAddress(), "127.0.0.1");
+        assertEquals(pulsar.getAdvertisedAddress(), "127.0.0.1");
         assertNull(pulsar.getConfiguration().getAdvertisedAddress());
         assertEquals(conf, pulsar.getConfiguration());
-        assertEquals(pulsar.brokerUrlTls(conf), "pulsar+ssl://127.0.0.1:6651");
-        assertEquals(pulsar.brokerUrl(conf), "pulsar://127.0.0.1:6660");
-        assertEquals(pulsar.webAddress(conf), "http://127.0.0.1:8081");
-        assertEquals(pulsar.webAddressTls(conf), "https://127.0.0.1:8082");
+
+        cleanup();
+        resetConfig();
+        setup();
+        assertEquals(pulsar.getAdvertisedAddress(), "localhost");
+        assertEquals(conf, pulsar.getConfiguration());
+        assertEquals(pulsar.brokerUrlTls(conf), "pulsar+ssl://localhost:" + pulsar.getBrokerListenPortTls().get());
+        assertEquals(pulsar.brokerUrl(conf), "pulsar://localhost:" + pulsar.getBrokerListenPort().get());
+        assertEquals(pulsar.webAddress(conf), "http://localhost:" + pulsar.getWebService().getListenPortHTTP().get());
+        assertEquals(pulsar.webAddressTls(conf), "https://localhost:" + pulsar.getWebService().getListenPortHTTPS().get());
     }
 
 }