You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/03/28 17:17:25 UTC

[pulsar] branch branch-2.9 updated: Support advertised listeners for HTTP and HTTPS services (#14839)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new ff2fc19  Support advertised listeners for HTTP and HTTPS services (#14839)
ff2fc19 is described below

commit ff2fc19c03bf949cb8f9b58dc1e52b7c4cf937ef
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Mon Mar 28 08:56:05 2022 -0700

    Support advertised listeners for HTTP and HTTPS services (#14839)
---
 .../pulsar/broker/ServiceConfigurationUtils.java   |  12 ++-
 .../validator/MultipleListenerValidator.java       |  26 ++++-
 .../org/apache/pulsar/broker/PulsarService.java    |  14 ++-
 .../apache/pulsar/compaction/CompactorTool.java    |   3 +-
 .../loadbalance/AdvertisedListenersTest.java       | 119 +++++++++++++++++++++
 .../data/loadbalancer/AdvertisedListener.java      |  25 +++++
 6 files changed, 190 insertions(+), 9 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfigurationUtils.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfigurationUtils.java
index c47b8ca..7507181 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfigurationUtils.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfigurationUtils.java
@@ -85,10 +85,20 @@ public class ServiceConfigurationUtils {
      * Gets the internal advertised listener for broker-to-broker communication.
      * @return a non-null advertised listener
      */
-    public static AdvertisedListener getInternalListener(ServiceConfiguration config) {
+    public static AdvertisedListener getInternalListener(ServiceConfiguration config, String protocol) {
         Map<String, AdvertisedListener> result = MultipleListenerValidator
                 .validateAndAnalysisAdvertisedListener(config);
         AdvertisedListener internal = result.get(config.getInternalListenerName());
+        if (internal == null || !internal.hasUriForProtocol(protocol)) {
+            // Search for an advertised listener for same protocol
+            for (AdvertisedListener l : result.values()) {
+                if (l.hasUriForProtocol(protocol)) {
+                    internal = l;
+                    break;
+                }
+            }
+        }
+
         if (internal == null) {
             // synthesize an advertised listener based on legacy configuration properties
             String host = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getAdvertisedAddress());
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/validator/MultipleListenerValidator.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/validator/MultipleListenerValidator.java
index ce02da9..aa5fdd6 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/validator/MultipleListenerValidator.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/validator/MultipleListenerValidator.java
@@ -79,7 +79,7 @@ public final class MultipleListenerValidator {
             if (entry.getValue().size() > 2) {
                 throw new IllegalArgumentException("there are redundant configure for listener `" + entry.getKey() + "`");
             }
-            URI pulsarAddress = null, pulsarSslAddress = null;
+            URI pulsarAddress = null, pulsarSslAddress = null, pulsarHttpAddress = null, pulsarHttpsAddress = null;
             for (final String strUri : entry.getValue()) {
                 try {
                     URI uri = URI.create(strUri);
@@ -95,7 +95,22 @@ public final class MultipleListenerValidator {
                         } else {
                             throw new IllegalArgumentException("there are redundant configure for listener `" + entry.getKey() + "`");
                         }
+                    } else if (StringUtils.equalsIgnoreCase(uri.getScheme(), "http")) {
+                        if (pulsarHttpAddress == null) {
+                            pulsarHttpAddress = uri;
+                        } else {
+                            throw new IllegalArgumentException("there are redundant configure for listener `"
+                                    + entry.getKey() + "`");
+                        }
+                    } else if (StringUtils.equalsIgnoreCase(uri.getScheme(), "https")) {
+                        if (pulsarHttpsAddress == null) {
+                            pulsarHttpsAddress = uri;
+                        } else {
+                            throw new IllegalArgumentException("there are redundant configure for listener `"
+                                    + entry.getKey() + "`");
+                        }
                     }
+
                     String hostPort = String.format("%s:%d", uri.getHost(), uri.getPort());
                     Set<String> sets = reverseMappings.computeIfAbsent(hostPort, k -> Sets.newTreeSet());
                     sets.add(entry.getKey());
@@ -103,10 +118,15 @@ public final class MultipleListenerValidator {
                         throw new IllegalArgumentException("must not specify `" + hostPort + "` to different listener.");
                     }
                 } catch (Throwable cause) {
-                    throw new IllegalArgumentException("the value " + strUri + " in the `advertisedListeners` configure is invalid");
+                    throw new IllegalArgumentException("the value " + strUri + " in the `advertisedListeners` configure is invalid", cause);
                 }
             }
-            result.put(entry.getKey(), AdvertisedListener.builder().brokerServiceUrl(pulsarAddress).brokerServiceUrlTls(pulsarSslAddress).build());
+            result.put(entry.getKey(), AdvertisedListener.builder()
+                    .brokerServiceUrl(pulsarAddress)
+                    .brokerServiceUrlTls(pulsarSslAddress)
+                    .brokerHttpUrl(pulsarHttpAddress)
+                    .brokerHttpsUrl(pulsarHttpsAddress)
+                    .build());
         }
         return result;
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 56d8b2b..46884d7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -1423,7 +1423,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
      * Gets the broker service URL (non-TLS) associated with the internal listener.
      */
     protected String brokerUrl(ServiceConfiguration config) {
-        AdvertisedListener internalListener = ServiceConfigurationUtils.getInternalListener(config);
+        AdvertisedListener internalListener = ServiceConfigurationUtils.getInternalListener(config, "pulsar");
         return internalListener.getBrokerServiceUrl() != null
                 ? internalListener.getBrokerServiceUrl().toString() : null;
     }
@@ -1436,7 +1436,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
      * Gets the broker service URL (TLS) associated with the internal listener.
      */
     public String brokerUrlTls(ServiceConfiguration config) {
-        AdvertisedListener internalListener = ServiceConfigurationUtils.getInternalListener(config);
+        AdvertisedListener internalListener = ServiceConfigurationUtils.getInternalListener(config, "pulsar+ssl");
         return internalListener.getBrokerServiceUrlTls() != null
                 ? internalListener.getBrokerServiceUrlTls().toString() : null;
     }
@@ -1447,7 +1447,10 @@ public class PulsarService implements AutoCloseable, ShutdownService {
 
     public String webAddress(ServiceConfiguration config) {
         if (config.getWebServicePort().isPresent()) {
-            return webAddress(ServiceConfigurationUtils.getWebServiceAddress(config), getListenPortHTTP().get());
+            AdvertisedListener internalListener = ServiceConfigurationUtils.getInternalListener(config, "http");
+            return internalListener.getBrokerHttpUrl() != null
+                    ? internalListener.getBrokerHttpUrl().toString()
+                    : webAddress(ServiceConfigurationUtils.getWebServiceAddress(config), getListenPortHTTP().get());
         } else {
             return null;
         }
@@ -1459,7 +1462,10 @@ public class PulsarService implements AutoCloseable, ShutdownService {
 
     public String webAddressTls(ServiceConfiguration config) {
         if (config.getWebServicePortTls().isPresent()) {
-            return webAddressTls(ServiceConfigurationUtils.getWebServiceAddress(config), getListenPortHTTPS().get());
+            AdvertisedListener internalListener = ServiceConfigurationUtils.getInternalListener(config, "https");
+            return internalListener.getBrokerHttpsUrl() != null
+                    ? internalListener.getBrokerHttpsUrl().toString()
+                    : webAddressTls(ServiceConfigurationUtils.getWebServiceAddress(config), getListenPortHTTPS().get());
         } else {
             return null;
         }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
index 1140c49..ac028ef 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
@@ -110,7 +110,7 @@ public class CompactorTool {
                     brokerConfig.getBrokerClientAuthenticationParameters());
         }
 
-        AdvertisedListener internalListener = ServiceConfigurationUtils.getInternalListener(brokerConfig);
+        AdvertisedListener internalListener = ServiceConfigurationUtils.getInternalListener(brokerConfig, "pulsar+ssl");
         if (internalListener.getBrokerServiceUrlTls() != null) {
             log.info("Found a TLS-based advertised listener in configuration file. \n"
                     + "Will connect pulsar use TLS.");
@@ -120,6 +120,7 @@ public class CompactorTool {
                     .tlsTrustCertsFilePath(brokerConfig.getTlsCertificateFilePath());
 
         } else {
+            internalListener = ServiceConfigurationUtils.getInternalListener(brokerConfig, "pulsar");
             clientBuilder.serviceUrl(internalListener.getBrokerServiceUrl().toString());
         }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersTest.java
new file mode 100644
index 0000000..6ff4967
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersTest.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance;
+
+import static org.testng.Assert.assertEquals;
+import java.net.URI;
+import java.util.Optional;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.util.PortManager;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpHeaders;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.apache.pulsar.broker.MultiBrokerBaseTest;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.lookup.data.LookupData;
+import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker")
+public class AdvertisedListenersTest extends MultiBrokerBaseTest {
+    @Override
+    protected int numberOfAdditionalBrokers() {
+        return 1;
+    }
+
+    @Override
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+
+        updateConfig(conf, "BROKER-X");
+    }
+
+    @Override
+    protected ServiceConfiguration createConfForAdditionalBroker(int additionalBrokerIndex) {
+        ServiceConfiguration conf = super.createConfForAdditionalBroker(additionalBrokerIndex);
+        updateConfig(conf, "BROKER-" + additionalBrokerIndex);
+        return conf;
+    }
+
+    private void updateConfig(ServiceConfiguration conf, String advertisedAddress) {
+        int pulsarPort = PortManager.nextFreePort();
+        int httpPort = PortManager.nextFreePort();
+        int httpsPort = PortManager.nextFreePort();
+
+        // Use invalid domain name as identifier and instead make sure the advertised listeners work as intended
+        this.conf.setAdvertisedAddress(advertisedAddress);
+        this.conf.setAdvertisedListeners(
+                "public:pulsar://localhost:" + pulsarPort +
+                        ",public_http:http://localhost:" + httpPort +
+                        ",public_https:https://localhost:" + httpsPort);
+        this.conf.setBrokerServicePort(Optional.of(pulsarPort));
+        this.conf.setWebServicePort(Optional.of(httpPort));
+        this.conf.setWebServicePortTls(Optional.of(httpsPort));
+    }
+
+    @Test
+    public void testLookup() throws Exception {
+        HttpGet request =
+                new HttpGet(pulsar.getWebServiceAddress() + "/lookup/v2/topic/persistent/public/default/my-topic");
+        request.addHeader(HttpHeaders.CONTENT_TYPE, "application/json");
+        request.addHeader(HttpHeaders.ACCEPT, "application/json");
+
+        @Cleanup
+        CloseableHttpClient httpClient = HttpClients.createDefault();
+
+        @Cleanup
+        CloseableHttpResponse response = httpClient.execute(request);
+
+        HttpEntity entity = response.getEntity();
+        LookupData ld = ObjectMapperFactory.getThreadLocal().readValue(EntityUtils.toString(entity), LookupData.class);
+        System.err.println("Lookup data: " + ld);
+
+        assertEquals(new URI(ld.getBrokerUrl()).getHost(), "localhost");
+        assertEquals(new URI(ld.getHttpUrl()).getHost(), "localhost");
+        assertEquals(new URI(ld.getHttpUrlTls()).getHost(), "localhost");
+
+
+        // Produce data
+        @Cleanup
+        Producer<String> p = pulsarClient.newProducer(Schema.STRING)
+                .topic("my-topic")
+                .create();
+
+        p.send("hello");
+
+        // Verify we can get the correct HTTP redirect to the advertised listener
+        for (PulsarAdmin a : getAllAdmins()) {
+            TopicStats s = a.topics().getStats("my-topic");
+            assertEquals(s.getPublishers().size(), 1);
+        }
+    }
+
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/AdvertisedListener.java b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/AdvertisedListener.java
index a310974f..b73fdab 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/AdvertisedListener.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/AdvertisedListener.java
@@ -44,4 +44,29 @@ public class AdvertisedListener {
     @Setter
     // the broker service uri with ssl
     private URI brokerServiceUrlTls;
+
+    //
+    @Getter
+    @Setter
+    // the broker service uri without ssl
+    private URI brokerHttpUrl;
+    //
+    @Getter
+    @Setter
+    // the broker service uri with ssl
+    private URI brokerHttpsUrl;
+
+    public boolean hasUriForProtocol(String protocol) {
+        if ("pulsar".equals(protocol)) {
+            return brokerServiceUrl != null;
+        } else if ("pulsar+ssl".equals(protocol)) {
+            return brokerServiceUrlTls != null;
+        } else if ("http".equals(protocol)) {
+            return brokerHttpUrl != null;
+        } else if ("https".equals(protocol)) {
+            return brokerHttpsUrl != null;
+        } else {
+            return false;
+        }
+    }
 }