You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/06/01 07:56:17 UTC
[pulsar] 02/02: Configure DLog Bookie, Pulsar, and Admin clients via pass through config (#15818)
This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 50d95028a21e9284299b60704873a8daaa42c410
Author: Michael Marshall <mm...@apache.org>
AuthorDate: Wed Jun 1 02:14:49 2022 -0500
Configure DLog Bookie, Pulsar, and Admin clients via pass through config (#15818)
(cherry picked from commit aa673498f88d0ed4f9d5788a5036355834ea5119)
---
conf/broker.conf | 14 ++-
conf/functions_worker.yml | 20 ++++
conf/proxy.conf | 4 +
conf/websocket.conf | 4 +
.../pulsar/broker/BookKeeperClientFactoryImpl.java | 16 ++-
.../org/apache/pulsar/broker/PulsarService.java | 29 ++++--
.../pulsar/broker/namespace/NamespaceService.java | 6 ++
.../pulsar/broker/service/BrokerService.java | 21 +++-
.../apache/pulsar/compaction/CompactorTool.java | 6 ++
...kerInternalClientConfigurationOverrideTest.java | 115 +++++++++++++++++++++
.../PulsarClientConfigurationOverrideTest.java | 56 ++++++++++
.../websocket/proxy/ProxyConfigurationTest.java | 6 ++
.../pulsar/client/admin/PulsarAdminBuilder.java | 23 +++++
.../admin/internal/PulsarAdminBuilderImpl.java | 9 +-
.../pulsar/client/internal/PropertiesUtils.java | 64 ++++++++++++
.../src/test/resources/test_worker_config.yml | 3 +
.../functions/worker/PulsarWorkerService.java | 12 ++-
.../pulsar/functions/worker/WorkerUtils.java | 45 +++++++-
.../pulsar/functions/worker/WorkerUtilsTest.java | 19 ++++
.../bookkeeper/BookKeeperPackagesStorage.java | 8 ++
.../BookKeeperPackagesStorageConfiguration.java | 4 +
.../core/PackagesStorageConfiguration.java | 6 ++
.../impl/DefaultPackagesStorageConfiguration.java | 5 +
.../pulsar/proxy/server/ProxyConnection.java | 15 ++-
.../apache/pulsar/websocket/WebSocketService.java | 7 +-
site2/docs/reference-configuration.md | 21 ++++
26 files changed, 504 insertions(+), 34 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index 3d11ce2538a..db8b618fff8 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -656,6 +656,9 @@ brokerClientTlsCiphers=
# used by the internal client to authenticate with Pulsar brokers
brokerClientTlsProtocols=
+# You can add extra configuration options for the Pulsar Client and the Pulsar Admin Client
+# by prefixing them with "brokerClient_". These configurations are applied after hard coded configuration
+# and before the above brokerClient configurations named above.
### --- Authentication --- ###
@@ -897,8 +900,11 @@ managedLedgerDefaultAckQuorum=2
# in case of lack of enough bookies
#bookkeeper_opportunisticStriping=false
-# you can add other configuration options for the BookKeeper client
-# by prefixing them with bookkeeper_
+# You can add other configuration options for the BookKeeper client
+# by prefixing them with "bookkeeper_". These configurations are applied
+# to all bookkeeper clients started by the broker (including the managed ledger bookkeeper clients as well as
+# the BookkeeperPackagesStorage bookkeeper client), except the distributed log bookkeeper client.
+# The dlog bookkeeper client is configured in the functions worker configuration file.
# How frequently to flush the cursor positions that were accumulated due to rate limiting. (seconds).
# Default is 60 seconds
@@ -1349,4 +1355,8 @@ packagesReplicas=1
# The bookkeeper ledger root path
packagesManagementLedgerRootPath=/ledgers
+# When using BookKeeperPackagesStorageProvider, you can configure the
+# bookkeeper client by prefixing configurations with "bookkeeper_".
+# This config applies to managed ledger bookkeeper clients, as well.
+
### --- Packages management service configuration variables (end) --- ###
diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
index 0b228d26a58..a0449cbb236 100644
--- a/conf/functions_worker.yml
+++ b/conf/functions_worker.yml
@@ -362,3 +362,23 @@ validateConnectorConfig: false
# Whether to initialize distributed log metadata by runtime.
# If it is set to true, you must ensure that it has been initialized by "bin/pulsar initialize-cluster-metadata" command.
initializedDlogMetadata: false
+###########################
+# Arbitrary Configuration
+###########################
+# When a configuration parameter is not explicitly named in the WorkerConfig class, it is only accessible from the
+# properties map. This map can be configured by supplying values to the properties map in this config file.
+
+# Configure the DLog bookkeeper client by prefixing configurations with "bookkeeper_". Because these are arbitrary, they
+# must be added to the properties map to get correctly applied. This configuration applies to the Dlog bookkeeper client
+# in both the standalone function workers and function workers initialized in the broker.
+
+# You can add extra configuration options for the Pulsar Client and the Pulsar Admin Client
+# by prefixing them with "brokerClient_". These configurations are applied after hard coded configuration
+# and before the above brokerClient configurations named above.
+
+## For example, when using the token authentication provider (AuthenticationProviderToken), you must configure several
+## custom configurations. Here is a sample for configuring one of the necessary configs:
+#properties:
+# tokenPublicKey: "file:///path/to/my/key"
+# tokenPublicAlg: "RSA256"
+
diff --git a/conf/proxy.conf b/conf/proxy.conf
index 77129ccc71d..2454b9bf20c 100644
--- a/conf/proxy.conf
+++ b/conf/proxy.conf
@@ -163,6 +163,10 @@ tlsEnabledWithBroker=false
# Tls cert refresh duration in seconds (set 0 to check on every new connection)
tlsCertRefreshCheckDurationSec=300
+# You can add extra configuration options for the Pulsar Client
+# by prefixing them with "brokerClient_". These configurations are applied after hard coded configuration
+# and before the above brokerClient configurations named above.
+
##### --- Rate Limiting --- #####
# Max concurrent inbound connections. The proxy will reject requests beyond that.
diff --git a/conf/websocket.conf b/conf/websocket.conf
index 535fade4ea6..4fe6f7e37b6 100644
--- a/conf/websocket.conf
+++ b/conf/websocket.conf
@@ -92,6 +92,10 @@ brokerClientAuthenticationPlugin=
brokerClientAuthenticationParameters=
brokerClientTrustCertsFilePath=
+# You can add extra configuration options for the Pulsar Client
+# by prefixing them with "brokerClient_". These configurations are applied after hard coded configuration
+# and before the above brokerClient configurations named above.
+
# When this parameter is not empty, unauthenticated users perform as anonymousUserRole
anonymousUserRole=
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
index 9a09deb166b..ce91ecf907c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
@@ -29,7 +29,6 @@ import io.netty.channel.EventLoopGroup;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
-import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import lombok.extern.slf4j.Slf4j;
@@ -42,6 +41,7 @@ import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.internal.PropertiesUtils;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.zookeeper.ZkBookieRackAffinityMapping;
@@ -147,15 +147,11 @@ public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory {
conf.getBookkeeperClientGetBookieInfoIntervalSeconds(), TimeUnit.SECONDS);
bkConf.setGetBookieInfoRetryIntervalSeconds(
conf.getBookkeeperClientGetBookieInfoRetryIntervalSeconds(), TimeUnit.SECONDS);
- Properties allProps = conf.getProperties();
- allProps.forEach((key, value) -> {
- String sKey = key.toString();
- if (sKey.startsWith("bookkeeper_") && value != null) {
- String bkExtraConfigKey = sKey.substring(11);
- log.info("Extra BookKeeper client configuration {}, setting {}={}", sKey, bkExtraConfigKey, value);
- bkConf.setProperty(bkExtraConfigKey, value);
- }
- });
+ PropertiesUtils.filterAndMapProperties(conf.getProperties(), "bookkeeper_")
+ .forEach((key, value) -> {
+ log.info("Applying BookKeeper client configuration setting {}={}", key, value);
+ bkConf.setProperty(key, value);
+ });
return bkConf;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index a777ced4f95..85f21e0bf8f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -127,6 +127,8 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
+import org.apache.pulsar.client.internal.PropertiesUtils;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.conf.InternalConfigurationData;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
@@ -1328,7 +1330,16 @@ public class PulsarService implements AutoCloseable, ShutdownService {
public synchronized PulsarClient getClient() throws PulsarServerException {
if (this.client == null) {
try {
- ClientConfigurationData conf = new ClientConfigurationData();
+ ClientConfigurationData initialConf = new ClientConfigurationData();
+ initialConf.setStatsIntervalSeconds(0);
+
+ // Apply all arbitrary configuration. This must be called before setting any fields annotated as
+ // @Secret on the ClientConfigurationData object because of the way they are serialized.
+ // See https://github.com/apache/pulsar/issues/8509 for more information.
+ Map<String, Object> overrides = PropertiesUtils
+ .filterAndMapProperties(this.getConfiguration().getProperties(), "brokerClient_");
+ ClientConfigurationData conf =
+ ConfigurationDataUtils.loadData(overrides, initialConf, ClientConfigurationData.class);
conf.setServiceUrl(this.getConfiguration().isTlsEnabled()
? this.brokerServiceUrlTls : this.brokerServiceUrl);
conf.setTlsAllowInsecureConnection(this.getConfiguration().isTlsAllowInsecureConnection());
@@ -1356,8 +1367,6 @@ public class PulsarService implements AutoCloseable, ShutdownService {
this.getConfiguration().getBrokerClientAuthenticationPlugin(),
this.getConfiguration().getBrokerClientAuthenticationParameters()));
}
-
- conf.setStatsIntervalSeconds(0);
this.client = new PulsarClientImpl(conf, ioEventLoopGroup);
} catch (Exception e) {
throw new PulsarServerException(e);
@@ -1377,10 +1386,16 @@ public class PulsarService implements AutoCloseable, ShutdownService {
+ ", webServiceAddressTls: " + webServiceAddressTls
+ ", webServiceAddress: " + webServiceAddress);
}
- PulsarAdminBuilder builder = PulsarAdmin.builder().serviceHttpUrl(adminApiUrl) //
- .authentication(//
- conf.getBrokerClientAuthenticationPlugin(), //
- conf.getBrokerClientAuthenticationParameters());
+ PulsarAdminBuilder builder = PulsarAdmin.builder().serviceHttpUrl(adminApiUrl);
+
+ // Apply all arbitrary configuration. This must be called before setting any fields annotated as
+ // @Secret on the ClientConfigurationData object because of the way they are serialized.
+ // See https://github.com/apache/pulsar/issues/8509 for more information.
+ builder.loadConf(PropertiesUtils.filterAndMapProperties(config.getProperties(), "brokerClient_"));
+
+ builder.authentication(
+ conf.getBrokerClientAuthenticationPlugin(),
+ conf.getBrokerClientAuthenticationParameters());
if (conf.isBrokerClientTlsEnabled()) {
builder.tlsCiphers(config.getBrokerClientTlsCiphers())
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 58c39ac9143..4d910cb901d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -67,6 +67,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.internal.PropertiesUtils;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
import org.apache.pulsar.common.lookup.data.LookupData;
import org.apache.pulsar.common.naming.NamespaceBundle;
@@ -1272,6 +1273,11 @@ public class NamespaceService implements AutoCloseable {
.enableTcpNoDelay(false)
.statsInterval(0, TimeUnit.SECONDS);
+ // Apply all arbitrary configuration. This must be called before setting any fields annotated as
+ // @Secret on the ClientConfigurationData object because of the way they are serialized.
+ // See https://github.com/apache/pulsar/issues/8509 for more information.
+ clientBuilder.loadConf(PropertiesUtils.filterAndMapProperties(config.getProperties(), "brokerClient_"));
+
if (pulsar.getConfiguration().isAuthenticationEnabled()) {
clientBuilder.authentication(pulsar.getConfiguration().getBrokerClientAuthenticationPlugin(),
pulsar.getConfiguration().getBrokerClientAuthenticationParameters());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 5dc5edf9f08..a932e2fa505 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -127,6 +127,7 @@ import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.internal.PropertiesUtils;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.configuration.BindAddress;
import org.apache.pulsar.common.configuration.FieldContext;
@@ -1135,6 +1136,12 @@ public class BrokerService implements Closeable {
.enableTcpNoDelay(false)
.connectionsPerBroker(pulsar.getConfiguration().getReplicationConnectionsPerBroker())
.statsInterval(0, TimeUnit.SECONDS);
+ // Apply all arbitrary configuration. This must be called before setting any fields annotated as
+ // @Secret on the ClientConfigurationData object because of the way they are serialized.
+ // See https://github.com/apache/pulsar/issues/8509 for more information.
+ clientBuilder.loadConf(PropertiesUtils.filterAndMapProperties(pulsar.getConfiguration().getProperties(),
+ "brokerClient_"));
+
if (data.getAuthenticationPlugin() != null && data.getAuthenticationParameters() != null) {
clientBuilder.authentication(data.getAuthenticationPlugin(), data.getAuthenticationParameters());
} else if (pulsar.getConfiguration().isAuthenticationEnabled()) {
@@ -1210,10 +1217,16 @@ public class BrokerService implements Closeable {
boolean isTlsUrl = conf.isBrokerClientTlsEnabled() && isNotBlank(data.getServiceUrlTls());
String adminApiUrl = isTlsUrl ? data.getServiceUrlTls() : data.getServiceUrl();
- PulsarAdminBuilder builder = PulsarAdmin.builder().serviceHttpUrl(adminApiUrl)
- .authentication(
- conf.getBrokerClientAuthenticationPlugin(),
- conf.getBrokerClientAuthenticationParameters());
+ PulsarAdminBuilder builder = PulsarAdmin.builder().serviceHttpUrl(adminApiUrl);
+
+ // Apply all arbitrary configuration. This must be called before setting any fields annotated as
+ // @Secret on the ClientConfigurationData object because of the way they are serialized.
+ // See https://github.com/apache/pulsar/issues/8509 for more information.
+ builder.loadConf(PropertiesUtils.filterAndMapProperties(conf.getProperties(), "brokerClient_"));
+
+ builder.authentication(
+ conf.getBrokerClientAuthenticationPlugin(),
+ conf.getBrokerClientAuthenticationParameters());
if (isTlsUrl) {
builder.allowTlsInsecureConnection(conf.isTlsAllowInsecureConnection());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
index ac028ef871b..35ca089e5df 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
@@ -37,6 +37,7 @@ import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.internal.PropertiesUtils;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.util.CmdGenerateDocs;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
@@ -105,6 +106,11 @@ public class CompactorTool {
ClientBuilder clientBuilder = PulsarClient.builder();
+ // Apply all arbitrary configuration. This must be called before setting any fields annotated as
+ // @Secret on the ClientConfigurationData object because of the way they are serialized.
+ // See https://github.com/apache/pulsar/issues/8509 for more information.
+ clientBuilder.loadConf(PropertiesUtils.filterAndMapProperties(brokerConfig.getProperties(), "brokerClient_"));
+
if (isNotBlank(brokerConfig.getBrokerClientAuthenticationPlugin())) {
clientBuilder.authentication(brokerConfig.getBrokerClientAuthenticationPlugin(),
brokerConfig.getBrokerClientAuthenticationParameters());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerInternalClientConfigurationOverrideTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerInternalClientConfigurationOverrideTest.java
new file mode 100644
index 00000000000..775636c9489
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerInternalClientConfigurationOverrideTest.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.client.admin.internal.PulsarAdminImpl;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.ClusterDataImpl;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.Optional;
+import java.util.Properties;
+
+public class BrokerInternalClientConfigurationOverrideTest extends BrokerTestBase {
+
+ @BeforeClass
+ @Override
+ protected void setup() throws Exception {
+ super.baseSetup();
+ }
+
+ @AfterClass(alwaysRun = true)
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Test
+ public void testPulsarServiceAdminClientConfiguration() throws PulsarServerException {
+ Properties config = pulsar.getConfiguration().getProperties();
+ config.setProperty("brokerClient_operationTimeoutMs", "60000");
+ config.setProperty("brokerClient_statsIntervalSeconds", "10");
+ ClientConfigurationData clientConf = ((PulsarAdminImpl) pulsar.getAdminClient()).getClientConfigData();
+ Assert.assertEquals(clientConf.getOperationTimeoutMs(), 60000);
+ Assert.assertEquals(clientConf.getStatsIntervalSeconds(), 10);
+ }
+
+ @Test
+ public void testPulsarServicePulsarClientConfiguration() throws PulsarServerException {
+ Properties config = pulsar.getConfiguration().getProperties();
+ config.setProperty("brokerClient_operationTimeoutMs", "60000");
+ config.setProperty("brokerClient_statsIntervalSeconds", "10");
+ pulsar.getConfiguration().setBrokerClientAuthenticationParameters("sensitive");
+ ClientConfigurationData clientConf = ((PulsarClientImpl) pulsar.getClient()).getConfiguration();
+ Assert.assertEquals(clientConf.getOperationTimeoutMs(), 60000);
+ // Config should override internal default, which is 0.
+ Assert.assertEquals(clientConf.getStatsIntervalSeconds(), 10);
+ Assert.assertEquals(clientConf.getAuthParams(), "sensitive");
+ }
+
+ @Test
+ public void testBrokerServicePulsarClientConfiguration() {
+ // This data only needs to have the service url for this test.
+ ClusterData data = ClusterData.builder().serviceUrl("http://localhost:8080").build();
+
+ // Set the configs and set some configs that won't apply
+ Properties config = pulsar.getConfiguration().getProperties();
+ config.setProperty("brokerClient_operationTimeoutMs", "60000");
+ config.setProperty("brokerClient_statsIntervalSeconds", "10");
+ config.setProperty("memoryLimitBytes", "10");
+ config.setProperty("brokerClient_memoryLimitBytes", "100000");
+
+ PulsarClientImpl client = (PulsarClientImpl) pulsar.getBrokerService()
+ .getReplicationClient("an_arbitrary_name", Optional.of(data));
+ ClientConfigurationData clientConf = client.getConfiguration();
+ Assert.assertEquals(clientConf.getOperationTimeoutMs(), 60000);
+ // Config should override internal default, which is 0.
+ Assert.assertEquals(clientConf.getStatsIntervalSeconds(), 10);
+ // This config defaults to 0 (for good reason), but it could be overridden by configuration.
+ Assert.assertEquals(clientConf.getMemoryLimitBytes(), 100000);
+ }
+
+ @Test
+ public void testNamespaceServicePulsarClientConfiguration() {
+ // This data only needs to have the service url for this test.
+ ClusterDataImpl data = (ClusterDataImpl) ClusterData.builder().serviceUrl("http://localhost:8080").build();
+
+ // Set the configs and set some configs that won't apply
+ Properties config = pulsar.getConfiguration().getProperties();
+ config.setProperty("brokerClient_operationTimeoutMs", "60000");
+ config.setProperty("brokerClient_statsIntervalSeconds", "10");
+ config.setProperty("memoryLimitBytes", "10");
+ config.setProperty("brokerClient_memoryLimitBytes", "100000");
+
+ PulsarClientImpl client = pulsar.getNamespaceService().getNamespaceClient(data);
+ ClientConfigurationData clientConf = client.getConfiguration();
+ Assert.assertEquals(clientConf.getOperationTimeoutMs(), 60000);
+ // Config should override internal default, which is 0.
+ Assert.assertEquals(clientConf.getStatsIntervalSeconds(), 10);
+ // This config defaults to 0 (for good reason), but it could be overridden by configuration.
+ Assert.assertEquals(clientConf.getMemoryLimitBytes(), 100000);
+ }
+
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarClientConfigurationOverrideTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarClientConfigurationOverrideTest.java
new file mode 100644
index 00000000000..4f885ecc46b
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarClientConfigurationOverrideTest.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.internal.PropertiesUtils;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.util.Map;
+
+public class PulsarClientConfigurationOverrideTest {
+ @Test
+ public void testFilterAndMapProperties() {
+ // Create a default config
+ ServiceConfiguration conf = new ServiceConfiguration();
+ conf.getProperties().setProperty("keepAliveIntervalSeconds", "15");
+ conf.getProperties().setProperty("brokerClient_keepAliveIntervalSeconds", "25");
+
+ // Apply the filtering and mapping logic
+ Map<String, Object> result = PropertiesUtils.filterAndMapProperties(conf.getProperties(), "brokerClient_");
+
+ // Ensure the results match expectations
+ Assert.assertEquals(result.size(), 1, "The filtered map should have one entry.");
+ Assert.assertNull(result.get("brokerClient_keepAliveIntervalSeconds"),
+ "The mapped prop should not be in the result.");
+ Assert.assertEquals(result.get("keepAliveIntervalSeconds"), "25", "The original value is overridden.");
+
+ // Create sample ClientBuilder
+ ClientBuilder builder = PulsarClient.builder();
+ Assert.assertEquals(
+ ((ClientBuilderImpl) builder).getClientConfigurationData().getKeepAliveIntervalSeconds(), 30);
+ // Note: this test would fail if any @Secret fields were set before the loadConf and the accessed afterwards.
+ builder.loadConf(result);
+ Assert.assertEquals(
+ ((ClientBuilderImpl) builder).getClientConfigurationData().getKeepAliveIntervalSeconds(), 25);
+ }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java
index ec4937bdd21..184f86340fa 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java
@@ -66,6 +66,9 @@ public class ProxyConfigurationTest extends ProducerConsumerBase {
public void configTest(int numIoThreads, int connectionsPerBroker) throws Exception {
config.setWebSocketNumIoThreads(numIoThreads);
config.setWebSocketConnectionsPerBroker(connectionsPerBroker);
+ config.getProperties().setProperty("brokerClient_serviceUrl", "https://broker.com:8080");
+ config.setServiceUrl("http://localhost:8080");
+ config.getProperties().setProperty("brokerClient_lookupTimeoutMs", "100");
WebSocketService service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(), anyInt());
service.start();
@@ -73,6 +76,9 @@ public class ProxyConfigurationTest extends ProducerConsumerBase {
PulsarClientImpl client = (PulsarClientImpl) service.getPulsarClient();
assertEquals(client.getConfiguration().getNumIoThreads(), numIoThreads);
assertEquals(client.getConfiguration().getConnectionsPerBroker(), connectionsPerBroker);
+ assertEquals(client.getConfiguration().getServiceUrl(), "http://localhost:8080",
+ "brokerClient_ configs take precedence");
+ assertEquals(client.getConfiguration().getLookupTimeoutMs(), 100);
service.close();
}
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java
index 9f8b4be1409..c685c1f7793 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java
@@ -36,6 +36,29 @@ public interface PulsarAdminBuilder {
*/
PulsarAdmin build() throws PulsarClientException;
+ /**
+ * Load the configuration from provided <tt>config</tt> map.
+ *
+ * <p>Example:
+ *
+ * <pre>
+ * {@code
+ * Map<String, Object> config = new HashMap<>();
+ * config.put("serviceHttpUrl", "http://localhost:6650");
+ *
+ * PulsarAdminBuilder builder = ...;
+ * builder = builder.loadConf(config);
+ *
+ * PulsarAdmin client = builder.build();
+ * }
+ * </pre>
+ *
+ * @param config
+ * configuration to load
+ * @return the client builder instance
+ */
+ PulsarAdminBuilder loadConf(Map<String, Object> config);
+
/**
* Create a copy of the current client builder.
* <p/>
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java
index 70463b7fb4e..d86b9e73457 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java
@@ -28,10 +28,11 @@ import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
public class PulsarAdminBuilderImpl implements PulsarAdminBuilder {
- protected final ClientConfigurationData conf;
+ protected ClientConfigurationData conf;
private int connectTimeout = PulsarAdminImpl.DEFAULT_CONNECT_TIMEOUT_SECONDS;
private int readTimeout = PulsarAdminImpl.DEFAULT_READ_TIMEOUT_SECONDS;
private int requestTimeout = PulsarAdminImpl.DEFAULT_REQUEST_TIMEOUT_SECONDS;
@@ -62,6 +63,12 @@ public class PulsarAdminBuilderImpl implements PulsarAdminBuilder {
return new PulsarAdminBuilderImpl(conf.clone());
}
+ @Override
+ public PulsarAdminBuilder loadConf(Map<String, Object> config) {
+ conf = ConfigurationDataUtils.loadData(config, conf, ClientConfigurationData.class);
+ return this;
+ }
+
@Override
public PulsarAdminBuilder serviceHttpUrl(String serviceHttpUrl) {
conf.setServiceUrl(serviceHttpUrl);
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PropertiesUtils.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PropertiesUtils.java
new file mode 100644
index 00000000000..4a418b1d515
--- /dev/null
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PropertiesUtils.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.internal;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Internal utility methods for filtering and mapping {@link Properties} objects.
+ */
+public class PropertiesUtils {
+
+ /**
+ * Filters the {@link Properties} object so that only properties with the configured prefix are retained,
+ * and then removes that prefix and puts the key value pairs into the result map.
+ * @param props - the properties object to filter
+ * @param prefix - the prefix to filter against and then remove for keys in the resulting map
+ * @return a map of properties
+ */
+ public static Map<String, Object> filterAndMapProperties(Properties props, String prefix) {
+ return filterAndMapProperties(props, prefix, "");
+ }
+
+ /**
+ * Filters the {@link Properties} object so that only properties with the configured prefix are retained,
+ * and then replaces the srcPrefix with the targetPrefix when putting the key value pairs in the resulting map.
+ * @param props - the properties object to filter
+ * @param srcPrefix - the prefix to filter against and then remove for keys in the resulting map
+ * @param targetPrefix - the prefix to add to keys in the result map
+ * @return a map of properties
+ */
+ public static Map<String, Object> filterAndMapProperties(Properties props, String srcPrefix, String targetPrefix) {
+ Map<String, Object> result = new HashMap<>();
+ int prefixLength = srcPrefix.length();
+ props.forEach((keyObject, value) -> {
+ if (!(keyObject instanceof String)) {
+ return;
+ }
+ String key = (String) keyObject;
+ if (key.startsWith(srcPrefix) && value != null) {
+ String truncatedKey = key.substring(prefixLength);
+ result.put(targetPrefix + truncatedKey, value);
+ }
+ });
+ return result;
+ }
+}
diff --git a/pulsar-functions/src/test/resources/test_worker_config.yml b/pulsar-functions/src/test/resources/test_worker_config.yml
index 4614ca3cfd1..f0ecf2bd71b 100644
--- a/pulsar-functions/src/test/resources/test_worker_config.yml
+++ b/pulsar-functions/src/test/resources/test_worker_config.yml
@@ -23,4 +23,7 @@ pulsarServiceUrl: pulsar://localhost:6650
functionMetadataTopicName: test-function-metadata-topic
numFunctionPackageReplicas: 3
maxPendingAsyncRequests: 200
+properties:
+ # Fake Bookkeeper Client config to be applied to the DLog Bookkeeper Client
+ bookkeeper_testKey: "fakeValue"
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
index ef92e85853b..9ac03746756 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
@@ -133,7 +133,8 @@ public class PulsarWorkerService implements WorkerService {
workerConfig.getBrokerClientAuthenticationParameters(),
workerConfig.getBrokerClientTrustCertsFilePath(),
workerConfig.isTlsAllowInsecureConnection(),
- workerConfig.isTlsEnableHostnameVerification());
+ workerConfig.isTlsEnableHostnameVerification(),
+ workerConfig);
} else {
return WorkerUtils.getPulsarAdminClient(
pulsarServiceUrl,
@@ -141,7 +142,8 @@ public class PulsarWorkerService implements WorkerService {
null,
null,
workerConfig.isTlsAllowInsecureConnection(),
- workerConfig.isTlsEnableHostnameVerification());
+ workerConfig.isTlsEnableHostnameVerification(),
+ workerConfig);
}
}
@@ -156,7 +158,8 @@ public class PulsarWorkerService implements WorkerService {
workerConfig.isUseTls(),
workerConfig.getBrokerClientTrustCertsFilePath(),
workerConfig.isTlsAllowInsecureConnection(),
- workerConfig.isTlsEnableHostnameVerification());
+ workerConfig.isTlsEnableHostnameVerification(),
+ workerConfig);
} else {
return WorkerUtils.getPulsarClient(
pulsarServiceUrl,
@@ -165,7 +168,8 @@ public class PulsarWorkerService implements WorkerService {
null,
null,
workerConfig.isTlsAllowInsecureConnection(),
- workerConfig.isTlsEnableHostnameVerification());
+ workerConfig.isTlsEnableHostnameVerification(),
+ workerConfig);
}
}
};
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
index 4f3ee042866..741a89bc397 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
@@ -40,6 +40,7 @@ import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
+import org.apache.pulsar.client.internal.PropertiesUtils;
import org.apache.pulsar.common.conf.InternalConfigurationData;
import org.apache.pulsar.common.functions.WorkerInfo;
import org.apache.pulsar.common.policies.data.FunctionInstanceStatsDataImpl;
@@ -158,6 +159,13 @@ public final class WorkerUtils {
workerConfig.getBookkeeperClientAuthenticationParameters());
}
}
+ // Map arbitrary bookkeeper client configuration into DLog Config. Note that this only configures the
+ // bookie client.
+ PropertiesUtils.filterAndMapProperties(workerConfig.getProperties(), "bookkeeper_", "bkc.")
+ .forEach((key, value) -> {
+ log.info("Applying DLog BookKeeper client configuration setting {}={}", key, value);
+ conf.setProperty(key, value);
+ });
return conf;
}
@@ -194,12 +202,20 @@ public final class WorkerUtils {
}
public static PulsarAdmin getPulsarAdminClient(String pulsarWebServiceUrl) {
- return getPulsarAdminClient(pulsarWebServiceUrl, null, null, null, null, null);
+ return getPulsarAdminClient(pulsarWebServiceUrl, null, null, null, null, null, null);
}
public static PulsarAdmin getPulsarAdminClient(String pulsarWebServiceUrl, String authPlugin, String authParams,
String tlsTrustCertsFilePath, Boolean allowTlsInsecureConnection,
Boolean enableTlsHostnameVerificationEnable) {
+ return getPulsarAdminClient(pulsarWebServiceUrl, authPlugin, authParams, tlsTrustCertsFilePath,
+ allowTlsInsecureConnection, enableTlsHostnameVerificationEnable, null);
+ }
+
+ public static PulsarAdmin getPulsarAdminClient(String pulsarWebServiceUrl, String authPlugin, String authParams,
+ String tlsTrustCertsFilePath, Boolean allowTlsInsecureConnection,
+ Boolean enableTlsHostnameVerificationEnable,
+ WorkerConfig workerConfig) {
log.info("Create Pulsar Admin to service url {}: "
+ "authPlugin = {}, authParams = {}, "
+ "tlsTrustCerts = {}, allowTlsInsecureConnector = {}, enableTlsHostnameVerification = {}",
@@ -207,6 +223,13 @@ public final class WorkerUtils {
tlsTrustCertsFilePath, allowTlsInsecureConnection, enableTlsHostnameVerificationEnable);
try {
PulsarAdminBuilder adminBuilder = PulsarAdmin.builder().serviceHttpUrl(pulsarWebServiceUrl);
+ if (workerConfig != null) {
+ // Apply all arbitrary configuration. This must be called before setting any fields annotated as
+ // @Secret on the ClientConfigurationData object because of the way they are serialized.
+ // See https://github.com/apache/pulsar/issues/8509 for more information.
+ adminBuilder.loadConf(
+ PropertiesUtils.filterAndMapProperties(workerConfig.getProperties(), "brokerClient_"));
+ }
if (isNotBlank(authPlugin) && isNotBlank(authParams)) {
adminBuilder.authentication(authPlugin, authParams);
}
@@ -219,6 +242,7 @@ public final class WorkerUtils {
if (enableTlsHostnameVerificationEnable != null) {
adminBuilder.enableTlsHostnameVerification(enableTlsHostnameVerificationEnable);
}
+
return adminBuilder.build();
} catch (PulsarClientException e) {
log.error("Error creating pulsar admin client", e);
@@ -228,17 +252,33 @@ public final class WorkerUtils {
public static PulsarClient getPulsarClient(String pulsarServiceUrl) {
return getPulsarClient(pulsarServiceUrl, null, null, null,
- null, null, null);
+ null, null, null, null);
}
public static PulsarClient getPulsarClient(String pulsarServiceUrl, String authPlugin, String authParams,
Boolean useTls, String tlsTrustCertsFilePath,
Boolean allowTlsInsecureConnection,
Boolean enableTlsHostnameVerificationEnable) {
+ return getPulsarClient(pulsarServiceUrl, authPlugin, authParams, useTls, tlsTrustCertsFilePath,
+ allowTlsInsecureConnection, enableTlsHostnameVerificationEnable, null);
+ }
+
+ public static PulsarClient getPulsarClient(String pulsarServiceUrl, String authPlugin, String authParams,
+ Boolean useTls, String tlsTrustCertsFilePath,
+ Boolean allowTlsInsecureConnection,
+ Boolean enableTlsHostnameVerificationEnable,
+ WorkerConfig workerConfig) {
try {
ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(pulsarServiceUrl);
+ if (workerConfig != null) {
+ // Apply all arbitrary configuration. This must be called before setting any fields annotated as
+ // @Secret on the ClientConfigurationData object because of the way they are serialized.
+ // See https://github.com/apache/pulsar/issues/8509 for more information.
+ clientBuilder.loadConf(
+ PropertiesUtils.filterAndMapProperties(workerConfig.getProperties(), "brokerClient_"));
+ }
if (isNotBlank(authPlugin)
&& isNotBlank(authParams)) {
clientBuilder.authentication(authPlugin, authParams);
@@ -255,7 +295,6 @@ public final class WorkerUtils {
if (enableTlsHostnameVerificationEnable != null) {
clientBuilder.enableTlsHostnameVerification(enableTlsHostnameVerificationEnable);
}
-
return clientBuilder.build();
} catch (PulsarClientException e) {
log.error("Error creating pulsar client", e);
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerUtilsTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerUtilsTest.java
index d899db13237..b2e0f0f354c 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerUtilsTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerUtilsTest.java
@@ -40,8 +40,13 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.fail;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.net.URL;
+import org.apache.distributedlog.DistributedLogConfiguration;
public class WorkerUtilsTest {
@@ -99,4 +104,18 @@ public class WorkerUtilsTest {
}
}
+
+ @Test
+ public void testDLogConfiguration() throws URISyntaxException, IOException {
+ // The config yml is seeded with a fake bookie config.
+ URL yamlUrl = getClass().getClassLoader().getResource("test_worker_config.yml");
+ WorkerConfig config = WorkerConfig.load(yamlUrl.toURI().getPath());
+
+ // Map the config.
+ DistributedLogConfiguration dlogConf = WorkerUtils.getDlogConf(config);
+
+ // Verify the outcome.
+ assertEquals(dlogConf.getString("bkc.testKey"), "fakeValue",
+ "The bookkeeper client config mapping should apply.");
+ }
}
\ No newline at end of file
diff --git a/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorage.java b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorage.java
index f0db59351f5..e3147c0e8bc 100644
--- a/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorage.java
+++ b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorage.java
@@ -36,6 +36,7 @@ import org.apache.distributedlog.exceptions.ZKException;
import org.apache.distributedlog.impl.metadata.BKDLConfig;
import org.apache.distributedlog.metadata.DLMetadata;
import org.apache.distributedlog.namespace.NamespaceDriver;
+import org.apache.pulsar.client.internal.PropertiesUtils;
import org.apache.pulsar.packages.management.core.PackagesStorage;
import org.apache.pulsar.packages.management.core.PackagesStorageConfiguration;
import org.apache.zookeeper.KeeperException;
@@ -72,6 +73,13 @@ public class BookKeeperPackagesStorage implements PackagesStorage {
configuration.getBookkeeperClientAuthenticationParameters());
}
}
+ // Map arbitrary bookkeeper client configuration into DLog Config. Note that this only configures the
+ // bookie client.
+ PropertiesUtils.filterAndMapProperties(configuration.getProperties(), "bookkeeper_", "bkc.")
+ .forEach((key, value) -> {
+ log.info("Applying DLog BookKeeper client configuration setting {}={}", key, value);
+ conf.setProperty(key, value);
+ });
try {
this.namespace = NamespaceBuilder.newBuilder()
.conf(conf).clientId(NS_CLIENT_ID).uri(initializeDlogNamespace()).build();
diff --git a/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageConfiguration.java b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageConfiguration.java
index 226b80abeaa..ce6acecdd51 100644
--- a/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageConfiguration.java
+++ b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageConfiguration.java
@@ -58,6 +58,10 @@ public class BookKeeperPackagesStorageConfiguration implements PackagesStorageCo
return getProperty("bookkeeperClientAuthenticationParameters");
}
+ @Override
+ public Properties getProperties() {
+ return configuration.getProperties();
+ }
@Override
public String getProperty(String key) {
diff --git a/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/PackagesStorageConfiguration.java b/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/PackagesStorageConfiguration.java
index b4044a6338c..5c346a0d05c 100644
--- a/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/PackagesStorageConfiguration.java
+++ b/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/PackagesStorageConfiguration.java
@@ -50,4 +50,10 @@ public interface PackagesStorageConfiguration {
* a group of the property
*/
void setProperty(Properties properties);
+
+ /**
+ * Get all properties for the configuration.
+ * @return all properties for the configuration
+ */
+ Properties getProperties();
}
diff --git a/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/impl/DefaultPackagesStorageConfiguration.java b/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/impl/DefaultPackagesStorageConfiguration.java
index cb35048a360..d3c5d7494b3 100644
--- a/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/impl/DefaultPackagesStorageConfiguration.java
+++ b/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/impl/DefaultPackagesStorageConfiguration.java
@@ -39,4 +39,9 @@ public class DefaultPackagesStorageConfiguration implements PackagesStorageConfi
public void setProperty(Properties properties) {
this.properties = properties;
}
+
+ @Override
+ public Properties getProperties() {
+ return this.properties;
+ }
}
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index d9f0f5db38f..0f41208fde2 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -28,6 +28,7 @@ import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadLocalRandom;
@@ -47,6 +48,8 @@ import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarChannelInitializer;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
+import org.apache.pulsar.client.internal.PropertiesUtils;
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.PulsarHandler;
@@ -521,9 +524,17 @@ public class ProxyConnection extends PulsarHandler {
}
ClientConfigurationData createClientConfiguration() {
- ClientConfigurationData clientConf = new ClientConfigurationData();
- clientConf.setServiceUrl(service.getServiceUrl());
+ ClientConfigurationData initialConf = new ClientConfigurationData();
+ initialConf.setServiceUrl(service.getServiceUrl());
ProxyConfiguration proxyConfig = service.getConfiguration();
+ // Apply all arbitrary configuration. This must be called before setting any fields annotated as
+ // @Secret on the ClientConfigurationData object because of the way they are serialized.
+ // See https://github.com/apache/pulsar/issues/8509 for more information.
+ Map<String, Object> overrides = PropertiesUtils
+ .filterAndMapProperties(proxyConfig.getProperties(), "brokerClient_");
+ ClientConfigurationData clientConf = ConfigurationDataUtils
+ .loadData(overrides, initialConf, ClientConfigurationData.class);
+
clientConf.setAuthentication(this.getClientAuthentication());
if (proxyConfig.isTlsEnabledWithBroker()) {
clientConf.setUseTls(true);
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
index b2873d778ab..7a0f19bad10 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
@@ -37,6 +37,7 @@ import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.internal.PropertiesUtils;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
@@ -181,6 +182,11 @@ public class WebSocketService implements Closeable {
.ioThreads(config.getWebSocketNumIoThreads()) //
.connectionsPerBroker(config.getWebSocketConnectionsPerBroker());
+ // Apply all arbitrary configuration. This must be called before setting any fields annotated as
+ // @Secret on the ClientConfigurationData object because of the way they are serialized.
+ // See https://github.com/apache/pulsar/issues/8509 for more information.
+ clientBuilder.loadConf(PropertiesUtils.filterAndMapProperties(config.getProperties(), "brokerClient_"));
+
if (isNotBlank(config.getBrokerClientAuthenticationPlugin())
&& isNotBlank(config.getBrokerClientAuthenticationParameters())) {
clientBuilder.authentication(config.getBrokerClientAuthenticationPlugin(),
@@ -198,7 +204,6 @@ public class WebSocketService implements Closeable {
} else {
clientBuilder.serviceUrl(clusterData.getServiceUrl());
}
-
return clientBuilder.build();
}
diff --git a/site2/docs/reference-configuration.md b/site2/docs/reference-configuration.md
index f8d1e87f890..3226007209f 100644
--- a/site2/docs/reference-configuration.md
+++ b/site2/docs/reference-configuration.md
@@ -352,6 +352,15 @@ brokerServiceCompactionThresholdInBytes|If the estimated backlog size is greater
| managedLedgerInfoCompressionType | Compression type of managed ledger information. <br><br>Available options are `NONE`, `LZ4`, `ZLIB`, `ZSTD`, and `SNAPPY`). <br><br>If this value is `NONE` or invalid, the `managedLedgerInfo` is not compressed. <br><br>**Note** that after enabling this configuration, if you want to degrade a broker, you need to change the value to `NONE` and make sure all ledger metadata is saved without compression. | None |
| additionalServlets | Additional servlet name. <br><br>If you have multiple additional servlets, separate them by commas. <br><br>For example, additionalServlet_1, additionalServlet_2 | N/A |
| additionalServletDirectory | Location of broker additional servlet NAR directory | ./brokerAdditionalServlet |
+#### Configuration Override For Clients Internal to Broker
+
+It's possible to configure some clients by using the appropriate prefix.
+
+|Prefix|Description|
+|brokerClient_| Configure **all** the broker's Pulsar Clients and Pulsar Admin Clients. These configurations are applied after hard coded configuration and before the above brokerClient configurations named above.|
+|bookkeeper_| Configure the broker's bookkeeper clients used by managed ledgers and the BookkeeperPackagesStorage bookkeeper client. Takes precedence over most other configuration values.|
+
+Note: when running the function worker within the broker, these prefixed configurations do not apply to any of those clients. You must instead configure those clients using the `functions_worker.yml`.
## Client
@@ -677,6 +686,12 @@ You can set the log level and configuration in the [log4j2.yaml](https://github
|tlsCertificateFilePath|||
|tlsKeyFilePath |||
|tlsTrustCertsFilePath|||
+#### Configuration Override For Clients Internal to WebSocket
+
+It's possible to configure some clients by using the appropriate prefix.
+
+|Prefix|Description|
+|brokerClient_| Configure **all** the broker's Pulsar Clients. These configurations are applied after hard coded configuration and before the above brokerClient configurations named above.|
## Pulsar proxy
@@ -734,6 +749,12 @@ The [Pulsar proxy](concepts-architecture-overview.md#pulsar-proxy) can be config
|haProxyProtocolEnabled | Enable or disable the [HAProxy](http://www.haproxy.org/) protocol. |false|
| numIOThreads | Number of threads used for Netty IO. | 2 * Runtime.getRuntime().availableProcessors() |
| numAcceptorThreads | Number of threads used for Netty Acceptor. | 1 |
+#### Configuration Override For Clients Internal to Proxy
+
+It's possible to configure some clients by using the appropriate prefix.
+
+|Prefix|Description|
+|brokerClient_| Configure **all** the proxy's Pulsar Clients. These configurations are applied after hard coded configuration and before the above brokerClient configurations named above.|
## ZooKeeper