You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/09/09 07:22:32 UTC
[pulsar] 01/07: [Broker] Support disabling non-TLS service ports
(#11681)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 39786572b168950679d26b0a945adf671e3f7cf4
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Thu Aug 19 11:36:37 2021 +0300
[Broker] Support disabling non-TLS service ports (#11681)
* Support disabling non-tls service ports
* Add docs for disabling non-TLS ports
* Update site2/docs/security-tls-keystore.md
Co-authored-by: Anonymitaet <50...@users.noreply.github.com>
(cherry picked from commit 50b6e79d7cc350efb2208b4aa89f684e133e31c0)
---
.../org/apache/pulsar/broker/PulsarService.java | 7 +++--
.../pulsar/broker/loadbalance/NoopLoadManager.java | 10 +++++--
.../loadbalance/impl/SimpleLoadManagerImpl.java | 2 +-
.../broker/auth/MockedPulsarServiceBaseTest.java | 8 ++++--
.../pulsar/broker/service/BrokerServiceTest.java | 32 ++++++++++++++++++++++
.../common/naming/ServiceConfigurationTest.java | 9 ++++++
.../functions/worker/PulsarWorkerService.java | 17 ++++++++++--
.../pulsar/websocket/service/ProxyServer.java | 6 +++-
site2/docs/security-tls-keystore.md | 13 +++++++++
9 files changed, 93 insertions(+), 11 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index c09ec45..8d78d40 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -1541,12 +1541,15 @@ public class PulsarService implements AutoCloseable {
AuthorizationService authorizationService)
throws Exception {
if (functionWorkerService.isPresent()) {
- if (workerConfig.isUseTls()) {
+ if (workerConfig.isUseTls() || brokerServiceUrl == null) {
workerConfig.setPulsarServiceUrl(brokerServiceUrlTls);
+ } else {
+ workerConfig.setPulsarServiceUrl(brokerServiceUrl);
+ }
+ if (workerConfig.isUseTls() || webServiceAddress == null) {
workerConfig.setPulsarWebServiceUrl(webServiceAddressTls);
workerConfig.setFunctionWebServiceUrl(webServiceAddressTls);
} else {
- workerConfig.setPulsarServiceUrl(brokerServiceUrl);
workerConfig.setPulsarWebServiceUrl(webServiceAddress);
workerConfig.setFunctionWebServiceUrl(webServiceAddress);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java
index db017c1..85071ff 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java
@@ -51,8 +51,7 @@ public class NoopLoadManager implements LoadManager {
@Override
public void start() throws PulsarServerException {
- lookupServiceAddress = pulsar.getAdvertisedAddress() + ":"
- + pulsar.getConfiguration().getWebServicePort().get();
+ lookupServiceAddress = getBrokerAddress();
localResourceUnit = new SimpleResourceUnit(String.format("http://%s", lookupServiceAddress),
new PulsarResourceDescription());
@@ -71,6 +70,13 @@ public class NoopLoadManager implements LoadManager {
}
}
+ private String getBrokerAddress() {
+ return String.format("%s:%s", pulsar.getAdvertisedAddress(),
+ pulsar.getConfiguration().getWebServicePort().isPresent()
+ ? pulsar.getConfiguration().getWebServicePort().get()
+ : pulsar.getConfiguration().getWebServicePortTls().get());
+ }
+
@Override
public boolean isCentralized() {
return false;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
index fe5eddf..b14a018 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
@@ -1118,7 +1118,7 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification
return String.format("%s:%s", pulsar.getAdvertisedAddress(),
pulsar.getConfiguration().getWebServicePort().isPresent()
? pulsar.getConfiguration().getWebServicePort().get()
- : pulsar.getConfiguration().getWebServicePortTls());
+ : pulsar.getConfiguration().getWebServicePortTls().get());
}
@Override
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index ddb76e8..fb23eb3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -264,13 +264,15 @@ public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport {
}
this.pulsar = startBroker(conf);
- brokerUrl = new URL(pulsar.getWebServiceAddress());
- brokerUrlTls = new URL(pulsar.getWebServiceAddressTls());
+ brokerUrl = pulsar.getWebServiceAddress() != null ? new URL(pulsar.getWebServiceAddress()) : null;
+ brokerUrlTls = pulsar.getWebServiceAddressTls() != null ? new URL(pulsar.getWebServiceAddressTls()) : null;
if (admin != null) {
admin.close();
}
- PulsarAdminBuilder pulsarAdminBuilder = PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString());
+ PulsarAdminBuilder pulsarAdminBuilder = PulsarAdmin.builder().serviceHttpUrl(brokerUrl != null
+ ? brokerUrl.toString()
+ : brokerUrlTls.toString());
customizeNewPulsarAdminBuilder(pulsarAdminBuilder);
admin = spy(pulsarAdminBuilder.build());
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index a0e3a9e..ffd474f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -567,6 +567,38 @@ public class BrokerServiceTest extends BrokerTestBase {
}
}
+ @Test
+ public void testTlsEnabledWithoutNonTlsServicePorts() throws Exception {
+ final String topicName = "persistent://prop/ns-abc/newTopic";
+ final String subName = "newSub";
+
+ conf.setAuthenticationEnabled(false);
+ conf.setBrokerServicePort(Optional.empty());
+ conf.setBrokerServicePortTls(Optional.of(0));
+ conf.setWebServicePort(Optional.empty());
+ conf.setWebServicePortTls(Optional.of(0));
+ conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
+ conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
+ conf.setNumExecutorThreadPoolSize(5);
+ restartBroker();
+
+ // Access with TLS (Allow insecure TLS connection)
+ try {
+ pulsarClient = PulsarClient.builder().serviceUrl(brokerUrlTls.toString()).enableTls(true)
+ .allowTlsInsecureConnection(true).statsInterval(0, TimeUnit.SECONDS)
+ .operationTimeout(1000, TimeUnit.MILLISECONDS).build();
+
+ @Cleanup
+ Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
+ .subscribe();
+
+ } catch (Exception e) {
+ fail("should not fail");
+ } finally {
+ pulsarClient.close();
+ }
+ }
+
@SuppressWarnings("deprecation")
@Test
public void testTlsAuthAllowInsecure() throws Exception {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
index 6b7df2c..c5807e3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
@@ -78,6 +78,15 @@ public class ServiceConfigurationTest {
assertEquals(config.getLoadBalancerOverrideBrokerNicSpeedGbps(), Optional.of(5.0));
}
+ @Test
+ public void testServicePortsEmpty() throws Exception {
+ String confFile = "brokerServicePort=\nwebServicePort=\n";
+ InputStream stream = new ByteArrayInputStream(confFile.getBytes());
+ final ServiceConfiguration config = PulsarConfigurationLoader.create(stream, ServiceConfiguration.class);
+ assertEquals(config.getBrokerServicePort(), Optional.empty());
+ assertEquals(config.getWebServicePort(), Optional.empty());
+ }
+
/**
* test {@link ServiceConfiguration} with incorrect values.
*
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
index 85c1152..93d4263 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
@@ -141,7 +141,13 @@ public class PulsarWorkerService implements WorkerService {
workerConfig.isTlsAllowInsecureConnection(),
workerConfig.isTlsEnableHostnameVerification());
} else {
- return WorkerUtils.getPulsarAdminClient(pulsarServiceUrl);
+ return WorkerUtils.getPulsarAdminClient(
+ pulsarServiceUrl,
+ null,
+ null,
+ null,
+ workerConfig.isTlsAllowInsecureConnection(),
+ workerConfig.isTlsEnableHostnameVerification());
}
}
@@ -158,7 +164,14 @@ public class PulsarWorkerService implements WorkerService {
workerConfig.isTlsAllowInsecureConnection(),
workerConfig.isTlsEnableHostnameVerification());
} else {
- return WorkerUtils.getPulsarClient(pulsarServiceUrl);
+ return WorkerUtils.getPulsarClient(
+ pulsarServiceUrl,
+ null,
+ null,
+ null,
+ null,
+ workerConfig.isTlsAllowInsecureConnection(),
+ workerConfig.isTlsEnableHostnameVerification());
}
}
};
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java
index ae5fd8d..c616db0 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java
@@ -22,10 +22,12 @@ import com.google.common.collect.Lists;
import java.net.MalformedURLException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.TimeZone;
+import java.util.stream.Collectors;
import javax.servlet.Servlet;
import javax.servlet.ServletException;
import javax.websocket.DeploymentException;
@@ -120,7 +122,9 @@ public class ProxyServer {
}
public void start() throws PulsarServerException {
- log.info("Starting web socket proxy at port {}", conf.getWebServicePort().get());
+ log.info("Starting web socket proxy at port {}", Arrays.stream(server.getConnectors())
+ .map(ServerConnector.class::cast).map(ServerConnector::getPort).map(Object::toString)
+ .collect(Collectors.joining(",")));
RequestLogHandler requestLogHandler = new RequestLogHandler();
Slf4jRequestLog requestLog = new Slf4jRequestLog();
requestLog.setExtended(true);
diff --git a/site2/docs/security-tls-keystore.md b/site2/docs/security-tls-keystore.md
index 4f432aa..fd5a75a 100644
--- a/site2/docs/security-tls-keystore.md
+++ b/site2/docs/security-tls-keystore.md
@@ -131,6 +131,19 @@ brokerClientTlsTrustStorePassword=clientpw
NOTE: it is important to restrict access to the store files via filesystem permissions.
+If you have configured TLS on the broker, you can disable the non-TLS ports by setting the following configurations to empty values, as shown below.
+```
+brokerServicePort=
+webServicePort=
+```
+In this case, you need to set the following configurations.
+
+```conf
+brokerClientTlsEnabled=true // Set this to true
+brokerClientTlsEnabledWithKeyStore=true // Set this to true
+brokerClientTlsTrustStore= // Set this to your desired value
+brokerClientTlsTrustStorePassword= // Set this to your desired value
+```
Optional settings that may worth consider:
1. tlsClientAuthentication=false: Enable/Disable using TLS for authentication. This config when enabled will authenticate the other end