You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/03/28 17:17:25 UTC
[pulsar] branch branch-2.9 updated: Support advertised listeners for HTTP and HTTPS services (#14839)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new ff2fc19 Support advertised listeners for HTTP and HTTPS services (#14839)
ff2fc19 is described below
commit ff2fc19c03bf949cb8f9b58dc1e52b7c4cf937ef
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Mon Mar 28 08:56:05 2022 -0700
Support advertised listeners for HTTP and HTTPS services (#14839)
---
.../pulsar/broker/ServiceConfigurationUtils.java | 12 ++-
.../validator/MultipleListenerValidator.java | 26 ++++-
.../org/apache/pulsar/broker/PulsarService.java | 14 ++-
.../apache/pulsar/compaction/CompactorTool.java | 3 +-
.../loadbalance/AdvertisedListenersTest.java | 119 +++++++++++++++++++++
.../data/loadbalancer/AdvertisedListener.java | 25 +++++
6 files changed, 190 insertions(+), 9 deletions(-)
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfigurationUtils.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfigurationUtils.java
index c47b8ca..7507181 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfigurationUtils.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfigurationUtils.java
@@ -85,10 +85,20 @@ public class ServiceConfigurationUtils {
* Gets the internal advertised listener for broker-to-broker communication.
* @return a non-null advertised listener
*/
- public static AdvertisedListener getInternalListener(ServiceConfiguration config) {
+ public static AdvertisedListener getInternalListener(ServiceConfiguration config, String protocol) {
Map<String, AdvertisedListener> result = MultipleListenerValidator
.validateAndAnalysisAdvertisedListener(config);
AdvertisedListener internal = result.get(config.getInternalListenerName());
+ if (internal == null || !internal.hasUriForProtocol(protocol)) {
+ // Search for an advertised listener for same protocol
+ for (AdvertisedListener l : result.values()) {
+ if (l.hasUriForProtocol(protocol)) {
+ internal = l;
+ break;
+ }
+ }
+ }
+
if (internal == null) {
// synthesize an advertised listener based on legacy configuration properties
String host = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getAdvertisedAddress());
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/validator/MultipleListenerValidator.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/validator/MultipleListenerValidator.java
index ce02da9..aa5fdd6 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/validator/MultipleListenerValidator.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/validator/MultipleListenerValidator.java
@@ -79,7 +79,7 @@ public final class MultipleListenerValidator {
if (entry.getValue().size() > 2) {
throw new IllegalArgumentException("there are redundant configure for listener `" + entry.getKey() + "`");
}
- URI pulsarAddress = null, pulsarSslAddress = null;
+ URI pulsarAddress = null, pulsarSslAddress = null, pulsarHttpAddress = null, pulsarHttpsAddress = null;
for (final String strUri : entry.getValue()) {
try {
URI uri = URI.create(strUri);
@@ -95,7 +95,22 @@ public final class MultipleListenerValidator {
} else {
throw new IllegalArgumentException("there are redundant configure for listener `" + entry.getKey() + "`");
}
+ } else if (StringUtils.equalsIgnoreCase(uri.getScheme(), "http")) {
+ if (pulsarHttpAddress == null) {
+ pulsarHttpAddress = uri;
+ } else {
+ throw new IllegalArgumentException("there are redundant configure for listener `"
+ + entry.getKey() + "`");
+ }
+ } else if (StringUtils.equalsIgnoreCase(uri.getScheme(), "https")) {
+ if (pulsarHttpsAddress == null) {
+ pulsarHttpsAddress = uri;
+ } else {
+ throw new IllegalArgumentException("there are redundant configure for listener `"
+ + entry.getKey() + "`");
+ }
}
+
String hostPort = String.format("%s:%d", uri.getHost(), uri.getPort());
Set<String> sets = reverseMappings.computeIfAbsent(hostPort, k -> Sets.newTreeSet());
sets.add(entry.getKey());
@@ -103,10 +118,15 @@ public final class MultipleListenerValidator {
throw new IllegalArgumentException("must not specify `" + hostPort + "` to different listener.");
}
} catch (Throwable cause) {
- throw new IllegalArgumentException("the value " + strUri + " in the `advertisedListeners` configure is invalid");
+ throw new IllegalArgumentException("the value " + strUri + " in the `advertisedListeners` configure is invalid", cause);
}
}
- result.put(entry.getKey(), AdvertisedListener.builder().brokerServiceUrl(pulsarAddress).brokerServiceUrlTls(pulsarSslAddress).build());
+ result.put(entry.getKey(), AdvertisedListener.builder()
+ .brokerServiceUrl(pulsarAddress)
+ .brokerServiceUrlTls(pulsarSslAddress)
+ .brokerHttpUrl(pulsarHttpAddress)
+ .brokerHttpsUrl(pulsarHttpsAddress)
+ .build());
}
return result;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 56d8b2b..46884d7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -1423,7 +1423,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
* Gets the broker service URL (non-TLS) associated with the internal listener.
*/
protected String brokerUrl(ServiceConfiguration config) {
- AdvertisedListener internalListener = ServiceConfigurationUtils.getInternalListener(config);
+ AdvertisedListener internalListener = ServiceConfigurationUtils.getInternalListener(config, "pulsar");
return internalListener.getBrokerServiceUrl() != null
? internalListener.getBrokerServiceUrl().toString() : null;
}
@@ -1436,7 +1436,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
* Gets the broker service URL (TLS) associated with the internal listener.
*/
public String brokerUrlTls(ServiceConfiguration config) {
- AdvertisedListener internalListener = ServiceConfigurationUtils.getInternalListener(config);
+ AdvertisedListener internalListener = ServiceConfigurationUtils.getInternalListener(config, "pulsar+ssl");
return internalListener.getBrokerServiceUrlTls() != null
? internalListener.getBrokerServiceUrlTls().toString() : null;
}
@@ -1447,7 +1447,10 @@ public class PulsarService implements AutoCloseable, ShutdownService {
public String webAddress(ServiceConfiguration config) {
if (config.getWebServicePort().isPresent()) {
- return webAddress(ServiceConfigurationUtils.getWebServiceAddress(config), getListenPortHTTP().get());
+ AdvertisedListener internalListener = ServiceConfigurationUtils.getInternalListener(config, "http");
+ return internalListener.getBrokerHttpUrl() != null
+ ? internalListener.getBrokerHttpUrl().toString()
+ : webAddress(ServiceConfigurationUtils.getWebServiceAddress(config), getListenPortHTTP().get());
} else {
return null;
}
@@ -1459,7 +1462,10 @@ public class PulsarService implements AutoCloseable, ShutdownService {
public String webAddressTls(ServiceConfiguration config) {
if (config.getWebServicePortTls().isPresent()) {
- return webAddressTls(ServiceConfigurationUtils.getWebServiceAddress(config), getListenPortHTTPS().get());
+ AdvertisedListener internalListener = ServiceConfigurationUtils.getInternalListener(config, "https");
+ return internalListener.getBrokerHttpsUrl() != null
+ ? internalListener.getBrokerHttpsUrl().toString()
+ : webAddressTls(ServiceConfigurationUtils.getWebServiceAddress(config), getListenPortHTTPS().get());
} else {
return null;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
index 1140c49..ac028ef 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
@@ -110,7 +110,7 @@ public class CompactorTool {
brokerConfig.getBrokerClientAuthenticationParameters());
}
- AdvertisedListener internalListener = ServiceConfigurationUtils.getInternalListener(brokerConfig);
+ AdvertisedListener internalListener = ServiceConfigurationUtils.getInternalListener(brokerConfig, "pulsar+ssl");
if (internalListener.getBrokerServiceUrlTls() != null) {
log.info("Found a TLS-based advertised listener in configuration file. \n"
+ "Will connect pulsar use TLS.");
@@ -120,6 +120,7 @@ public class CompactorTool {
.tlsTrustCertsFilePath(brokerConfig.getTlsCertificateFilePath());
} else {
+ internalListener = ServiceConfigurationUtils.getInternalListener(brokerConfig, "pulsar");
clientBuilder.serviceUrl(internalListener.getBrokerServiceUrl().toString());
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersTest.java
new file mode 100644
index 0000000..6ff4967
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersTest.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance;
+
+import static org.testng.Assert.assertEquals;
+import java.net.URI;
+import java.util.Optional;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.util.PortManager;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpHeaders;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.apache.pulsar.broker.MultiBrokerBaseTest;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.lookup.data.LookupData;
+import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker")
+public class AdvertisedListenersTest extends MultiBrokerBaseTest {
+ @Override
+ protected int numberOfAdditionalBrokers() {
+ return 1;
+ }
+
+ @Override
+ protected void doInitConf() throws Exception {
+ super.doInitConf();
+
+ updateConfig(conf, "BROKER-X");
+ }
+
+ @Override
+ protected ServiceConfiguration createConfForAdditionalBroker(int additionalBrokerIndex) {
+ ServiceConfiguration conf = super.createConfForAdditionalBroker(additionalBrokerIndex);
+ updateConfig(conf, "BROKER-" + additionalBrokerIndex);
+ return conf;
+ }
+
+ private void updateConfig(ServiceConfiguration conf, String advertisedAddress) {
+ int pulsarPort = PortManager.nextFreePort();
+ int httpPort = PortManager.nextFreePort();
+ int httpsPort = PortManager.nextFreePort();
+
+ // Use invalid domain name as identifier and instead make sure the advertised listeners work as intended
+ this.conf.setAdvertisedAddress(advertisedAddress);
+ this.conf.setAdvertisedListeners(
+ "public:pulsar://localhost:" + pulsarPort +
+ ",public_http:http://localhost:" + httpPort +
+ ",public_https:https://localhost:" + httpsPort);
+ this.conf.setBrokerServicePort(Optional.of(pulsarPort));
+ this.conf.setWebServicePort(Optional.of(httpPort));
+ this.conf.setWebServicePortTls(Optional.of(httpsPort));
+ }
+
+ @Test
+ public void testLookup() throws Exception {
+ HttpGet request =
+ new HttpGet(pulsar.getWebServiceAddress() + "/lookup/v2/topic/persistent/public/default/my-topic");
+ request.addHeader(HttpHeaders.CONTENT_TYPE, "application/json");
+ request.addHeader(HttpHeaders.ACCEPT, "application/json");
+
+ @Cleanup
+ CloseableHttpClient httpClient = HttpClients.createDefault();
+
+ @Cleanup
+ CloseableHttpResponse response = httpClient.execute(request);
+
+ HttpEntity entity = response.getEntity();
+ LookupData ld = ObjectMapperFactory.getThreadLocal().readValue(EntityUtils.toString(entity), LookupData.class);
+ System.err.println("Lookup data: " + ld);
+
+ assertEquals(new URI(ld.getBrokerUrl()).getHost(), "localhost");
+ assertEquals(new URI(ld.getHttpUrl()).getHost(), "localhost");
+ assertEquals(new URI(ld.getHttpUrlTls()).getHost(), "localhost");
+
+
+ // Produce data
+ @Cleanup
+ Producer<String> p = pulsarClient.newProducer(Schema.STRING)
+ .topic("my-topic")
+ .create();
+
+ p.send("hello");
+
+ // Verify we can get the correct HTTP redirect to the advertised listener
+ for (PulsarAdmin a : getAllAdmins()) {
+ TopicStats s = a.topics().getStats("my-topic");
+ assertEquals(s.getPublishers().size(), 1);
+ }
+ }
+
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/AdvertisedListener.java b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/AdvertisedListener.java
index a310974f..b73fdab 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/AdvertisedListener.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/AdvertisedListener.java
@@ -44,4 +44,29 @@ public class AdvertisedListener {
@Setter
// the broker service uri with ssl
private URI brokerServiceUrlTls;
+
+ //
+ @Getter
+ @Setter
+ // the broker service uri without ssl
+ private URI brokerHttpUrl;
+ //
+ @Getter
+ @Setter
+ // the broker service uri with ssl
+ private URI brokerHttpsUrl;
+
+ public boolean hasUriForProtocol(String protocol) {
+ if ("pulsar".equals(protocol)) {
+ return brokerServiceUrl != null;
+ } else if ("pulsar+ssl".equals(protocol)) {
+ return brokerServiceUrlTls != null;
+ } else if ("http".equals(protocol)) {
+ return brokerHttpUrl != null;
+ } else if ("https".equals(protocol)) {
+ return brokerHttpsUrl != null;
+ } else {
+ return false;
+ }
+ }
}