Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/06/01 07:56:17 UTC

[pulsar] 02/02: Configure DLog Bookie, Pulsar, and Admin clients via pass through config (#15818)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 50d95028a21e9284299b60704873a8daaa42c410
Author: Michael Marshall <mm...@apache.org>
AuthorDate: Wed Jun 1 02:14:49 2022 -0500

    Configure DLog Bookie, Pulsar, and Admin clients via pass through config (#15818)
    
    (cherry picked from commit aa673498f88d0ed4f9d5788a5036355834ea5119)
---
 conf/broker.conf                                   |  14 ++-
 conf/functions_worker.yml                          |  20 ++++
 conf/proxy.conf                                    |   4 +
 conf/websocket.conf                                |   4 +
 .../pulsar/broker/BookKeeperClientFactoryImpl.java |  16 ++-
 .../org/apache/pulsar/broker/PulsarService.java    |  29 ++++--
 .../pulsar/broker/namespace/NamespaceService.java  |   6 ++
 .../pulsar/broker/service/BrokerService.java       |  21 +++-
 .../apache/pulsar/compaction/CompactorTool.java    |   6 ++
 ...kerInternalClientConfigurationOverrideTest.java | 115 +++++++++++++++++++++
 .../PulsarClientConfigurationOverrideTest.java     |  56 ++++++++++
 .../websocket/proxy/ProxyConfigurationTest.java    |   6 ++
 .../pulsar/client/admin/PulsarAdminBuilder.java    |  23 +++++
 .../admin/internal/PulsarAdminBuilderImpl.java     |   9 +-
 .../pulsar/client/internal/PropertiesUtils.java    |  64 ++++++++++++
 .../src/test/resources/test_worker_config.yml      |   3 +
 .../functions/worker/PulsarWorkerService.java      |  12 ++-
 .../pulsar/functions/worker/WorkerUtils.java       |  45 +++++++-
 .../pulsar/functions/worker/WorkerUtilsTest.java   |  19 ++++
 .../bookkeeper/BookKeeperPackagesStorage.java      |   8 ++
 .../BookKeeperPackagesStorageConfiguration.java    |   4 +
 .../core/PackagesStorageConfiguration.java         |   6 ++
 .../impl/DefaultPackagesStorageConfiguration.java  |   5 +
 .../pulsar/proxy/server/ProxyConnection.java       |  15 ++-
 .../apache/pulsar/websocket/WebSocketService.java  |   7 +-
 site2/docs/reference-configuration.md              |  21 ++++
 26 files changed, 504 insertions(+), 34 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 3d11ce2538a..db8b618fff8 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -656,6 +656,9 @@ brokerClientTlsCiphers=
 # used by the internal client to authenticate with Pulsar brokers
 brokerClientTlsProtocols=
 
+# You can add extra configuration options for the Pulsar Client and the Pulsar Admin Client
+# by prefixing them with "brokerClient_". These configurations are applied after hard coded configuration
+# and before the brokerClient configurations named above.
 
 ### --- Authentication --- ###
 
@@ -897,8 +900,11 @@ managedLedgerDefaultAckQuorum=2
 # in case of lack of enough bookies
 #bookkeeper_opportunisticStriping=false
 
-# you can add other configuration options for the BookKeeper client
-# by prefixing them with bookkeeper_
+# You can add other configuration options for the BookKeeper client
+# by prefixing them with "bookkeeper_". These configurations are applied
+# to all bookkeeper clients started by the broker (including the managed ledger bookkeeper clients as well as
+# the BookkeeperPackagesStorage bookkeeper client), except the distributed log bookkeeper client.
+# The dlog bookkeeper client is configured in the functions worker configuration file.
 
 # How frequently to flush the cursor positions that were accumulated due to rate limiting. (seconds).
 # Default is 60 seconds
@@ -1349,4 +1355,8 @@ packagesReplicas=1
 # The bookkeeper ledger root path
 packagesManagementLedgerRootPath=/ledgers
 
+# When using BookKeeperPackagesStorageProvider, you can configure the
+# bookkeeper client by prefixing configurations with "bookkeeper_".
+# This config applies to managed ledger bookkeeper clients, as well.
+
 ### --- Packages management service configuration variables (end) --- ###
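The ordering described in the broker.conf comment above (hard-coded configuration first, then "brokerClient_" pass-through values, then the explicitly named brokerClient settings) can be illustrated with plain maps. This is a minimal sketch, not Pulsar code: the class ClientConfigPrecedence and the method resolve are hypothetical names, and the sample values are borrowed from the tests in this commit.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ClientConfigPrecedence {

    /**
     * Builds the effective client settings by applying three layers in order.
     * A later layer overwrites any key already set by an earlier one.
     */
    public static Map<String, Object> resolve(Map<String, Object> hardCoded,
                                              Map<String, Object> prefixedOverrides,
                                              Map<String, Object> namedSettings) {
        Map<String, Object> effective = new LinkedHashMap<>(hardCoded);
        effective.putAll(prefixedOverrides); // values that arrived as brokerClient_* keys
        effective.putAll(namedSettings);     // explicitly named settings are applied last and win
        return effective;
    }

    public static void main(String[] args) {
        Map<String, Object> hardCoded = Map.of("statsIntervalSeconds", 0);
        Map<String, Object> overrides = Map.of(
                "statsIntervalSeconds", 10,
                "serviceUrl", "https://broker.com:8080");
        Map<String, Object> named = Map.of("serviceUrl", "http://localhost:8080");
        System.out.println(resolve(hardCoded, overrides, named));
    }
}
```

In this sketch a pass-through value can replace a hard-coded default such as statsIntervalSeconds, but it cannot replace a setting that has its own named configuration key, such as the service URL.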
diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
index 0b228d26a58..a0449cbb236 100644
--- a/conf/functions_worker.yml
+++ b/conf/functions_worker.yml
@@ -362,3 +362,23 @@ validateConnectorConfig: false
 # Whether to initialize distributed log metadata by runtime.
 # If it is set to true, you must ensure that it has been initialized by "bin/pulsar initialize-cluster-metadata" command.
 initializedDlogMetadata: false
+###########################
+# Arbitrary Configuration
+###########################
+# When a configuration parameter is not explicitly named in the WorkerConfig class, it is only accessible from the
+# properties map. This map can be configured by supplying values to the properties map in this config file.
+
+# Configure the DLog bookkeeper client by prefixing configurations with "bookkeeper_". Because these are arbitrary, they
+# must be added to the properties map to get correctly applied. This configuration applies to the Dlog bookkeeper client
+# in both the standalone function workers and function workers initialized in the broker.
+
+# You can add extra configuration options for the Pulsar Client and the Pulsar Admin Client
+# by prefixing them with "brokerClient_". These configurations are applied after hard coded configuration
+# and before the brokerClient configurations named above.
+
+## For example, when using the token authentication provider (AuthenticationProviderToken), you must supply several
+## custom settings. Here is a sample that sets two of them:
+#properties:
+#    tokenPublicKey: "file:///path/to/my/key"
+#    tokenPublicAlg: "RS256"
+
diff --git a/conf/proxy.conf b/conf/proxy.conf
index 77129ccc71d..2454b9bf20c 100644
--- a/conf/proxy.conf
+++ b/conf/proxy.conf
@@ -163,6 +163,10 @@ tlsEnabledWithBroker=false
 # Tls cert refresh duration in seconds (set 0 to check on every new connection)
 tlsCertRefreshCheckDurationSec=300
 
+# You can add extra configuration options for the Pulsar Client
+# by prefixing them with "brokerClient_". These configurations are applied after hard coded configuration
+# and before the brokerClient configurations named above.
+
 ##### --- Rate Limiting --- #####
 
 # Max concurrent inbound connections. The proxy will reject requests beyond that.
diff --git a/conf/websocket.conf b/conf/websocket.conf
index 535fade4ea6..4fe6f7e37b6 100644
--- a/conf/websocket.conf
+++ b/conf/websocket.conf
@@ -92,6 +92,10 @@ brokerClientAuthenticationPlugin=
 brokerClientAuthenticationParameters=
 brokerClientTrustCertsFilePath=
 
+# You can add extra configuration options for the Pulsar Client
+# by prefixing them with "brokerClient_". These configurations are applied after hard coded configuration
+# and before the brokerClient configurations named above.
+
 # When this parameter is not empty, unauthenticated users perform as anonymousUserRole
 anonymousUserRole=
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
index 9a09deb166b..ce91ecf907c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
@@ -29,7 +29,6 @@ import io.netty.channel.EventLoopGroup;
 import java.io.IOException;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import lombok.extern.slf4j.Slf4j;
@@ -42,6 +41,7 @@ import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.internal.PropertiesUtils;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.zookeeper.ZkBookieRackAffinityMapping;
@@ -147,15 +147,11 @@ public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory {
                 conf.getBookkeeperClientGetBookieInfoIntervalSeconds(), TimeUnit.SECONDS);
         bkConf.setGetBookieInfoRetryIntervalSeconds(
                 conf.getBookkeeperClientGetBookieInfoRetryIntervalSeconds(), TimeUnit.SECONDS);
-        Properties allProps = conf.getProperties();
-        allProps.forEach((key, value) -> {
-            String sKey = key.toString();
-            if (sKey.startsWith("bookkeeper_") && value != null) {
-                String bkExtraConfigKey = sKey.substring(11);
-                log.info("Extra BookKeeper client configuration {}, setting {}={}", sKey, bkExtraConfigKey, value);
-                bkConf.setProperty(bkExtraConfigKey, value);
-            }
-        });
+        PropertiesUtils.filterAndMapProperties(conf.getProperties(), "bookkeeper_")
+                .forEach((key, value) -> {
+                    log.info("Applying BookKeeper client configuration setting {}={}", key, value);
+                    bkConf.setProperty(key, value);
+                });
         return bkConf;
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index a777ced4f95..85f21e0bf8f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -127,6 +127,8 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
+import org.apache.pulsar.client.internal.PropertiesUtils;
 import org.apache.pulsar.client.util.ExecutorProvider;
 import org.apache.pulsar.common.conf.InternalConfigurationData;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
@@ -1328,7 +1330,16 @@ public class PulsarService implements AutoCloseable, ShutdownService {
     public synchronized PulsarClient getClient() throws PulsarServerException {
         if (this.client == null) {
             try {
-                ClientConfigurationData conf = new ClientConfigurationData();
+                ClientConfigurationData initialConf = new ClientConfigurationData();
+                initialConf.setStatsIntervalSeconds(0);
+
+                // Apply all arbitrary configuration. This must be called before setting any fields annotated as
+                // @Secret on the ClientConfigurationData object because of the way they are serialized.
+                // See https://github.com/apache/pulsar/issues/8509 for more information.
+                Map<String, Object> overrides = PropertiesUtils
+                        .filterAndMapProperties(this.getConfiguration().getProperties(), "brokerClient_");
+                ClientConfigurationData conf =
+                        ConfigurationDataUtils.loadData(overrides, initialConf, ClientConfigurationData.class);
                 conf.setServiceUrl(this.getConfiguration().isTlsEnabled()
                                 ? this.brokerServiceUrlTls : this.brokerServiceUrl);
                 conf.setTlsAllowInsecureConnection(this.getConfiguration().isTlsAllowInsecureConnection());
@@ -1356,8 +1367,6 @@ public class PulsarService implements AutoCloseable, ShutdownService {
                             this.getConfiguration().getBrokerClientAuthenticationPlugin(),
                             this.getConfiguration().getBrokerClientAuthenticationParameters()));
                 }
-
-                conf.setStatsIntervalSeconds(0);
                 this.client = new PulsarClientImpl(conf, ioEventLoopGroup);
             } catch (Exception e) {
                 throw new PulsarServerException(e);
@@ -1377,10 +1386,16 @@ public class PulsarService implements AutoCloseable, ShutdownService {
                             + ", webServiceAddressTls: " + webServiceAddressTls
                             + ", webServiceAddress: " + webServiceAddress);
                 }
-                PulsarAdminBuilder builder = PulsarAdmin.builder().serviceHttpUrl(adminApiUrl) //
-                        .authentication(//
-                                conf.getBrokerClientAuthenticationPlugin(), //
-                                conf.getBrokerClientAuthenticationParameters());
+                PulsarAdminBuilder builder = PulsarAdmin.builder().serviceHttpUrl(adminApiUrl);
+
+                // Apply all arbitrary configuration. This must be called before setting any fields annotated as
+                // @Secret on the ClientConfigurationData object because of the way they are serialized.
+                // See https://github.com/apache/pulsar/issues/8509 for more information.
+                builder.loadConf(PropertiesUtils.filterAndMapProperties(config.getProperties(), "brokerClient_"));
+
+                builder.authentication(
+                        conf.getBrokerClientAuthenticationPlugin(),
+                        conf.getBrokerClientAuthenticationParameters());
 
                 if (conf.isBrokerClientTlsEnabled()) {
                     builder.tlsCiphers(config.getBrokerClientTlsCiphers())
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 58c39ac9143..4d910cb901d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -67,6 +67,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.ClientBuilderImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.internal.PropertiesUtils;
 import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
 import org.apache.pulsar.common.lookup.data.LookupData;
 import org.apache.pulsar.common.naming.NamespaceBundle;
@@ -1272,6 +1273,11 @@ public class NamespaceService implements AutoCloseable {
                     .enableTcpNoDelay(false)
                     .statsInterval(0, TimeUnit.SECONDS);
 
+                // Apply all arbitrary configuration. This must be called before setting any fields annotated as
+                // @Secret on the ClientConfigurationData object because of the way they are serialized.
+                // See https://github.com/apache/pulsar/issues/8509 for more information.
+                clientBuilder.loadConf(PropertiesUtils.filterAndMapProperties(config.getProperties(), "brokerClient_"));
+
                 if (pulsar.getConfiguration().isAuthenticationEnabled()) {
                     clientBuilder.authentication(pulsar.getConfiguration().getBrokerClientAuthenticationPlugin(),
                         pulsar.getConfiguration().getBrokerClientAuthenticationParameters());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 5dc5edf9f08..a932e2fa505 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -127,6 +127,7 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.impl.ClientBuilderImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.internal.PropertiesUtils;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.configuration.BindAddress;
 import org.apache.pulsar.common.configuration.FieldContext;
@@ -1135,6 +1136,12 @@ public class BrokerService implements Closeable {
                         .enableTcpNoDelay(false)
                         .connectionsPerBroker(pulsar.getConfiguration().getReplicationConnectionsPerBroker())
                         .statsInterval(0, TimeUnit.SECONDS);
+                // Apply all arbitrary configuration. This must be called before setting any fields annotated as
+                // @Secret on the ClientConfigurationData object because of the way they are serialized.
+                // See https://github.com/apache/pulsar/issues/8509 for more information.
+                clientBuilder.loadConf(PropertiesUtils.filterAndMapProperties(pulsar.getConfiguration().getProperties(),
+                        "brokerClient_"));
+
                 if (data.getAuthenticationPlugin() != null && data.getAuthenticationParameters() != null) {
                     clientBuilder.authentication(data.getAuthenticationPlugin(), data.getAuthenticationParameters());
                 } else if (pulsar.getConfiguration().isAuthenticationEnabled()) {
@@ -1210,10 +1217,16 @@ public class BrokerService implements Closeable {
 
                 boolean isTlsUrl = conf.isBrokerClientTlsEnabled() && isNotBlank(data.getServiceUrlTls());
                 String adminApiUrl = isTlsUrl ? data.getServiceUrlTls() : data.getServiceUrl();
-                PulsarAdminBuilder builder = PulsarAdmin.builder().serviceHttpUrl(adminApiUrl)
-                        .authentication(
-                                conf.getBrokerClientAuthenticationPlugin(),
-                                conf.getBrokerClientAuthenticationParameters());
+                PulsarAdminBuilder builder = PulsarAdmin.builder().serviceHttpUrl(adminApiUrl);
+
+                // Apply all arbitrary configuration. This must be called before setting any fields annotated as
+                // @Secret on the ClientConfigurationData object because of the way they are serialized.
+                // See https://github.com/apache/pulsar/issues/8509 for more information.
+                builder.loadConf(PropertiesUtils.filterAndMapProperties(conf.getProperties(), "brokerClient_"));
+
+                builder.authentication(
+                        conf.getBrokerClientAuthenticationPlugin(),
+                        conf.getBrokerClientAuthenticationParameters());
 
                 if (isTlsUrl) {
                     builder.allowTlsInsecureConnection(conf.isTlsAllowInsecureConnection());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
index ac028ef871b..35ca089e5df 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
@@ -37,6 +37,7 @@ import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.ServiceConfigurationUtils;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.internal.PropertiesUtils;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.util.CmdGenerateDocs;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
@@ -105,6 +106,11 @@ public class CompactorTool {
 
         ClientBuilder clientBuilder = PulsarClient.builder();
 
+        // Apply all arbitrary configuration. This must be called before setting any fields annotated as
+        // @Secret on the ClientConfigurationData object because of the way they are serialized.
+        // See https://github.com/apache/pulsar/issues/8509 for more information.
+        clientBuilder.loadConf(PropertiesUtils.filterAndMapProperties(brokerConfig.getProperties(), "brokerClient_"));
+
         if (isNotBlank(brokerConfig.getBrokerClientAuthenticationPlugin())) {
             clientBuilder.authentication(brokerConfig.getBrokerClientAuthenticationPlugin(),
                     brokerConfig.getBrokerClientAuthenticationParameters());
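The code comment repeated in the hunks above says the pass-through configuration must be loaded before any @Secret field is set, "because of the way they are serialized". The toy model below shows why that ordering can matter. It is a sketch under a stated assumption: that loading rebuilds the configuration object from a serialized copy which leaves secret fields out. The classes SecretOrderingSketch and ToyConf and the helper methods are hypothetical and are not the Pulsar implementation; the linked issue has the actual details.

```java
import java.util.HashMap;
import java.util.Map;

public class SecretOrderingSketch {

    /** Toy stand-in for a client configuration object. */
    public static class ToyConf {
        public long operationTimeoutMs = 30000;
        public String authParams; // plays the role of a secret field
    }

    /** Serializes only the non-secret fields (assumption: secrets are skipped). */
    static Map<String, Object> toMapWithoutSecrets(ToyConf conf) {
        Map<String, Object> serialized = new HashMap<>();
        serialized.put("operationTimeoutMs", conf.operationTimeoutMs);
        return serialized;
    }

    /**
     * Rebuilds a fresh object from the serialized copy plus the overrides.
     * Any secret already set on 'existing' is not carried over.
     */
    public static ToyConf loadData(Map<String, Object> overrides, ToyConf existing) {
        Map<String, Object> merged = toMapWithoutSecrets(existing);
        merged.putAll(overrides);
        ToyConf fresh = new ToyConf();
        fresh.operationTimeoutMs = Long.parseLong(String.valueOf(merged.get("operationTimeoutMs")));
        return fresh;
    }

    public static void main(String[] args) {
        Map<String, Object> overrides = Map.of("operationTimeoutMs", "60000");

        // Secret set first, then load: the secret is lost in this model.
        ToyConf early = new ToyConf();
        early.authParams = "sensitive";
        System.out.println(loadData(overrides, early).authParams);

        // Load first, then set the secret: both values survive.
        ToyConf late = loadData(overrides, new ToyConf());
        late.authParams = "sensitive";
        System.out.println(late.operationTimeoutMs + " " + late.authParams);
    }
}
```

Under that assumption, calling the load step first and setting authentication afterwards, as the diff does, keeps both the overrides and the secret.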
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerInternalClientConfigurationOverrideTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerInternalClientConfigurationOverrideTest.java
new file mode 100644
index 00000000000..775636c9489
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerInternalClientConfigurationOverrideTest.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.client.admin.internal.PulsarAdminImpl;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.ClusterDataImpl;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.Optional;
+import java.util.Properties;
+
+public class BrokerInternalClientConfigurationOverrideTest extends BrokerTestBase {
+
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        super.baseSetup();
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testPulsarServiceAdminClientConfiguration() throws PulsarServerException {
+        Properties config = pulsar.getConfiguration().getProperties();
+        config.setProperty("brokerClient_operationTimeoutMs", "60000");
+        config.setProperty("brokerClient_statsIntervalSeconds", "10");
+        ClientConfigurationData clientConf = ((PulsarAdminImpl) pulsar.getAdminClient()).getClientConfigData();
+        Assert.assertEquals(clientConf.getOperationTimeoutMs(), 60000);
+        Assert.assertEquals(clientConf.getStatsIntervalSeconds(), 10);
+    }
+
+    @Test
+    public void testPulsarServicePulsarClientConfiguration() throws PulsarServerException {
+        Properties config = pulsar.getConfiguration().getProperties();
+        config.setProperty("brokerClient_operationTimeoutMs", "60000");
+        config.setProperty("brokerClient_statsIntervalSeconds", "10");
+        pulsar.getConfiguration().setBrokerClientAuthenticationParameters("sensitive");
+        ClientConfigurationData clientConf = ((PulsarClientImpl) pulsar.getClient()).getConfiguration();
+        Assert.assertEquals(clientConf.getOperationTimeoutMs(), 60000);
+        // Config should override internal default, which is 0.
+        Assert.assertEquals(clientConf.getStatsIntervalSeconds(), 10);
+        Assert.assertEquals(clientConf.getAuthParams(), "sensitive");
+    }
+
+    @Test
+    public void testBrokerServicePulsarClientConfiguration() {
+        // This data only needs to have the service url for this test.
+        ClusterData data = ClusterData.builder().serviceUrl("http://localhost:8080").build();
+
+        // Set the configs and set some configs that won't apply
+        Properties config = pulsar.getConfiguration().getProperties();
+        config.setProperty("brokerClient_operationTimeoutMs", "60000");
+        config.setProperty("brokerClient_statsIntervalSeconds", "10");
+        config.setProperty("memoryLimitBytes", "10");
+        config.setProperty("brokerClient_memoryLimitBytes", "100000");
+
+        PulsarClientImpl client = (PulsarClientImpl) pulsar.getBrokerService()
+                .getReplicationClient("an_arbitrary_name", Optional.of(data));
+        ClientConfigurationData clientConf = client.getConfiguration();
+        Assert.assertEquals(clientConf.getOperationTimeoutMs(), 60000);
+        // Config should override internal default, which is 0.
+        Assert.assertEquals(clientConf.getStatsIntervalSeconds(), 10);
+        // This config defaults to 0 (for good reason), but it could be overridden by configuration.
+        Assert.assertEquals(clientConf.getMemoryLimitBytes(), 100000);
+    }
+
+    @Test
+    public void testNamespaceServicePulsarClientConfiguration() {
+        // This data only needs to have the service url for this test.
+        ClusterDataImpl data = (ClusterDataImpl) ClusterData.builder().serviceUrl("http://localhost:8080").build();
+
+        // Set the configs and set some configs that won't apply
+        Properties config = pulsar.getConfiguration().getProperties();
+        config.setProperty("brokerClient_operationTimeoutMs", "60000");
+        config.setProperty("brokerClient_statsIntervalSeconds", "10");
+        config.setProperty("memoryLimitBytes", "10");
+        config.setProperty("brokerClient_memoryLimitBytes", "100000");
+
+        PulsarClientImpl client = pulsar.getNamespaceService().getNamespaceClient(data);
+        ClientConfigurationData clientConf = client.getConfiguration();
+        Assert.assertEquals(clientConf.getOperationTimeoutMs(), 60000);
+        // Config should override internal default, which is 0.
+        Assert.assertEquals(clientConf.getStatsIntervalSeconds(), 10);
+        // This config defaults to 0 (for good reason), but it could be overridden by configuration.
+        Assert.assertEquals(clientConf.getMemoryLimitBytes(), 100000);
+    }
+
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarClientConfigurationOverrideTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarClientConfigurationOverrideTest.java
new file mode 100644
index 00000000000..4f885ecc46b
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarClientConfigurationOverrideTest.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.internal.PropertiesUtils;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.util.Map;
+
+public class PulsarClientConfigurationOverrideTest {
+    @Test
+    public void testFilterAndMapProperties() {
+        // Create a default config
+        ServiceConfiguration conf = new ServiceConfiguration();
+        conf.getProperties().setProperty("keepAliveIntervalSeconds", "15");
+        conf.getProperties().setProperty("brokerClient_keepAliveIntervalSeconds", "25");
+
+        // Apply the filtering and mapping logic
+        Map<String, Object> result = PropertiesUtils.filterAndMapProperties(conf.getProperties(), "brokerClient_");
+
+        // Ensure the results match expectations
+        Assert.assertEquals(result.size(), 1, "The filtered map should have one entry.");
+        Assert.assertNull(result.get("brokerClient_keepAliveIntervalSeconds"),
+                "The mapped prop should not be in the result.");
+        Assert.assertEquals(result.get("keepAliveIntervalSeconds"), "25", "The original value is overridden.");
+
+        // Create sample ClientBuilder
+        ClientBuilder builder = PulsarClient.builder();
+        Assert.assertEquals(
+                ((ClientBuilderImpl) builder).getClientConfigurationData().getKeepAliveIntervalSeconds(), 30);
+        // Note: this test would fail if any @Secret fields were set before the loadConf and then accessed afterwards.
+        builder.loadConf(result);
+        Assert.assertEquals(
+                ((ClientBuilderImpl) builder).getClientConfigurationData().getKeepAliveIntervalSeconds(), 25);
+    }
+}
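The test above pins down the observable behavior of the prefix filter: only keys that start with the prefix are kept, the prefix is stripped, and the un-prefixed property of the same name is ignored. A minimal, standalone sketch of a helper with that behavior follows. It is not the source of PropertiesUtils.filterAndMapProperties, which is not shown in this part of the diff; the class PrefixFilterSketch and the method filterAndStripPrefix are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class PrefixFilterSketch {

    /** Keeps only the entries whose key starts with prefix, and strips the prefix from each kept key. */
    public static Map<String, Object> filterAndStripPrefix(Properties props, String prefix) {
        Map<String, Object> result = new HashMap<>();
        for (String name : props.stringPropertyNames()) {
            if (name.startsWith(prefix)) {
                result.put(name.substring(prefix.length()), props.getProperty(name));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("keepAliveIntervalSeconds", "15");
        props.setProperty("brokerClient_keepAliveIntervalSeconds", "25");
        System.out.println(filterAndStripPrefix(props, "brokerClient_"));
    }
}
```

Using prefix.length() instead of a literal offset avoids the hard-coded substring(11) that the BookKeeper client code used before this commit.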
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java
index ec4937bdd21..184f86340fa 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java
@@ -66,6 +66,9 @@ public class ProxyConfigurationTest extends ProducerConsumerBase {
     public void configTest(int numIoThreads, int connectionsPerBroker) throws Exception {
         config.setWebSocketNumIoThreads(numIoThreads);
         config.setWebSocketConnectionsPerBroker(connectionsPerBroker);
+        config.getProperties().setProperty("brokerClient_serviceUrl", "https://broker.com:8080");
+        config.setServiceUrl("http://localhost:8080");
+        config.getProperties().setProperty("brokerClient_lookupTimeoutMs", "100");
         WebSocketService service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
         doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(), anyInt());
         service.start();
@@ -73,6 +76,9 @@ public class ProxyConfigurationTest extends ProducerConsumerBase {
         PulsarClientImpl client = (PulsarClientImpl) service.getPulsarClient();
         assertEquals(client.getConfiguration().getNumIoThreads(), numIoThreads);
         assertEquals(client.getConfiguration().getConnectionsPerBroker(), connectionsPerBroker);
+        assertEquals(client.getConfiguration().getServiceUrl(), "http://localhost:8080",
+                "named configs take precedence over brokerClient_ configs");
+        assertEquals(client.getConfiguration().getLookupTimeoutMs(), 100);
 
         service.close();
     }
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java
index 9f8b4be1409..c685c1f7793 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java
@@ -36,6 +36,29 @@ public interface PulsarAdminBuilder {
      */
     PulsarAdmin build() throws PulsarClientException;
 
+    /**
+     * Load the configuration from provided <tt>config</tt> map.
+     *
+     * <p>Example:
+     *
+     * <pre>
+     * {@code
+     * Map<String, Object> config = new HashMap<>();
+     * config.put("serviceUrl", "http://localhost:8080");
+     *
+     * PulsarAdminBuilder builder = ...;
+     * builder = builder.loadConf(config);
+     *
+     * PulsarAdmin client = builder.build();
+     * }
+     * </pre>
+     *
+     * @param config
+     *            configuration to load
+     * @return the client builder instance
+     */
+    PulsarAdminBuilder loadConf(Map<String, Object> config);
+
     /**
      * Create a copy of the current client builder.
      * <p/>
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java
index 70463b7fb4e..d86b9e73457 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java
@@ -28,10 +28,11 @@ import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
 
 public class PulsarAdminBuilderImpl implements PulsarAdminBuilder {
 
-    protected final ClientConfigurationData conf;
+    protected ClientConfigurationData conf;
     private int connectTimeout = PulsarAdminImpl.DEFAULT_CONNECT_TIMEOUT_SECONDS;
     private int readTimeout = PulsarAdminImpl.DEFAULT_READ_TIMEOUT_SECONDS;
     private int requestTimeout = PulsarAdminImpl.DEFAULT_REQUEST_TIMEOUT_SECONDS;
@@ -62,6 +63,12 @@ public class PulsarAdminBuilderImpl implements PulsarAdminBuilder {
         return new PulsarAdminBuilderImpl(conf.clone());
     }
 
+    @Override
+    public PulsarAdminBuilder loadConf(Map<String, Object> config) {
+        conf = ConfigurationDataUtils.loadData(config, conf, ClientConfigurationData.class);
+        return this;
+    }
+
     @Override
     public PulsarAdminBuilder serviceHttpUrl(String serviceHttpUrl) {
         conf.setServiceUrl(serviceHttpUrl);
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PropertiesUtils.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PropertiesUtils.java
new file mode 100644
index 00000000000..4a418b1d515
--- /dev/null
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PropertiesUtils.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.internal;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Internal utility methods for filtering and mapping {@link Properties} objects.
+ */
+public class PropertiesUtils {
+
+    /**
+     * Filters the {@link Properties} object so that only properties with the configured prefix are retained,
+     * and then removes that prefix and puts the key value pairs into the result map.
+     * @param props - the properties object to filter
+     * @param prefix - the prefix to filter against and then remove for keys in the resulting map
+     * @return a map of properties
+     */
+    public static Map<String, Object> filterAndMapProperties(Properties props, String prefix) {
+        return filterAndMapProperties(props, prefix, "");
+    }
+
+    /**
+     * Filters the {@link Properties} object so that only properties with the configured prefix are retained,
+     * and then replaces the srcPrefix with the targetPrefix when putting the key value pairs in the resulting map.
+     * @param props - the properties object to filter
+     * @param srcPrefix - the prefix to filter against and then remove for keys in the resulting map
+     * @param targetPrefix - the prefix to add to keys in the result map
+     * @return a map of properties
+     */
+    public static Map<String, Object> filterAndMapProperties(Properties props, String srcPrefix, String targetPrefix) {
+        Map<String, Object> result = new HashMap<>();
+        int prefixLength = srcPrefix.length();
+        props.forEach((keyObject, value) -> {
+            if (!(keyObject instanceof String)) {
+                return;
+            }
+            String key = (String) keyObject;
+            if (key.startsWith(srcPrefix) && value != null) {
+                String truncatedKey = key.substring(prefixLength);
+                result.put(targetPrefix + truncatedKey, value);
+            }
+        });
+        return result;
+    }
+}
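As a usage illustration of the prefix handling above, here is a minimal self-contained sketch. It re-implements the same filtering rule locally so it can run without the Pulsar jars; the class name PrefixMappingSketch, the method name filterAndMap, and the clusterName entry are hypothetical, while the two prefixed keys are taken from the tests in this patch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class PrefixMappingSketch {

    // Same rule as filterAndMapProperties in the patch: keep String keys that start
    // with srcPrefix and replace that prefix with targetPrefix in the result map.
    public static Map<String, Object> filterAndMap(Properties props, String srcPrefix, String targetPrefix) {
        Map<String, Object> result = new HashMap<>();
        props.forEach((keyObject, value) -> {
            if (keyObject instanceof String) {
                String key = (String) keyObject;
                if (key.startsWith(srcPrefix) && value != null) {
                    result.put(targetPrefix + key.substring(srcPrefix.length()), value);
                }
            }
        });
        return result;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("brokerClient_lookupTimeoutMs", "100");
        props.setProperty("bookkeeper_testKey", "fakeValue");
        props.setProperty("clusterName", "standalone"); // no prefix, so it is dropped

        // Strip the prefix for Pulsar client overrides: prints {lookupTimeoutMs=100}
        System.out.println(filterAndMap(props, "brokerClient_", ""));
        // Swap the prefix for DLog's BookKeeper client keys: prints {bkc.testKey=fakeValue}
        System.out.println(filterAndMap(props, "bookkeeper_", "bkc."));
    }
}
```

The empty target prefix corresponds to the two-argument overload, and the "bkc." target corresponds to the DLog mapping used in WorkerUtils and BookKeeperPackagesStorage.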
diff --git a/pulsar-functions/src/test/resources/test_worker_config.yml b/pulsar-functions/src/test/resources/test_worker_config.yml
index 4614ca3cfd1..f0ecf2bd71b 100644
--- a/pulsar-functions/src/test/resources/test_worker_config.yml
+++ b/pulsar-functions/src/test/resources/test_worker_config.yml
@@ -23,4 +23,7 @@ pulsarServiceUrl: pulsar://localhost:6650
 functionMetadataTopicName: test-function-metadata-topic
 numFunctionPackageReplicas: 3
 maxPendingAsyncRequests: 200
+properties:
+  # Fake Bookkeeper Client config to be applied to the DLog Bookkeeper Client
+  bookkeeper_testKey: "fakeValue"
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
index ef92e85853b..9ac03746756 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
@@ -133,7 +133,8 @@ public class PulsarWorkerService implements WorkerService {
                         workerConfig.getBrokerClientAuthenticationParameters(),
                         workerConfig.getBrokerClientTrustCertsFilePath(),
                         workerConfig.isTlsAllowInsecureConnection(),
-                        workerConfig.isTlsEnableHostnameVerification());
+                        workerConfig.isTlsEnableHostnameVerification(),
+                        workerConfig);
                 } else {
                     return WorkerUtils.getPulsarAdminClient(
                             pulsarServiceUrl,
@@ -141,7 +142,8 @@ public class PulsarWorkerService implements WorkerService {
                             null,
                             null,
                             workerConfig.isTlsAllowInsecureConnection(),
-                            workerConfig.isTlsEnableHostnameVerification());
+                            workerConfig.isTlsEnableHostnameVerification(),
+                            workerConfig);
                 }
             }
 
@@ -156,7 +158,8 @@ public class PulsarWorkerService implements WorkerService {
                         workerConfig.isUseTls(),
                         workerConfig.getBrokerClientTrustCertsFilePath(),
                         workerConfig.isTlsAllowInsecureConnection(),
-                        workerConfig.isTlsEnableHostnameVerification());
+                        workerConfig.isTlsEnableHostnameVerification(),
+                        workerConfig);
                 } else {
                     return WorkerUtils.getPulsarClient(
                             pulsarServiceUrl,
@@ -165,7 +168,8 @@ public class PulsarWorkerService implements WorkerService {
                             null,
                             null,
                             workerConfig.isTlsAllowInsecureConnection(),
-                            workerConfig.isTlsEnableHostnameVerification());
+                            workerConfig.isTlsEnableHostnameVerification(),
+                            workerConfig);
                 }
             }
         };
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
index 4f3ee042866..741a89bc397 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
@@ -40,6 +40,7 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.ReaderBuilder;
+import org.apache.pulsar.client.internal.PropertiesUtils;
 import org.apache.pulsar.common.conf.InternalConfigurationData;
 import org.apache.pulsar.common.functions.WorkerInfo;
 import org.apache.pulsar.common.policies.data.FunctionInstanceStatsDataImpl;
@@ -158,6 +159,13 @@ public final class WorkerUtils {
                         workerConfig.getBookkeeperClientAuthenticationParameters());
             }
         }
+        // Map arbitrary bookkeeper client configuration into DLog Config. Note that this only configures the
+        // bookie client.
+        PropertiesUtils.filterAndMapProperties(workerConfig.getProperties(), "bookkeeper_", "bkc.")
+                .forEach((key, value) -> {
+                    log.info("Applying DLog BookKeeper client configuration setting {}={}", key, value);
+                    conf.setProperty(key, value);
+                });
         return conf;
     }
 
@@ -194,12 +202,20 @@ public final class WorkerUtils {
     }
 
     public static PulsarAdmin getPulsarAdminClient(String pulsarWebServiceUrl) {
-        return getPulsarAdminClient(pulsarWebServiceUrl, null, null, null, null, null);
+        return getPulsarAdminClient(pulsarWebServiceUrl, null, null, null, null, null, null);
     }
 
     public static PulsarAdmin getPulsarAdminClient(String pulsarWebServiceUrl, String authPlugin, String authParams,
                                                    String tlsTrustCertsFilePath, Boolean allowTlsInsecureConnection,
                                                    Boolean enableTlsHostnameVerificationEnable) {
+        return getPulsarAdminClient(pulsarWebServiceUrl, authPlugin, authParams, tlsTrustCertsFilePath,
+                allowTlsInsecureConnection, enableTlsHostnameVerificationEnable, null);
+    }
+
+    public static PulsarAdmin getPulsarAdminClient(String pulsarWebServiceUrl, String authPlugin, String authParams,
+                                                   String tlsTrustCertsFilePath, Boolean allowTlsInsecureConnection,
+                                                   Boolean enableTlsHostnameVerificationEnable,
+                                                   WorkerConfig workerConfig) {
         log.info("Create Pulsar Admin to service url {}: "
             + "authPlugin = {}, authParams = {}, "
             + "tlsTrustCerts = {}, allowTlsInsecureConnector = {}, enableTlsHostnameVerification = {}",
@@ -207,6 +223,13 @@ public final class WorkerUtils {
             tlsTrustCertsFilePath, allowTlsInsecureConnection, enableTlsHostnameVerificationEnable);
         try {
             PulsarAdminBuilder adminBuilder = PulsarAdmin.builder().serviceHttpUrl(pulsarWebServiceUrl);
+            if (workerConfig != null) {
+                // Apply all arbitrary configuration. This must be called before setting any fields annotated as
+                // @Secret on the ClientConfigurationData object because of the way they are serialized.
+                // See https://github.com/apache/pulsar/issues/8509 for more information.
+                adminBuilder.loadConf(
+                        PropertiesUtils.filterAndMapProperties(workerConfig.getProperties(), "brokerClient_"));
+            }
             if (isNotBlank(authPlugin) && isNotBlank(authParams)) {
                 adminBuilder.authentication(authPlugin, authParams);
             }
@@ -219,6 +242,7 @@ public final class WorkerUtils {
             if (enableTlsHostnameVerificationEnable != null) {
                 adminBuilder.enableTlsHostnameVerification(enableTlsHostnameVerificationEnable);
             }
+
             return adminBuilder.build();
         } catch (PulsarClientException e) {
             log.error("Error creating pulsar admin client", e);
@@ -228,17 +252,33 @@ public final class WorkerUtils {
 
     public static PulsarClient getPulsarClient(String pulsarServiceUrl) {
         return getPulsarClient(pulsarServiceUrl, null, null, null,
-                null, null, null);
+                null, null, null, null);
     }
 
     public static PulsarClient getPulsarClient(String pulsarServiceUrl, String authPlugin, String authParams,
                                                Boolean useTls, String tlsTrustCertsFilePath,
                                                Boolean allowTlsInsecureConnection,
                                                Boolean enableTlsHostnameVerificationEnable) {
+        return getPulsarClient(pulsarServiceUrl, authPlugin, authParams, useTls, tlsTrustCertsFilePath,
+                allowTlsInsecureConnection, enableTlsHostnameVerificationEnable, null);
+    }
+
+    public static PulsarClient getPulsarClient(String pulsarServiceUrl, String authPlugin, String authParams,
+                                               Boolean useTls, String tlsTrustCertsFilePath,
+                                               Boolean allowTlsInsecureConnection,
+                                               Boolean enableTlsHostnameVerificationEnable,
+                                               WorkerConfig workerConfig) {
 
         try {
             ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(pulsarServiceUrl);
 
+            if (workerConfig != null) {
+                // Apply all arbitrary configuration. This must be called before setting any fields annotated as
+                // @Secret on the ClientConfigurationData object because of the way they are serialized.
+                // See https://github.com/apache/pulsar/issues/8509 for more information.
+                clientBuilder.loadConf(
+                        PropertiesUtils.filterAndMapProperties(workerConfig.getProperties(), "brokerClient_"));
+            }
             if (isNotBlank(authPlugin)
                     && isNotBlank(authParams)) {
                 clientBuilder.authentication(authPlugin, authParams);
@@ -255,7 +295,6 @@ public final class WorkerUtils {
             if (enableTlsHostnameVerificationEnable != null) {
                 clientBuilder.enableTlsHostnameVerification(enableTlsHostnameVerificationEnable);
             }
-
             return clientBuilder.build();
         } catch (PulsarClientException e) {
             log.error("Error creating pulsar client", e);
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerUtilsTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerUtilsTest.java
index d899db13237..b2e0f0f354c 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerUtilsTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerUtilsTest.java
@@ -40,8 +40,13 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.fail;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.net.URL;
+import org.apache.distributedlog.DistributedLogConfiguration;
 
 public class WorkerUtilsTest {
 
@@ -99,4 +104,18 @@ public class WorkerUtilsTest {
 
         }
     }
+
+    @Test
+    public void testDLogConfiguration() throws URISyntaxException, IOException {
+        // The config yml is seeded with a fake bookie config.
+        URL yamlUrl = getClass().getClassLoader().getResource("test_worker_config.yml");
+        WorkerConfig config = WorkerConfig.load(yamlUrl.toURI().getPath());
+
+        // Map the config.
+        DistributedLogConfiguration dlogConf = WorkerUtils.getDlogConf(config);
+
+        // Verify the outcome.
+        assertEquals(dlogConf.getString("bkc.testKey"), "fakeValue",
+                "The bookkeeper client config mapping should apply.");
+    }
 }
\ No newline at end of file
diff --git a/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorage.java b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorage.java
index f0db59351f5..e3147c0e8bc 100644
--- a/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorage.java
+++ b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorage.java
@@ -36,6 +36,7 @@ import org.apache.distributedlog.exceptions.ZKException;
 import org.apache.distributedlog.impl.metadata.BKDLConfig;
 import org.apache.distributedlog.metadata.DLMetadata;
 import org.apache.distributedlog.namespace.NamespaceDriver;
+import org.apache.pulsar.client.internal.PropertiesUtils;
 import org.apache.pulsar.packages.management.core.PackagesStorage;
 import org.apache.pulsar.packages.management.core.PackagesStorageConfiguration;
 import org.apache.zookeeper.KeeperException;
@@ -72,6 +73,13 @@ public class BookKeeperPackagesStorage implements PackagesStorage {
                     configuration.getBookkeeperClientAuthenticationParameters());
             }
         }
+        // Map arbitrary bookkeeper client configuration into DLog Config. Note that this only configures the
+        // bookie client.
+        PropertiesUtils.filterAndMapProperties(configuration.getProperties(), "bookkeeper_", "bkc.")
+                .forEach((key, value) -> {
+                    log.info("Applying DLog BookKeeper client configuration setting {}={}", key, value);
+                    conf.setProperty(key, value);
+                });
         try {
             this.namespace = NamespaceBuilder.newBuilder()
                 .conf(conf).clientId(NS_CLIENT_ID).uri(initializeDlogNamespace()).build();
diff --git a/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageConfiguration.java b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageConfiguration.java
index 226b80abeaa..ce6acecdd51 100644
--- a/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageConfiguration.java
+++ b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageConfiguration.java
@@ -58,6 +58,10 @@ public class BookKeeperPackagesStorageConfiguration implements PackagesStorageCo
         return getProperty("bookkeeperClientAuthenticationParameters");
     }
 
+    @Override
+    public Properties getProperties() {
+        return configuration.getProperties();
+    }
 
     @Override
     public String getProperty(String key) {
diff --git a/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/PackagesStorageConfiguration.java b/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/PackagesStorageConfiguration.java
index b4044a6338c..5c346a0d05c 100644
--- a/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/PackagesStorageConfiguration.java
+++ b/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/PackagesStorageConfiguration.java
@@ -50,4 +50,10 @@ public interface PackagesStorageConfiguration {
      *          a group of the property
      */
     void setProperty(Properties properties);
+
+    /**
+     * Get all properties for the configuration.
+     * @return all properties for the configuration
+     */
+    Properties getProperties();
 }
diff --git a/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/impl/DefaultPackagesStorageConfiguration.java b/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/impl/DefaultPackagesStorageConfiguration.java
index cb35048a360..d3c5d7494b3 100644
--- a/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/impl/DefaultPackagesStorageConfiguration.java
+++ b/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/impl/DefaultPackagesStorageConfiguration.java
@@ -39,4 +39,9 @@ public class DefaultPackagesStorageConfiguration implements PackagesStorageConfi
     public void setProperty(Properties properties) {
         this.properties = properties;
     }
+
+    @Override
+    public Properties getProperties() {
+        return this.properties;
+    }
 }
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index d9f0f5db38f..0f41208fde2 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -28,6 +28,7 @@ import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadLocalRandom;
@@ -47,6 +48,8 @@ import org.apache.pulsar.client.impl.ClientCnx;
 import org.apache.pulsar.client.impl.ConnectionPool;
 import org.apache.pulsar.client.impl.PulsarChannelInitializer;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
+import org.apache.pulsar.client.internal.PropertiesUtils;
 import org.apache.pulsar.common.api.AuthData;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.PulsarHandler;
@@ -521,9 +524,17 @@ public class ProxyConnection extends PulsarHandler {
     }
 
     ClientConfigurationData createClientConfiguration() {
-        ClientConfigurationData clientConf = new ClientConfigurationData();
-        clientConf.setServiceUrl(service.getServiceUrl());
+        ClientConfigurationData initialConf = new ClientConfigurationData();
+        initialConf.setServiceUrl(service.getServiceUrl());
         ProxyConfiguration proxyConfig = service.getConfiguration();
+        // Apply all arbitrary configuration. This must be called before setting any fields annotated as
+        // @Secret on the ClientConfigurationData object because of the way they are serialized.
+        // See https://github.com/apache/pulsar/issues/8509 for more information.
+        Map<String, Object> overrides = PropertiesUtils
+                .filterAndMapProperties(proxyConfig.getProperties(), "brokerClient_");
+        ClientConfigurationData clientConf = ConfigurationDataUtils
+                .loadData(overrides, initialConf, ClientConfigurationData.class);
+
         clientConf.setAuthentication(this.getClientAuthentication());
         if (proxyConfig.isTlsEnabledWithBroker()) {
             clientConf.setUseTls(true);
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
index b2873d778ab..7a0f19bad10 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
@@ -37,6 +37,7 @@ import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.internal.PropertiesUtils;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
@@ -181,6 +182,11 @@ public class WebSocketService implements Closeable {
                 .ioThreads(config.getWebSocketNumIoThreads()) //
                 .connectionsPerBroker(config.getWebSocketConnectionsPerBroker());
 
+        // Apply all arbitrary configuration. This must be called before setting any fields annotated as
+        // @Secret on the ClientConfigurationData object because of the way they are serialized.
+        // See https://github.com/apache/pulsar/issues/8509 for more information.
+        clientBuilder.loadConf(PropertiesUtils.filterAndMapProperties(config.getProperties(), "brokerClient_"));
+
         if (isNotBlank(config.getBrokerClientAuthenticationPlugin())
                 && isNotBlank(config.getBrokerClientAuthenticationParameters())) {
             clientBuilder.authentication(config.getBrokerClientAuthenticationPlugin(),
@@ -198,7 +204,6 @@ public class WebSocketService implements Closeable {
         } else {
             clientBuilder.serviceUrl(clusterData.getServiceUrl());
         }
-
         return clientBuilder.build();
     }
 
diff --git a/site2/docs/reference-configuration.md b/site2/docs/reference-configuration.md
index f8d1e87f890..3226007209f 100644
--- a/site2/docs/reference-configuration.md
+++ b/site2/docs/reference-configuration.md
@@ -352,6 +352,15 @@ brokerServiceCompactionThresholdInBytes|If the estimated backlog size is greater
 | managedLedgerInfoCompressionType | Compression type of managed ledger information. <br><br>Available options are `NONE`, `LZ4`, `ZLIB`, `ZSTD`, and `SNAPPY`). <br><br>If this value is `NONE` or invalid, the `managedLedgerInfo` is not compressed. <br><br>**Note** that after enabling this configuration, if you want to degrade a broker, you need to change the value to `NONE` and make sure all ledger metadata is saved without compression. | None |
 | additionalServlets | Additional servlet name. <br><br>If you have multiple additional servlets, separate them by commas. <br><br>For example, additionalServlet_1, additionalServlet_2 | N/A |
 | additionalServletDirectory | Location of broker additional servlet NAR directory | ./brokerAdditionalServlet |
+#### Configuration Override For Clients Internal to Broker
+
+It's possible to configure some clients by using the appropriate prefix.
+
+|Prefix|Description|
+|brokerClient_| Configure **all** the broker's Pulsar Clients and Pulsar Admin Clients. These configurations are applied after hard-coded configuration and before the brokerClient configurations named above.|
+|bookkeeper_| Configure the broker's BookKeeper clients used by managed ledgers and the BookKeeperPackagesStorage BookKeeper client. Takes precedence over most other configuration values.|
+
+Note: when running the function worker within the broker, these prefixed configurations do not apply to any of the function worker's clients. You must instead configure those clients using the `functions_worker.yml`.
 
 ## Client
 
@@ -677,6 +686,12 @@ You can set the log level and configuration in the  [log4j2.yaml](https://github
 |tlsCertificateFilePath|||
 |tlsKeyFilePath |||
 |tlsTrustCertsFilePath|||
+#### Configuration Override For Clients Internal to WebSocket
+
+It's possible to configure some clients by using the appropriate prefix.
+
+|Prefix|Description|
+|brokerClient_| Configure **all** the WebSocket service's Pulsar Clients. These configurations are applied after hard-coded configuration and before the brokerClient configurations named above.|
 
 ## Pulsar proxy
 
@@ -734,6 +749,12 @@ The [Pulsar proxy](concepts-architecture-overview.md#pulsar-proxy) can be config
 |haProxyProtocolEnabled | Enable or disable the [HAProxy](http://www.haproxy.org/) protocol. |false|
 | numIOThreads | Number of threads used for Netty IO. | 2 * Runtime.getRuntime().availableProcessors() |
 | numAcceptorThreads | Number of threads used for Netty Acceptor. | 1 |
+#### Configuration Override For Clients Internal to Proxy
+
+It's possible to configure some clients by using the appropriate prefix.
+
+|Prefix|Description|
+|brokerClient_| Configure **all** the proxy's Pulsar Clients. These configurations are applied after hard-coded configuration and before the brokerClient configurations named above.|
 
 ## ZooKeeper