You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ja...@apache.org on 2018/10/25 01:27:47 UTC

[pulsar] branch master updated: Adding option to use TLS in broker admin client. (#2831)

This is an automated email from the ASF dual-hosted git repository.

jai1 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f0e9bee  Adding option to use TLS in broker admin client. (#2831)
f0e9bee is described below

commit f0e9bee7cf8347e663f149ba7fe746f9bbd00cd9
Author: Jai Asher <ja...@ccs.neu.edu>
AuthorDate: Wed Oct 24 18:27:42 2018 -0700

    Adding option to use TLS in broker admin client. (#2831)
---
 conf/broker.conf                                   |   7 +-
 .../apache/pulsar/broker/ServiceConfiguration.java |  19 ++-
 .../org/apache/pulsar/broker/PulsarService.java    |  18 ++-
 .../pulsar/broker/service/BrokerService.java       |   2 +-
 .../broker/admin/BrokerAdminClientTlsAuthTest.java | 164 +++++++++++++++++++++
 .../pulsar/broker/service/ReplicatorTlsTest.java   |   6 +-
 .../pulsar/client/api/BrokerServiceLookupTest.java |   3 +-
 7 files changed, 202 insertions(+), 17 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 867dcb3..ad6135f 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -263,6 +263,7 @@ superUserRoles=
 
 # Authentication settings of the broker itself. Used when the broker connects to other brokers,
 # either in same or other clusters
+brokerClientTlsEnabled=false
 brokerClientAuthenticationPlugin=
 brokerClientAuthenticationParameters=
 brokerClientTrustCertsFilePath=
@@ -466,9 +467,6 @@ replicationProducerQueueSize=1000
 # Replicator prefix used for replicator producer name and cursor name
 replicatorPrefix=pulsar.repl
 
-# Enable TLS when talking with other clusters to replicate messages
-replicationTlsEnabled=false
-
 # Default message retention time
 defaultRetentionTimeInMinutes=0
 
@@ -573,3 +571,6 @@ gcsManagedLedgerOffloadServiceAccountKeyFile=
 
 # Deprecated. Use configurationStoreServers
 globalZookeeperServers=
+
+# Deprecated. Use brokerClientTlsEnabled instead. Enable TLS when talking with other clusters to replicate messages
+replicationTlsEnabled=false
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 3ddd08c..1cef4ed 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -451,8 +451,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
     private String replicatorPrefix = "pulsar.repl";
     // Replicator producer queue size;
     private int replicationProducerQueueSize = 1000;
-    // Enable TLS when talking with other clusters to replicate messages
+    // @deprecated - Use brokerClientTlsEnabled instead.
+    @Deprecated
     private boolean replicationTlsEnabled = false;
+    // Enable TLS when talking with other brokers in the same cluster (admin operation) or different clusters (replication)
+    private boolean brokerClientTlsEnabled = false;
 
     // Default message retention time
     private int defaultRetentionTimeInMinutes = 0;
@@ -1528,15 +1531,25 @@ public class ServiceConfiguration implements PulsarConfiguration {
     public void setReplicationProducerQueueSize(int replicationProducerQueueSize) {
         this.replicationProducerQueueSize = replicationProducerQueueSize;
     }
-
+    
+    @Deprecated
     public boolean isReplicationTlsEnabled() {
         return replicationTlsEnabled;
     }
-
+    
+    @Deprecated
     public void setReplicationTlsEnabled(boolean replicationTlsEnabled) {
         this.replicationTlsEnabled = replicationTlsEnabled;
     }
 
+    public boolean isBrokerClientTlsEnabled() {
+        return brokerClientTlsEnabled || replicationTlsEnabled;
+    }
+
+    public void setBrokerClientTlsEnabled(boolean brokerClientTlsEnabled) {
+        this.brokerClientTlsEnabled = brokerClientTlsEnabled;
+    }
+
     public List<String> getBootstrapNamespaces() {
         return bootstrapNamespaces;
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 63460c7..bd88da4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -74,6 +74,7 @@ import org.apache.pulsar.broker.stats.MetricsGenerator;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
 import org.apache.pulsar.broker.web.WebService;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.common.conf.InternalConfigurationData;
@@ -793,12 +794,19 @@ public class PulsarService implements AutoCloseable {
     public synchronized PulsarAdmin getAdminClient() throws PulsarServerException {
         if (this.adminClient == null) {
             try {
-                String adminApiUrl = webAddress(config);
-                this.adminClient = PulsarAdmin.builder().serviceHttpUrl(adminApiUrl) //
+                ServiceConfiguration conf = this.getConfiguration();
+                String adminApiUrl = conf.isBrokerClientTlsEnabled() ? webAddressTls(config) : webAddress(config);
+                PulsarAdminBuilder builder = PulsarAdmin.builder().serviceHttpUrl(adminApiUrl) //
                         .authentication( //
-                                this.getConfiguration().getBrokerClientAuthenticationPlugin(), //
-                                this.getConfiguration().getBrokerClientAuthenticationParameters()) //
-                        .build();
+                                conf.getBrokerClientAuthenticationPlugin(), //
+                                conf.getBrokerClientAuthenticationParameters());
+                
+                if (conf.isBrokerClientTlsEnabled()) {
+                    builder.tlsTrustCertsFilePath(conf.getBrokerClientTrustCertsFilePath());
+                    builder.allowTlsInsecureConnection(conf.isTlsAllowInsecureConnection());
+                }
+                
+                this.adminClient = builder.build();
                 LOG.info("Admin api url: " + adminApiUrl);
             } catch (Exception e) {
                 throw new PulsarServerException(e);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 9a5fc29..b20b213 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -548,7 +548,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
                     clientBuilder.authentication(pulsar.getConfiguration().getBrokerClientAuthenticationPlugin(),
                             pulsar.getConfiguration().getBrokerClientAuthenticationParameters());
                 }
-                if (pulsar.getConfiguration().isReplicationTlsEnabled()) {
+                if (pulsar.getConfiguration().isBrokerClientTlsEnabled()) {
                     clientBuilder
                             .serviceUrl(isNotBlank(data.getBrokerServiceUrlTls()) ? data.getBrokerServiceUrlTls()
                                     : data.getServiceUrlTls())
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BrokerAdminClientTlsAuthTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BrokerAdminClientTlsAuthTest.java
new file mode 100644
index 0000000..2cfb009
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BrokerAdminClientTlsAuthTest.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import com.google.common.collect.ImmutableSet;
+
+import java.util.List;
+
+import static org.testng.Assert.fail;
+
+import java.lang.reflect.Method;
+import java.security.cert.X509Certificate;
+import javax.net.ssl.SSLContext;
+import javax.ws.rs.NotAuthorizedException;
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.GenericType;
+import javax.ws.rs.core.MediaType;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.bookkeeper.test.PortManager;
+import org.apache.http.conn.ssl.NoopHostnameVerifier;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.admin.internal.JacksonConfigurator;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.AuthPolicies;
+import org.apache.pulsar.common.policies.data.BundlesData;
+import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.util.SecurityUtility;
+
+import org.glassfish.jersey.client.ClientConfig;
+import org.glassfish.jersey.client.ClientProperties;
+import org.glassfish.jersey.jackson.JacksonFeature;
+import org.glassfish.jersey.media.multipart.MultiPartFeature;
+
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class BrokerAdminClientTlsAuthTest extends MockedPulsarServiceBaseTest {
+    protected String methodName;
+
+    @BeforeMethod
+    public void beforeMethod(Method m) throws Exception {
+        methodName = m.getName();
+    }
+
+    private static String getTLSFile(String name) {
+        return String.format("./src/test/resources/authentication/tls-http/%s.pem", name);
+    }
+
+    @BeforeMethod
+    @Override
+    public void setup() throws Exception {
+        buildConf(conf);
+        super.internalSetup();
+    }
+
+    private void buildConf(ServiceConfiguration conf) {
+        conf.setLoadBalancerEnabled(true);
+        conf.setTlsEnabled(true);
+        conf.setTlsCertificateFilePath(getTLSFile("broker.cert"));
+        conf.setTlsKeyFilePath(getTLSFile("broker.key-pk8"));
+        conf.setTlsTrustCertsFilePath(getTLSFile("ca.cert"));
+        conf.setAuthenticationEnabled(true);
+        conf.setSuperUserRoles(ImmutableSet.of("superproxy", "broker.pulsar.apache.org"));
+        conf.setAuthenticationProviders(
+                ImmutableSet.of("org.apache.pulsar.broker.authentication.AuthenticationProviderTls"));
+        conf.setAuthorizationEnabled(true);
+        conf.setBrokerClientTlsEnabled(true);
+        String str = String.format("tlsCertFile:%s,tlsKeyFile:%s", getTLSFile("broker.cert"), getTLSFile("broker.key-pk8"));
+        conf.setBrokerClientAuthenticationParameters(str);
+        conf.setBrokerClientAuthenticationPlugin("org.apache.pulsar.client.impl.auth.AuthenticationTls");
+        conf.setBrokerClientTrustCertsFilePath(getTLSFile("ca.cert"));
+        conf.setTlsAllowInsecureConnection(true); 
+    }
+
+    @AfterMethod
+    @Override
+    public void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    PulsarAdmin buildAdminClient(String user) throws Exception {
+        return PulsarAdmin.builder()
+            .allowTlsInsecureConnection(false)
+            .enableTlsHostnameVerification(false)
+            .serviceHttpUrl(brokerUrlTls.toString())
+            .authentication("org.apache.pulsar.client.impl.auth.AuthenticationTls",
+                            String.format("tlsCertFile:%s,tlsKeyFile:%s",
+                                          getTLSFile(user + ".cert"), getTLSFile(user + ".key-pk8")))
+            .tlsTrustCertsFilePath(getTLSFile("ca.cert")).build();
+    }
+
+    /**
+     * Test case => Use multiple brokers.
+     *           => Create a namespace with bundles distributed among these brokers.
+     *           => Use TLS as the authentication plugin for everything.
+     *           => Run the list-topics command.
+     * @throws Exception if broker setup or an admin operation fails unexpectedly
+     */
+    @Test
+    public void testPersistentList() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+        
+        /***** Start Broker 2 ******/
+        ServiceConfiguration conf = new ServiceConfiguration();
+        conf.setBrokerServicePort(PortManager.nextFreePort());
+        conf.setBrokerServicePortTls(PortManager.nextFreePort());
+        conf.setWebServicePort(PortManager.nextFreePort());
+        conf.setWebServicePortTls(PortManager.nextFreePort());
+        conf.setAdvertisedAddress("localhost");
+        conf.setClusterName(this.conf.getClusterName());
+        conf.setZookeeperServers("localhost:2181");
+        buildConf(conf);
+        PulsarService pulsar2 = startBroker(conf);
+
+        /***** Broker 2 Started *****/
+        try (PulsarAdmin admin = buildAdminClient("superproxy")) {
+            admin.tenants().createTenant("tenant",
+                                         new TenantInfo(ImmutableSet.of("admin"),
+                                                        ImmutableSet.of("test")));
+        }
+        try (PulsarAdmin admin = buildAdminClient("admin")) {
+            Policies policies = new Policies();
+            policies.bundles = new BundlesData(4);
+            policies.auth_policies.namespace_auth.put("admin", ImmutableSet.of(AuthAction.produce, AuthAction.consume));
+            policies.replication_clusters = ImmutableSet.of("test");
+            admin.namespaces().createNamespace("tenant/ns", policies);
+            try {
+                admin.persistentTopics().getList("tenant/ns");
+            } catch (PulsarAdminException ex) {
+                ex.printStackTrace();
+                fail("Should not have thrown an exception");
+            }
+        }
+        
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTlsTest.java
index 71fc1cb..f9d3153 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTlsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTlsTest.java
@@ -35,9 +35,9 @@ public class ReplicatorTlsTest extends ReplicatorTestBase {
     @Override
     @BeforeClass
     void setup() throws Exception {
-        config1.setReplicationTlsEnabled(true);
-        config2.setReplicationTlsEnabled(true);
-        config3.setReplicationTlsEnabled(true);
+        config1.setBrokerClientTlsEnabled(true);
+        config2.setBrokerClientTlsEnabled(true);
+        config3.setBrokerClientTlsEnabled(true);
         super.setup();
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
index 8a58321..7cf4843 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
@@ -126,7 +126,7 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
     }
 
     /**
-     * UsecaseL Multiple Broker => Lookup Redirection test
+     * Use case: Multiple Brokers => Lookup Redirection test
      *
      * 1. Broker1 is a leader 2. Lookup request reaches to Broker2 which redirects to leader (Broker1) with
      * authoritative = false 3. Leader (Broker1) finds out least loaded broker as Broker2 and redirects request to
@@ -143,7 +143,6 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
         ServiceConfiguration conf2 = new ServiceConfiguration();
         conf2.setBrokerServicePort(PortManager.nextFreePort());
         conf2.setBrokerServicePortTls(PortManager.nextFreePort());
-        conf2.setAdvertisedAddress("localhost");
         conf2.setWebServicePort(PortManager.nextFreePort());
         conf2.setWebServicePortTls(PortManager.nextFreePort());
         conf2.setAdvertisedAddress("localhost");