You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/10/25 01:27:44 UTC

[GitHub] jai1 closed pull request #2831: Adding option to use TLS in broker admin client.

jai1 closed pull request #2831: Adding option to use TLS in broker admin client.
URL: https://github.com/apache/pulsar/pull/2831
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/conf/broker.conf b/conf/broker.conf
index 867dcb3de5..ad6135f368 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -263,6 +263,7 @@ superUserRoles=
 
 # Authentication settings of the broker itself. Used when the broker connects to other brokers,
 # either in same or other clusters
+brokerClientTlsEnabled=false
 brokerClientAuthenticationPlugin=
 brokerClientAuthenticationParameters=
 brokerClientTrustCertsFilePath=
@@ -466,9 +467,6 @@ replicationProducerQueueSize=1000
 # Replicator prefix used for replicator producer name and cursor name
 replicatorPrefix=pulsar.repl
 
-# Enable TLS when talking with other clusters to replicate messages
-replicationTlsEnabled=false
-
 # Default message retention time
 defaultRetentionTimeInMinutes=0
 
@@ -573,3 +571,6 @@ gcsManagedLedgerOffloadServiceAccountKeyFile=
 
 # Deprecated. Use configurationStoreServers
 globalZookeeperServers=
+
+# Deprecated - Enable TLS when talking with other clusters to replicate messages
+replicationTlsEnabled=false
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 3ddd08c69d..1cef4edafe 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -451,8 +451,11 @@
     private String replicatorPrefix = "pulsar.repl";
     // Replicator producer queue size;
     private int replicationProducerQueueSize = 1000;
-    // Enable TLS when talking with other clusters to replicate messages
+    // @deprecated - Use brokerClientTlsEnabled instead.
+    @Deprecated
     private boolean replicationTlsEnabled = false;
+    // Enable TLS when talking with other brokers in the same cluster (admin operation) or different clusters (replication)
+    private boolean brokerClientTlsEnabled = false;
 
     // Default message retention time
     private int defaultRetentionTimeInMinutes = 0;
@@ -1528,15 +1531,25 @@ public int getReplicationProducerQueueSize() {
     public void setReplicationProducerQueueSize(int replicationProducerQueueSize) {
         this.replicationProducerQueueSize = replicationProducerQueueSize;
     }
-
+    
+    @Deprecated
     public boolean isReplicationTlsEnabled() {
         return replicationTlsEnabled;
     }
-
+    
+    @Deprecated
     public void setReplicationTlsEnabled(boolean replicationTlsEnabled) {
         this.replicationTlsEnabled = replicationTlsEnabled;
     }
 
+    public boolean isBrokerClientTlsEnabled() {
+        return brokerClientTlsEnabled || replicationTlsEnabled;
+    }
+
+    public void setBrokerClientTlsEnabled(boolean brokerClientTlsEnabled) {
+        this.brokerClientTlsEnabled = brokerClientTlsEnabled;
+    }
+
     public List<String> getBootstrapNamespaces() {
         return bootstrapNamespaces;
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 63460c7634..bd88da450a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -74,6 +74,7 @@
 import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
 import org.apache.pulsar.broker.web.WebService;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.common.conf.InternalConfigurationData;
@@ -793,12 +794,19 @@ public synchronized PulsarClient getClient() throws PulsarServerException {
     public synchronized PulsarAdmin getAdminClient() throws PulsarServerException {
         if (this.adminClient == null) {
             try {
-                String adminApiUrl = webAddress(config);
-                this.adminClient = PulsarAdmin.builder().serviceHttpUrl(adminApiUrl) //
+                ServiceConfiguration conf = this.getConfiguration();
+                String adminApiUrl = conf.isBrokerClientTlsEnabled() ? webAddressTls(config) : webAddress(config);
+                PulsarAdminBuilder builder = PulsarAdmin.builder().serviceHttpUrl(adminApiUrl) //
                         .authentication( //
-                                this.getConfiguration().getBrokerClientAuthenticationPlugin(), //
-                                this.getConfiguration().getBrokerClientAuthenticationParameters()) //
-                        .build();
+                                conf.getBrokerClientAuthenticationPlugin(), //
+                                conf.getBrokerClientAuthenticationParameters());
+                
+                if (conf.isBrokerClientTlsEnabled()) {
+                    builder.tlsTrustCertsFilePath(conf.getBrokerClientTrustCertsFilePath());
+                    builder.allowTlsInsecureConnection(conf.isTlsAllowInsecureConnection());
+                }
+                
+                this.adminClient = builder.build();
                 LOG.info("Admin api url: " + adminApiUrl);
             } catch (Exception e) {
                 throw new PulsarServerException(e);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 9a5fc29a8f..b20b213e77 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -548,7 +548,7 @@ public PulsarClient getReplicationClient(String cluster) {
                     clientBuilder.authentication(pulsar.getConfiguration().getBrokerClientAuthenticationPlugin(),
                             pulsar.getConfiguration().getBrokerClientAuthenticationParameters());
                 }
-                if (pulsar.getConfiguration().isReplicationTlsEnabled()) {
+                if (pulsar.getConfiguration().isBrokerClientTlsEnabled()) {
                     clientBuilder
                             .serviceUrl(isNotBlank(data.getBrokerServiceUrlTls()) ? data.getBrokerServiceUrlTls()
                                     : data.getServiceUrlTls())
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BrokerAdminClientTlsAuthTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BrokerAdminClientTlsAuthTest.java
new file mode 100644
index 0000000000..2cfb009ac7
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BrokerAdminClientTlsAuthTest.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import com.google.common.collect.ImmutableSet;
+
+import java.util.List;
+
+import static org.testng.Assert.fail;
+
+import java.lang.reflect.Method;
+import java.security.cert.X509Certificate;
+import javax.net.ssl.SSLContext;
+import javax.ws.rs.NotAuthorizedException;
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.GenericType;
+import javax.ws.rs.core.MediaType;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.bookkeeper.test.PortManager;
+import org.apache.http.conn.ssl.NoopHostnameVerifier;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.admin.internal.JacksonConfigurator;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.AuthPolicies;
+import org.apache.pulsar.common.policies.data.BundlesData;
+import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.util.SecurityUtility;
+
+import org.glassfish.jersey.client.ClientConfig;
+import org.glassfish.jersey.client.ClientProperties;
+import org.glassfish.jersey.jackson.JacksonFeature;
+import org.glassfish.jersey.media.multipart.MultiPartFeature;
+
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class BrokerAdminClientTlsAuthTest extends MockedPulsarServiceBaseTest {
+    protected String methodName;
+
+    @BeforeMethod
+    public void beforeMethod(Method m) throws Exception {
+        methodName = m.getName();
+    }
+
+    private static String getTLSFile(String name) {
+        return String.format("./src/test/resources/authentication/tls-http/%s.pem", name);
+    }
+
+    @BeforeMethod
+    @Override
+    public void setup() throws Exception {
+        buildConf(conf);
+        super.internalSetup();
+    }
+
+    private void buildConf(ServiceConfiguration conf) {
+        conf.setLoadBalancerEnabled(true);
+        conf.setTlsEnabled(true);
+        conf.setTlsCertificateFilePath(getTLSFile("broker.cert"));
+        conf.setTlsKeyFilePath(getTLSFile("broker.key-pk8"));
+        conf.setTlsTrustCertsFilePath(getTLSFile("ca.cert"));
+        conf.setAuthenticationEnabled(true);
+        conf.setSuperUserRoles(ImmutableSet.of("superproxy", "broker.pulsar.apache.org"));
+        conf.setAuthenticationProviders(
+                ImmutableSet.of("org.apache.pulsar.broker.authentication.AuthenticationProviderTls"));
+        conf.setAuthorizationEnabled(true);
+        conf.setBrokerClientTlsEnabled(true);
+        String str = String.format("tlsCertFile:%s,tlsKeyFile:%s", getTLSFile("broker.cert"), getTLSFile("broker.key-pk8"));
+        conf.setBrokerClientAuthenticationParameters(str);
+        conf.setBrokerClientAuthenticationPlugin("org.apache.pulsar.client.impl.auth.AuthenticationTls");
+        conf.setBrokerClientTrustCertsFilePath(getTLSFile("ca.cert"));
+        conf.setTlsAllowInsecureConnection(true); 
+    }
+
+    @AfterMethod
+    @Override
+    public void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    PulsarAdmin buildAdminClient(String user) throws Exception {
+        return PulsarAdmin.builder()
+            .allowTlsInsecureConnection(false)
+            .enableTlsHostnameVerification(false)
+            .serviceHttpUrl(brokerUrlTls.toString())
+            .authentication("org.apache.pulsar.client.impl.auth.AuthenticationTls",
+                            String.format("tlsCertFile:%s,tlsKeyFile:%s",
+                                          getTLSFile(user + ".cert"), getTLSFile(user + ".key-pk8")))
+            .tlsTrustCertsFilePath(getTLSFile("ca.cert")).build();
+    }
+
+    /**
+     * Test case => Use Multiple Brokers 
+     *           => Create a namespace with bundles distributed among these brokers.
+     *           => Use Tls as authPlugin for everything.
+     *           => Run list topics command
+     * @throws Exception 
+     */
+    @Test
+    public void testPersistentList() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+        
+        /***** Start Broker 2 ******/
+        ServiceConfiguration conf = new ServiceConfiguration();
+        conf.setBrokerServicePort(PortManager.nextFreePort());
+        conf.setBrokerServicePortTls(PortManager.nextFreePort());
+        conf.setWebServicePort(PortManager.nextFreePort());
+        conf.setWebServicePortTls(PortManager.nextFreePort());
+        conf.setAdvertisedAddress("localhost");
+        conf.setClusterName(this.conf.getClusterName());
+        conf.setZookeeperServers("localhost:2181");
+        buildConf(conf);
+        PulsarService pulsar2 = startBroker(conf);
+
+        /***** Broker 2 Started *****/
+        try (PulsarAdmin admin = buildAdminClient("superproxy")) {
+            admin.tenants().createTenant("tenant",
+                                         new TenantInfo(ImmutableSet.of("admin"),
+                                                        ImmutableSet.of("test")));
+        }
+        try (PulsarAdmin admin = buildAdminClient("admin")) {
+            Policies policies = new Policies();
+            policies.bundles = new BundlesData(4);
+            policies.auth_policies.namespace_auth.put("admin", ImmutableSet.of(AuthAction.produce, AuthAction.consume));
+            policies.replication_clusters = ImmutableSet.of("test");
+            admin.namespaces().createNamespace("tenant/ns", policies);
+            try {
+                admin.persistentTopics().getList("tenant/ns");
+            } catch (PulsarAdminException ex) {
+                ex.printStackTrace();
+                fail("Should not have thrown an exception");
+            }
+        }
+        
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTlsTest.java
index 71fc1cb8b3..f9d315393a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTlsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTlsTest.java
@@ -35,9 +35,9 @@
     @Override
     @BeforeClass
     void setup() throws Exception {
-        config1.setReplicationTlsEnabled(true);
-        config2.setReplicationTlsEnabled(true);
-        config3.setReplicationTlsEnabled(true);
+        config1.setBrokerClientTlsEnabled(true);
+        config2.setBrokerClientTlsEnabled(true);
+        config3.setBrokerClientTlsEnabled(true);
         super.setup();
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
index 8a583217ef..7cf4843824 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
@@ -126,7 +126,7 @@ protected void cleanup() throws Exception {
     }
 
     /**
-     * UsecaseL Multiple Broker => Lookup Redirection test
+     * Usecase Multiple Broker => Lookup Redirection test
      *
      * 1. Broker1 is a leader 2. Lookup request reaches to Broker2 which redirects to leader (Broker1) with
      * authoritative = false 3. Leader (Broker1) finds out least loaded broker as Broker2 and redirects request to
@@ -143,7 +143,6 @@ public void testMultipleBrokerLookup() throws Exception {
         ServiceConfiguration conf2 = new ServiceConfiguration();
         conf2.setBrokerServicePort(PortManager.nextFreePort());
         conf2.setBrokerServicePortTls(PortManager.nextFreePort());
-        conf2.setAdvertisedAddress("localhost");
         conf2.setWebServicePort(PortManager.nextFreePort());
         conf2.setWebServicePortTls(PortManager.nextFreePort());
         conf2.setAdvertisedAddress("localhost");


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services