You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2021/08/19 08:37:16 UTC

[pulsar] branch master updated: [Broker] Support disabling non-TLS service ports (#11681)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 50b6e79  [Broker] Support disabling non-TLS service ports (#11681)
50b6e79 is described below

commit 50b6e79d7cc350efb2208b4aa89f684e133e31c0
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Thu Aug 19 11:36:37 2021 +0300

    [Broker] Support disabling non-TLS service ports (#11681)
    
    * Support disabling non-tls service ports
    
    * Add docs for disabling non-TLS ports
    
    * Update site2/docs/security-tls-keystore.md
    
    Co-authored-by: Anonymitaet <50...@users.noreply.github.com>
---
 .../org/apache/pulsar/broker/PulsarService.java    |  7 +++--
 .../pulsar/broker/loadbalance/NoopLoadManager.java | 10 +++++--
 .../loadbalance/impl/SimpleLoadManagerImpl.java    |  2 +-
 .../broker/auth/MockedPulsarServiceBaseTest.java   |  8 ++++--
 .../pulsar/broker/service/BrokerServiceTest.java   | 32 ++++++++++++++++++++++
 .../common/naming/ServiceConfigurationTest.java    |  9 ++++++
 .../functions/worker/PulsarWorkerService.java      | 17 ++++++++++--
 .../pulsar/websocket/service/ProxyServer.java      |  6 +++-
 site2/docs/security-tls-keystore.md                | 13 +++++++++
 9 files changed, 93 insertions(+), 11 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 0c0cec7..6a10daf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -1548,12 +1548,15 @@ public class PulsarService implements AutoCloseable {
                                     AuthorizationService authorizationService)
             throws Exception {
         if (functionWorkerService.isPresent()) {
-            if (workerConfig.isUseTls()) {
+            if (workerConfig.isUseTls() || brokerServiceUrl == null) {
                 workerConfig.setPulsarServiceUrl(brokerServiceUrlTls);
+            } else {
+                workerConfig.setPulsarServiceUrl(brokerServiceUrl);
+            }
+            if (workerConfig.isUseTls() || webServiceAddress == null) {
                 workerConfig.setPulsarWebServiceUrl(webServiceAddressTls);
                 workerConfig.setFunctionWebServiceUrl(webServiceAddressTls);
             } else {
-                workerConfig.setPulsarServiceUrl(brokerServiceUrl);
                 workerConfig.setPulsarWebServiceUrl(webServiceAddress);
                 workerConfig.setFunctionWebServiceUrl(webServiceAddress);
             }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java
index db017c1..85071ff 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java
@@ -51,8 +51,7 @@ public class NoopLoadManager implements LoadManager {
 
     @Override
     public void start() throws PulsarServerException {
-        lookupServiceAddress = pulsar.getAdvertisedAddress() + ":"
-                + pulsar.getConfiguration().getWebServicePort().get();
+        lookupServiceAddress = getBrokerAddress();
         localResourceUnit = new SimpleResourceUnit(String.format("http://%s", lookupServiceAddress),
                 new PulsarResourceDescription());
 
@@ -71,6 +70,13 @@ public class NoopLoadManager implements LoadManager {
         }
     }
 
+    private String getBrokerAddress() {
+        return String.format("%s:%s", pulsar.getAdvertisedAddress(),
+                pulsar.getConfiguration().getWebServicePort().isPresent()
+                        ? pulsar.getConfiguration().getWebServicePort().get()
+                        : pulsar.getConfiguration().getWebServicePortTls().get());
+    }
+
     @Override
     public boolean isCentralized() {
         return false;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
index a0d2898..5d743fb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
@@ -1122,7 +1122,7 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification
         return String.format("%s:%s", pulsar.getAdvertisedAddress(),
                 pulsar.getConfiguration().getWebServicePort().isPresent()
                         ? pulsar.getConfiguration().getWebServicePort().get()
-                        : pulsar.getConfiguration().getWebServicePortTls());
+                        : pulsar.getConfiguration().getWebServicePortTls().get());
     }
 
     @Override
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index 37a7f3a..9009eec 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -264,13 +264,15 @@ public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport {
         }
         this.pulsar = startBroker(conf);
 
-        brokerUrl = new URL(pulsar.getWebServiceAddress());
-        brokerUrlTls = new URL(pulsar.getWebServiceAddressTls());
+        brokerUrl = pulsar.getWebServiceAddress() != null ? new URL(pulsar.getWebServiceAddress()) : null;
+        brokerUrlTls = pulsar.getWebServiceAddressTls() != null ? new URL(pulsar.getWebServiceAddressTls()) : null;
 
         if (admin != null) {
             admin.close();
         }
-        PulsarAdminBuilder pulsarAdminBuilder = PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString());
+        PulsarAdminBuilder pulsarAdminBuilder = PulsarAdmin.builder().serviceHttpUrl(brokerUrl != null
+                ? brokerUrl.toString()
+                : brokerUrlTls.toString());
         customizeNewPulsarAdminBuilder(pulsarAdminBuilder);
         admin = spy(pulsarAdminBuilder.build());
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index 9fee5e6..bff415f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -694,6 +694,38 @@ public class BrokerServiceTest extends BrokerTestBase {
         }
     }
 
+    @Test
+    public void testTlsEnabledWithoutNonTlsServicePorts() throws Exception {
+        final String topicName = "persistent://prop/ns-abc/newTopic";
+        final String subName = "newSub";
+
+        conf.setAuthenticationEnabled(false);
+        conf.setBrokerServicePort(Optional.empty());
+        conf.setBrokerServicePortTls(Optional.of(0));
+        conf.setWebServicePort(Optional.empty());
+        conf.setWebServicePortTls(Optional.of(0));
+        conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
+        conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
+        conf.setNumExecutorThreadPoolSize(5);
+        restartBroker();
+
+        // Access with TLS (Allow insecure TLS connection)
+        try {
+            pulsarClient = PulsarClient.builder().serviceUrl(brokerUrlTls.toString()).enableTls(true)
+                    .allowTlsInsecureConnection(true).statsInterval(0, TimeUnit.SECONDS)
+                    .operationTimeout(1000, TimeUnit.MILLISECONDS).build();
+
+            @Cleanup
+            Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
+                    .subscribe();
+
+        } catch (Exception e) {
+            fail("should not fail");
+        } finally {
+            pulsarClient.close();
+        }
+    }
+
     @SuppressWarnings("deprecation")
     @Test
     public void testTlsAuthAllowInsecure() throws Exception {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
index 452ee96..078ad61 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
@@ -79,6 +79,15 @@ public class ServiceConfigurationTest {
         assertEquals(config.getLoadBalancerOverrideBrokerNicSpeedGbps(), Optional.of(5.0));
     }
 
+    @Test
+    public void testServicePortsEmpty() throws Exception {
+        String confFile = "brokerServicePort=\nwebServicePort=\n";
+        InputStream stream = new ByteArrayInputStream(confFile.getBytes());
+        final ServiceConfiguration config = PulsarConfigurationLoader.create(stream, ServiceConfiguration.class);
+        assertEquals(config.getBrokerServicePort(), Optional.empty());
+        assertEquals(config.getWebServicePort(), Optional.empty());
+    }
+
     /**
      * test {@link ServiceConfiguration} with incorrect values.
      *
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
index 49e58da..f152adc 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
@@ -141,7 +141,13 @@ public class PulsarWorkerService implements WorkerService {
                         workerConfig.isTlsAllowInsecureConnection(),
                         workerConfig.isTlsEnableHostnameVerification());
                 } else {
-                    return WorkerUtils.getPulsarAdminClient(pulsarServiceUrl);
+                    return WorkerUtils.getPulsarAdminClient(
+                            pulsarServiceUrl,
+                            null,
+                            null,
+                            null,
+                            workerConfig.isTlsAllowInsecureConnection(),
+                            workerConfig.isTlsEnableHostnameVerification());
                 }
             }
 
@@ -158,7 +164,14 @@ public class PulsarWorkerService implements WorkerService {
                         workerConfig.isTlsAllowInsecureConnection(),
                         workerConfig.isTlsEnableHostnameVerification());
                 } else {
-                    return WorkerUtils.getPulsarClient(pulsarServiceUrl);
+                    return WorkerUtils.getPulsarClient(
+                            pulsarServiceUrl,
+                            null,
+                            null,
+                            null,
+                            null,
+                            workerConfig.isTlsAllowInsecureConnection(),
+                            workerConfig.isTlsEnableHostnameVerification());
                 }
             }
         };
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java
index ae5fd8d..c616db0 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java
@@ -22,10 +22,12 @@ import com.google.common.collect.Lists;
 
 import java.net.MalformedURLException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
 import java.util.TimeZone;
 
+import java.util.stream.Collectors;
 import javax.servlet.Servlet;
 import javax.servlet.ServletException;
 import javax.websocket.DeploymentException;
@@ -120,7 +122,9 @@ public class ProxyServer {
     }
 
     public void start() throws PulsarServerException {
-        log.info("Starting web socket proxy at port {}", conf.getWebServicePort().get());
+        log.info("Starting web socket proxy at port {}", Arrays.stream(server.getConnectors())
+                .map(ServerConnector.class::cast).map(ServerConnector::getPort).map(Object::toString)
+                .collect(Collectors.joining(",")));
         RequestLogHandler requestLogHandler = new RequestLogHandler();
         Slf4jRequestLog requestLog = new Slf4jRequestLog();
         requestLog.setExtended(true);
diff --git a/site2/docs/security-tls-keystore.md b/site2/docs/security-tls-keystore.md
index 4f432aa..fd5a75a 100644
--- a/site2/docs/security-tls-keystore.md
+++ b/site2/docs/security-tls-keystore.md
@@ -131,6 +131,19 @@ brokerClientTlsTrustStorePassword=clientpw
 
 NOTE: it is important to restrict access to the store files via filesystem permissions.
 
+If you have configured TLS on the broker, you can disable the non-TLS ports by setting the following configurations to empty values, as shown below.
+```
+brokerServicePort=
+webServicePort=
+```
+In this case, you need to set the following configurations.
+
+```conf
+brokerClientTlsEnabled=true // Set this to true
+brokerClientTlsEnabledWithKeyStore=true  // Set this to true
+brokerClientTlsTrustStore= // Set this to your desired value
+brokerClientTlsTrustStorePassword= // Set this to your desired value
+```
 Optional settings that may worth consider:
 
 1. tlsClientAuthentication=false: Enable/Disable using TLS for authentication. This config when enabled will authenticate the other end