You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ja...@apache.org on 2018/10/25 01:27:47 UTC
[pulsar] branch master updated: Adding option to use TLS in broker
admin client. (#2831)
This is an automated email from the ASF dual-hosted git repository.
jai1 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f0e9bee Adding option to use TLS in broker admin client. (#2831)
f0e9bee is described below
commit f0e9bee7cf8347e663f149ba7fe746f9bbd00cd9
Author: Jai Asher <ja...@ccs.neu.edu>
AuthorDate: Wed Oct 24 18:27:42 2018 -0700
Adding option to use TLS in broker admin client. (#2831)
---
conf/broker.conf | 7 +-
.../apache/pulsar/broker/ServiceConfiguration.java | 19 ++-
.../org/apache/pulsar/broker/PulsarService.java | 18 ++-
.../pulsar/broker/service/BrokerService.java | 2 +-
.../broker/admin/BrokerAdminClientTlsAuthTest.java | 164 +++++++++++++++++++++
.../pulsar/broker/service/ReplicatorTlsTest.java | 6 +-
.../pulsar/client/api/BrokerServiceLookupTest.java | 3 +-
7 files changed, 202 insertions(+), 17 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index 867dcb3..ad6135f 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -263,6 +263,7 @@ superUserRoles=
# Authentication settings of the broker itself. Used when the broker connects to other brokers,
# either in same or other clusters
+brokerClientTlsEnabled=false
brokerClientAuthenticationPlugin=
brokerClientAuthenticationParameters=
brokerClientTrustCertsFilePath=
@@ -466,9 +467,6 @@ replicationProducerQueueSize=1000
# Replicator prefix used for replicator producer name and cursor name
replicatorPrefix=pulsar.repl
-# Enable TLS when talking with other clusters to replicate messages
-replicationTlsEnabled=false
-
# Default message retention time
defaultRetentionTimeInMinutes=0
@@ -573,3 +571,6 @@ gcsManagedLedgerOffloadServiceAccountKeyFile=
# Deprecated. Use configurationStoreServers
globalZookeeperServers=
+
+# Deprecated - Enable TLS when talking with other clusters to replicate messages
+replicationTlsEnabled=false
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 3ddd08c..1cef4ed 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -451,8 +451,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
private String replicatorPrefix = "pulsar.repl";
// Replicator producer queue size;
private int replicationProducerQueueSize = 1000;
- // Enable TLS when talking with other clusters to replicate messages
+ // @deprecated - Use brokerClientTlsEnabled instead.
+ @Deprecated
private boolean replicationTlsEnabled = false;
+ // Enable TLS when talking with other brokers in the same cluster (admin operation) or different clusters (replication)
+ private boolean brokerClientTlsEnabled = false;
// Default message retention time
private int defaultRetentionTimeInMinutes = 0;
@@ -1528,15 +1531,25 @@ public class ServiceConfiguration implements PulsarConfiguration {
public void setReplicationProducerQueueSize(int replicationProducerQueueSize) {
this.replicationProducerQueueSize = replicationProducerQueueSize;
}
-
+
+ @Deprecated
public boolean isReplicationTlsEnabled() {
return replicationTlsEnabled;
}
-
+
+ @Deprecated
public void setReplicationTlsEnabled(boolean replicationTlsEnabled) {
this.replicationTlsEnabled = replicationTlsEnabled;
}
+ public boolean isBrokerClientTlsEnabled() {
+ return brokerClientTlsEnabled || replicationTlsEnabled;
+ }
+
+ public void setBrokerClientTlsEnabled(boolean brokerClientTlsEnabled) {
+ this.brokerClientTlsEnabled = brokerClientTlsEnabled;
+ }
+
public List<String> getBootstrapNamespaces() {
return bootstrapNamespaces;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 63460c7..bd88da4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -74,6 +74,7 @@ import org.apache.pulsar.broker.stats.MetricsGenerator;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
import org.apache.pulsar.broker.web.WebService;
import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.conf.InternalConfigurationData;
@@ -793,12 +794,19 @@ public class PulsarService implements AutoCloseable {
public synchronized PulsarAdmin getAdminClient() throws PulsarServerException {
if (this.adminClient == null) {
try {
- String adminApiUrl = webAddress(config);
- this.adminClient = PulsarAdmin.builder().serviceHttpUrl(adminApiUrl) //
+ ServiceConfiguration conf = this.getConfiguration();
+ String adminApiUrl = conf.isBrokerClientTlsEnabled() ? webAddressTls(config) : webAddress(config);
+ PulsarAdminBuilder builder = PulsarAdmin.builder().serviceHttpUrl(adminApiUrl) //
.authentication( //
- this.getConfiguration().getBrokerClientAuthenticationPlugin(), //
- this.getConfiguration().getBrokerClientAuthenticationParameters()) //
- .build();
+ conf.getBrokerClientAuthenticationPlugin(), //
+ conf.getBrokerClientAuthenticationParameters());
+
+ if (conf.isBrokerClientTlsEnabled()) {
+ builder.tlsTrustCertsFilePath(conf.getBrokerClientTrustCertsFilePath());
+ builder.allowTlsInsecureConnection(conf.isTlsAllowInsecureConnection());
+ }
+
+ this.adminClient = builder.build();
LOG.info("Admin api url: " + adminApiUrl);
} catch (Exception e) {
throw new PulsarServerException(e);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 9a5fc29..b20b213 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -548,7 +548,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
clientBuilder.authentication(pulsar.getConfiguration().getBrokerClientAuthenticationPlugin(),
pulsar.getConfiguration().getBrokerClientAuthenticationParameters());
}
- if (pulsar.getConfiguration().isReplicationTlsEnabled()) {
+ if (pulsar.getConfiguration().isBrokerClientTlsEnabled()) {
clientBuilder
.serviceUrl(isNotBlank(data.getBrokerServiceUrlTls()) ? data.getBrokerServiceUrlTls()
: data.getServiceUrlTls())
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BrokerAdminClientTlsAuthTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BrokerAdminClientTlsAuthTest.java
new file mode 100644
index 0000000..2cfb009
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BrokerAdminClientTlsAuthTest.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import com.google.common.collect.ImmutableSet;
+
+import java.util.List;
+
+import static org.testng.Assert.fail;
+
+import java.lang.reflect.Method;
+import java.security.cert.X509Certificate;
+import javax.net.ssl.SSLContext;
+import javax.ws.rs.NotAuthorizedException;
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.GenericType;
+import javax.ws.rs.core.MediaType;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.bookkeeper.test.PortManager;
+import org.apache.http.conn.ssl.NoopHostnameVerifier;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.admin.internal.JacksonConfigurator;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.AuthPolicies;
+import org.apache.pulsar.common.policies.data.BundlesData;
+import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.util.SecurityUtility;
+
+import org.glassfish.jersey.client.ClientConfig;
+import org.glassfish.jersey.client.ClientProperties;
+import org.glassfish.jersey.jackson.JacksonFeature;
+import org.glassfish.jersey.media.multipart.MultiPartFeature;
+
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class BrokerAdminClientTlsAuthTest extends MockedPulsarServiceBaseTest {
+ protected String methodName;
+
+ @BeforeMethod
+ public void beforeMethod(Method m) throws Exception {
+ methodName = m.getName();
+ }
+
+ private static String getTLSFile(String name) {
+ return String.format("./src/test/resources/authentication/tls-http/%s.pem", name);
+ }
+
+ @BeforeMethod
+ @Override
+ public void setup() throws Exception {
+ buildConf(conf);
+ super.internalSetup();
+ }
+
+ private void buildConf(ServiceConfiguration conf) {
+ conf.setLoadBalancerEnabled(true);
+ conf.setTlsEnabled(true);
+ conf.setTlsCertificateFilePath(getTLSFile("broker.cert"));
+ conf.setTlsKeyFilePath(getTLSFile("broker.key-pk8"));
+ conf.setTlsTrustCertsFilePath(getTLSFile("ca.cert"));
+ conf.setAuthenticationEnabled(true);
+ conf.setSuperUserRoles(ImmutableSet.of("superproxy", "broker.pulsar.apache.org"));
+ conf.setAuthenticationProviders(
+ ImmutableSet.of("org.apache.pulsar.broker.authentication.AuthenticationProviderTls"));
+ conf.setAuthorizationEnabled(true);
+ conf.setBrokerClientTlsEnabled(true);
+ String str = String.format("tlsCertFile:%s,tlsKeyFile:%s", getTLSFile("broker.cert"), getTLSFile("broker.key-pk8"));
+ conf.setBrokerClientAuthenticationParameters(str);
+ conf.setBrokerClientAuthenticationPlugin("org.apache.pulsar.client.impl.auth.AuthenticationTls");
+ conf.setBrokerClientTrustCertsFilePath(getTLSFile("ca.cert"));
+ conf.setTlsAllowInsecureConnection(true);
+ }
+
+ @AfterMethod
+ @Override
+ public void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ PulsarAdmin buildAdminClient(String user) throws Exception {
+ return PulsarAdmin.builder()
+ .allowTlsInsecureConnection(false)
+ .enableTlsHostnameVerification(false)
+ .serviceHttpUrl(brokerUrlTls.toString())
+ .authentication("org.apache.pulsar.client.impl.auth.AuthenticationTls",
+ String.format("tlsCertFile:%s,tlsKeyFile:%s",
+ getTLSFile(user + ".cert"), getTLSFile(user + ".key-pk8")))
+ .tlsTrustCertsFilePath(getTLSFile("ca.cert")).build();
+ }
+
+ /**
+ * Test case => Use Multiple Brokers
+ * => Create a namespace with bundles distributed among these brokers.
+ * => Use Tls as authPlugin for everything.
+ * => Run list topics command
+ * @throws Exception
+ */
+ @Test
+ public void testPersistentList() throws Exception {
+ log.info("-- Starting {} test --", methodName);
+
+ /***** Start Broker 2 ******/
+ ServiceConfiguration conf = new ServiceConfiguration();
+ conf.setBrokerServicePort(PortManager.nextFreePort());
+ conf.setBrokerServicePortTls(PortManager.nextFreePort());
+ conf.setWebServicePort(PortManager.nextFreePort());
+ conf.setWebServicePortTls(PortManager.nextFreePort());
+ conf.setAdvertisedAddress("localhost");
+ conf.setClusterName(this.conf.getClusterName());
+ conf.setZookeeperServers("localhost:2181");
+ buildConf(conf);
+ PulsarService pulsar2 = startBroker(conf);
+
+ /***** Broker 2 Started *****/
+ try (PulsarAdmin admin = buildAdminClient("superproxy")) {
+ admin.tenants().createTenant("tenant",
+ new TenantInfo(ImmutableSet.of("admin"),
+ ImmutableSet.of("test")));
+ }
+ try (PulsarAdmin admin = buildAdminClient("admin")) {
+ Policies policies = new Policies();
+ policies.bundles = new BundlesData(4);
+ policies.auth_policies.namespace_auth.put("admin", ImmutableSet.of(AuthAction.produce, AuthAction.consume));
+ policies.replication_clusters = ImmutableSet.of("test");
+ admin.namespaces().createNamespace("tenant/ns", policies);
+ try {
+ admin.persistentTopics().getList("tenant/ns");
+ } catch (PulsarAdminException ex) {
+ ex.printStackTrace();
+ fail("Should not have thrown an exception");
+ }
+ }
+
+ }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTlsTest.java
index 71fc1cb..f9d3153 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTlsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTlsTest.java
@@ -35,9 +35,9 @@ public class ReplicatorTlsTest extends ReplicatorTestBase {
@Override
@BeforeClass
void setup() throws Exception {
- config1.setReplicationTlsEnabled(true);
- config2.setReplicationTlsEnabled(true);
- config3.setReplicationTlsEnabled(true);
+ config1.setBrokerClientTlsEnabled(true);
+ config2.setBrokerClientTlsEnabled(true);
+ config3.setBrokerClientTlsEnabled(true);
super.setup();
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
index 8a58321..7cf4843 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
@@ -126,7 +126,7 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
}
/**
- * UsecaseL Multiple Broker => Lookup Redirection test
+ * Usecase Multiple Broker => Lookup Redirection test
*
* 1. Broker1 is a leader 2. Lookup request reaches to Broker2 which redirects to leader (Broker1) with
* authoritative = false 3. Leader (Broker1) finds out least loaded broker as Broker2 and redirects request to
@@ -143,7 +143,6 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
ServiceConfiguration conf2 = new ServiceConfiguration();
conf2.setBrokerServicePort(PortManager.nextFreePort());
conf2.setBrokerServicePortTls(PortManager.nextFreePort());
- conf2.setAdvertisedAddress("localhost");
conf2.setWebServicePort(PortManager.nextFreePort());
conf2.setWebServicePortTls(PortManager.nextFreePort());
conf2.setAdvertisedAddress("localhost");