You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/06/01 14:43:17 UTC

[pulsar] 10/10: Configure DLog Bookie, Pulsar, and Admin clients via pass through config (#15818)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e5d035ae413cbe67e0efa8afb7d62d979cc58659
Author: Michael Marshall <mm...@apache.org>
AuthorDate: Wed Jun 1 02:14:49 2022 -0500

    Configure DLog Bookie, Pulsar, and Admin clients via pass through config (#15818)
    
    (cherry picked from commit aa673498f88d0ed4f9d5788a5036355834ea5119)
---
 conf/broker.conf                                   |  14 ++-
 conf/functions_worker.yml                          |  20 ++++
 conf/proxy.conf                                    |   4 +
 conf/websocket.conf                                |   4 +
 .../pulsar/broker/BookKeeperClientFactoryImpl.java |  16 ++--
 .../org/apache/pulsar/broker/PulsarService.java    |  32 ++++---
 .../pulsar/broker/namespace/NamespaceService.java  |  10 +-
 .../pulsar/broker/service/BrokerService.java       |  21 ++++-
 .../apache/pulsar/compaction/CompactorTool.java    |   6 ++
 ...kerInternalClientConfigurationOverrideTest.java | 103 +++++++++++++++++++++
 .../PulsarClientConfigurationOverrideTest.java     |  56 +++++++++++
 .../websocket/proxy/ProxyConfigurationTest.java    |   8 +-
 .../pulsar/client/admin/PulsarAdminBuilder.java    |  23 +++++
 .../admin/internal/PulsarAdminBuilderImpl.java     |   9 +-
 .../pulsar/client/internal/PropertiesUtils.java    |  64 +++++++++++++
 .../src/test/resources/test_worker_config.yml      |   3 +
 .../pulsar/functions/worker/WorkerService.java     |   7 +-
 .../pulsar/functions/worker/WorkerUtils.java       |  45 ++++++++-
 .../pulsar/functions/worker/WorkerUtilsTest.java   |  44 +++++++++
 .../pulsar/proxy/server/ProxyConnection.java       |  16 +++-
 .../apache/pulsar/websocket/WebSocketService.java  |   7 +-
 site2/docs/reference-configuration.md              |  22 +++++
 22 files changed, 489 insertions(+), 45 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 9534ed73c63..0c554ada913 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -570,6 +570,9 @@ brokerClientTlsCiphers=
 # used by the internal client to authenticate with Pulsar brokers
 brokerClientTlsProtocols=
 
+# You can add extra configuration options for the Pulsar Client and the Pulsar Admin Client
+# by prefixing them with "brokerClient_". These configurations are applied after hard coded configuration
+# and before the above brokerClient configurations named above.
 
 ### --- Authentication --- ###
 
@@ -798,8 +801,11 @@ managedLedgerDefaultWriteQuorum=2
 # Number of guaranteed copies (acks to wait before write is complete)
 managedLedgerDefaultAckQuorum=2
 
-# you can add other configuration options for the BookKeeper client
-# by prefixing them with bookkeeper_
+# You can add other configuration options for the BookKeeper client
+# by prefixing them with "bookkeeper_". These configurations are applied
+# to all bookkeeper clients started by the broker (including the managed ledger bookkeeper clients as well as
+# the BookkeeperPackagesStorage bookkeeper client), except the distributed log bookkeeper client.
+# The dlog bookkeeper client is configured in the functions worker configuration file.
 
 # How frequently to flush the cursor positions that were accumulated due to rate limiting. (seconds).
 # Default is 60 seconds
@@ -1175,6 +1181,10 @@ fileSystemURI=
 
 ### --- Deprecated config variables --- ###
 
+# When using BookKeeperPackagesStorageProvider, you can configure the
+# bookkeeper client by prefixing configurations with "bookkeeper_".
+# This config applies to managed ledger bookkeeper clients, as well.
+
 # Deprecated. Use configurationStoreServers
 globalZookeeperServers=
 
diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
index 4dfcf586307..2376edbd2a3 100644
--- a/conf/functions_worker.yml
+++ b/conf/functions_worker.yml
@@ -243,3 +243,23 @@ functionsDirectory: ./functions
 
 # Should connector config be validated during during submission
 validateConnectorConfig: false
+###########################
+# Arbitrary Configuration
+###########################
+# When a configuration parameter is not explicitly named in the WorkerConfig class, it is only accessible from the
+# properties map. This map can be configured by supplying values to the properties map in this config file.
+
+# Configure the DLog bookkeeper client by prefixing configurations with "bookkeeper_". Because these are arbitrary, they
+# must be added to the properties map to get correctly applied. This configuration applies to the Dlog bookkeeper client
+# in both the standalone function workers and function workers initialized in the broker.
+
+# You can add extra configuration options for the Pulsar Client and the Pulsar Admin Client
+# by prefixing them with "brokerClient_". These configurations are applied after hard coded configuration
+# and before the above brokerClient configurations named above.
+
+## For example, when using the token authentication provider (AuthenticationProviderToken), you must configure several
+## custom configurations. Here is a sample for configuring one of the necessary configs:
+#properties:
+#    tokenPublicKey: "file:///path/to/my/key"
+#    tokenPublicAlg: "RSA256"
+
diff --git a/conf/proxy.conf b/conf/proxy.conf
index 5ac664ef48b..f0fa60483af 100644
--- a/conf/proxy.conf
+++ b/conf/proxy.conf
@@ -126,6 +126,10 @@ tlsEnabledWithBroker=false
 # Tls cert refresh duration in seconds (set 0 to check on every new connection)
 tlsCertRefreshCheckDurationSec=300
 
+# You can add extra configuration options for the Pulsar Client
+# by prefixing them with "brokerClient_". These configurations are applied after hard coded configuration
+# and before the above brokerClient configurations named above.
+
 ##### --- Rate Limiting --- #####
 
 # Max concurrent inbound connections. The proxy will reject requests beyond that.
diff --git a/conf/websocket.conf b/conf/websocket.conf
index 8e4a59be945..85005bab46c 100644
--- a/conf/websocket.conf
+++ b/conf/websocket.conf
@@ -92,6 +92,10 @@ brokerClientAuthenticationPlugin=
 brokerClientAuthenticationParameters=
 brokerClientTrustCertsFilePath=
 
+# You can add extra configuration options for the Pulsar Client
+# by prefixing them with "brokerClient_". These configurations are applied after hard coded configuration
+# and before the above brokerClient configurations named above.
+
 # When this parameter is not empty, unauthenticated users perform as anonymousUserRole
 anonymousUserRole=
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
index f00c9050d5f..e2bf0d91729 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
@@ -27,7 +27,6 @@ import static org.apache.bookkeeper.client.RegionAwareEnsemblePlacementPolicy.RE
 import java.io.IOException;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import com.google.common.annotations.VisibleForTesting;
@@ -41,6 +40,7 @@ import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.internal.PropertiesUtils;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.zookeeper.ZkBookieRackAffinityMapping;
 import org.apache.pulsar.zookeeper.ZkIsolatedBookieEnsemblePlacementPolicy;
@@ -141,15 +141,11 @@ public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory {
                 conf.getBookkeeperClientGetBookieInfoIntervalSeconds(), TimeUnit.SECONDS);
         bkConf.setGetBookieInfoRetryIntervalSeconds(
                 conf.getBookkeeperClientGetBookieInfoRetryIntervalSeconds(), TimeUnit.SECONDS);
-        Properties allProps = conf.getProperties();
-        allProps.forEach((key, value) -> {
-            String sKey = key.toString();
-            if (sKey.startsWith("bookkeeper_") && value != null) {
-                String bkExtraConfigKey = sKey.substring(11);
-                log.info("Extra BookKeeper client configuration {}, setting {}={}", sKey, bkExtraConfigKey, value);
-                bkConf.setProperty(bkExtraConfigKey, value);
-            }
-        });
+        PropertiesUtils.filterAndMapProperties(conf.getProperties(), "bookkeeper_")
+                .forEach((key, value) -> {
+                    log.info("Applying BookKeeper client configuration setting {}={}", key, value);
+                    bkConf.setProperty(key, value);
+                });
         return bkConf;
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 3cfdaf73366..4492c1d1e0b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -22,21 +22,17 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.apache.pulsar.broker.admin.impl.NamespacesBase.getBundles;
 import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
-
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.util.concurrent.DefaultThreadFactory;
-
 import java.io.IOException;
 import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
 import java.net.URI;
-
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
@@ -55,7 +51,6 @@ import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
-
 import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.Setter;
@@ -68,12 +63,13 @@ import org.apache.bookkeeper.mledger.LedgerOffloaderFactory;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
 import org.apache.bookkeeper.mledger.offload.Offloaders;
-import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.bookkeeper.mledger.offload.OffloadersCache;
+import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
 import org.apache.pulsar.PulsarVersion;
+import org.apache.pulsar.ZookeeperSessionExpiredHandlers;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.authorization.AuthorizationService;
@@ -108,6 +104,7 @@ import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.internal.PropertiesUtils;
 import org.apache.pulsar.common.conf.InternalConfigurationData;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.configuration.VipStatus;
@@ -141,9 +138,8 @@ import org.apache.pulsar.zookeeper.LocalZooKeeperConnectionService;
 import org.apache.pulsar.zookeeper.ZooKeeperCache;
 import org.apache.pulsar.zookeeper.ZooKeeperCacheListener;
 import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
-import org.apache.pulsar.zookeeper.ZookeeperBkClientFactoryImpl;
 import org.apache.pulsar.zookeeper.ZooKeeperSessionWatcher.ShutdownService;
-import org.apache.pulsar.ZookeeperSessionExpiredHandlers;
+import org.apache.pulsar.zookeeper.ZookeeperBkClientFactoryImpl;
 import org.apache.pulsar.zookeeper.ZookeeperSessionExpiredHandler;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -1060,6 +1056,12 @@ public class PulsarService implements AutoCloseable {
                     .enableTls(this.getConfiguration().isTlsEnabled())
                     .allowTlsInsecureConnection(this.getConfiguration().isTlsAllowInsecureConnection())
                     .tlsTrustCertsFilePath(this.getConfiguration().getTlsCertificateFilePath());
+                // Apply all arbitrary configuration. This must be called before setting any fields annotated as
+                // @Secret on the ClientConfigurationData object because of the way they are serialized.
+                // See https://github.com/apache/pulsar/issues/8509 for more information.
+                Map<String, Object> overrides = PropertiesUtils
+                        .filterAndMapProperties(this.getConfiguration().getProperties(), "brokerClient_");
+                builder.loadConf(overrides);
 
                 if (this.getConfiguration().isBrokerClientTlsEnabled()) {
                     if (this.getConfiguration().isBrokerClientTlsEnabledWithKeyStore()) {
@@ -1098,10 +1100,16 @@ public class PulsarService implements AutoCloseable {
                             + ", webServiceAddressTls: " + webServiceAddressTls
                             + ", webServiceAddress: " + webServiceAddress);
                 }
-                PulsarAdminBuilder builder = PulsarAdmin.builder().serviceHttpUrl(adminApiUrl) //
-                        .authentication( //
-                                conf.getBrokerClientAuthenticationPlugin(), //
-                                conf.getBrokerClientAuthenticationParameters());
+                PulsarAdminBuilder builder = PulsarAdmin.builder().serviceHttpUrl(adminApiUrl);
+
+                // Apply all arbitrary configuration. This must be called before setting any fields annotated as
+                // @Secret on the ClientConfigurationData object because of the way they are serialized.
+                // See https://github.com/apache/pulsar/issues/8509 for more information.
+                builder.loadConf(PropertiesUtils.filterAndMapProperties(config.getProperties(), "brokerClient_"));
+
+                builder.authentication(
+                        conf.getBrokerClientAuthenticationPlugin(),
+                        conf.getBrokerClientAuthenticationParameters());
 
                 if (conf.isBrokerClientTlsEnabled()) {
                     if (conf.isBrokerClientTlsEnabledWithKeyStore()) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 52cce387d55..72f233836c5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -29,13 +29,10 @@ import static org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_PO
 import static org.apache.pulsar.broker.web.PulsarWebResource.joinPath;
 import static org.apache.pulsar.common.naming.NamespaceBundleFactory.getBundlesData;
 import static org.apache.pulsar.common.util.Codec.decode;
-
 import com.google.common.collect.Lists;
 import com.google.common.hash.Hashing;
-
 import io.netty.channel.EventLoopGroup;
 import io.prometheus.client.Counter;
-
 import java.net.URI;
 import java.net.URL;
 import java.util.ArrayList;
@@ -53,7 +50,6 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
-
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.collections4.ListUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -77,6 +73,7 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.impl.ClientBuilderImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.internal.PropertiesUtils;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode;
 import org.apache.pulsar.common.lookup.data.LookupData;
 import org.apache.pulsar.common.naming.NamespaceBundle;
@@ -1256,6 +1253,11 @@ public class NamespaceService {
                     .enableTcpNoDelay(false)
                     .statsInterval(0, TimeUnit.SECONDS);
 
+                // Apply all arbitrary configuration. This must be called before setting any fields annotated as
+                // @Secret on the ClientConfigurationData object because of the way they are serialized.
+                // See https://github.com/apache/pulsar/issues/8509 for more information.
+                clientBuilder.loadConf(PropertiesUtils.filterAndMapProperties(config.getProperties(), "brokerClient_"));
+
                 if (pulsar.getConfiguration().isAuthenticationEnabled()) {
                     clientBuilder.authentication(pulsar.getConfiguration().getBrokerClientAuthenticationPlugin(),
                         pulsar.getConfiguration().getBrokerClientAuthenticationParameters());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 47ecd6e7cbf..29d1001e4bd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -126,6 +126,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.ClientBuilderImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.internal.PropertiesUtils;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.configuration.FieldContext;
 import org.apache.pulsar.common.events.EventsTopicNames;
@@ -943,6 +944,12 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
                         .enableTcpNoDelay(false)
                         .connectionsPerBroker(pulsar.getConfiguration().getReplicationConnectionsPerBroker())
                         .statsInterval(0, TimeUnit.SECONDS);
+                // Apply all arbitrary configuration. This must be called before setting any fields annotated as
+                // @Secret on the ClientConfigurationData object because of the way they are serialized.
+                // See https://github.com/apache/pulsar/issues/8509 for more information.
+                clientBuilder.loadConf(PropertiesUtils.filterAndMapProperties(pulsar.getConfiguration().getProperties(),
+                    "brokerClient_"));
+
                 if (pulsar.getConfiguration().isAuthenticationEnabled()) {
                     clientBuilder.authentication(pulsar.getConfiguration().getBrokerClientAuthenticationPlugin(),
                             pulsar.getConfiguration().getBrokerClientAuthenticationParameters());
@@ -996,10 +1003,16 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
 
                 boolean isTlsUrl = conf.isBrokerClientTlsEnabled() && isNotBlank(data.getServiceUrlTls());
                 String adminApiUrl = isTlsUrl ? data.getServiceUrlTls() : data.getServiceUrl();
-                PulsarAdminBuilder builder = PulsarAdmin.builder().serviceHttpUrl(adminApiUrl)
-                        .authentication(
-                                conf.getBrokerClientAuthenticationPlugin(),
-                                conf.getBrokerClientAuthenticationParameters());
+                PulsarAdminBuilder builder = PulsarAdmin.builder().serviceHttpUrl(adminApiUrl);
+
+                // Apply all arbitrary configuration. This must be called before setting any fields annotated as
+                // @Secret on the ClientConfigurationData object because of the way they are serialized.
+                // See https://github.com/apache/pulsar/issues/8509 for more information.
+                builder.loadConf(PropertiesUtils.filterAndMapProperties(conf.getProperties(), "brokerClient_"));
+
+                builder.authentication(
+                        conf.getBrokerClientAuthenticationPlugin(),
+                        conf.getBrokerClientAuthenticationParameters());
 
                 if (isTlsUrl) {
                     builder.allowTlsInsecureConnection(conf.isTlsAllowInsecureConnection());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
index 4e3b76e57de..88609157cd5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
@@ -36,6 +36,7 @@ import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.ServiceConfigurationUtils;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.internal.PropertiesUtils;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
 import org.apache.pulsar.zookeeper.ZookeeperBkClientFactoryImpl;
@@ -80,6 +81,11 @@ public class CompactorTool {
 
         ClientBuilder clientBuilder = PulsarClient.builder();
 
+        // Apply all arbitrary configuration. This must be called before setting any fields annotated as
+        // @Secret on the ClientConfigurationData object because of the way they are serialized.
+        // See https://github.com/apache/pulsar/issues/8509 for more information.
+        clientBuilder.loadConf(PropertiesUtils.filterAndMapProperties(brokerConfig.getProperties(), "brokerClient_"));
+
         if (isNotBlank(brokerConfig.getBrokerClientAuthenticationPlugin())) {
             clientBuilder.authentication(brokerConfig.getBrokerClientAuthenticationPlugin(),
                     brokerConfig.getBrokerClientAuthenticationParameters());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerInternalClientConfigurationOverrideTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerInternalClientConfigurationOverrideTest.java
new file mode 100644
index 00000000000..407e9ea05fd
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerInternalClientConfigurationOverrideTest.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import java.util.Properties;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class BrokerInternalClientConfigurationOverrideTest extends BrokerTestBase {
+
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        super.baseSetup();
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testPulsarServiceAdminClientConfiguration() throws PulsarServerException {
+        Properties config = pulsar.getConfiguration().getProperties();
+        config.setProperty("brokerClient_operationTimeoutMs", "60000");
+        config.setProperty("brokerClient_statsIntervalSeconds", "10");
+        ClientConfigurationData clientConf = pulsar.getAdminClient().getClientConfigData();
+        Assert.assertEquals(clientConf.getOperationTimeoutMs(), 60000);
+        Assert.assertEquals(clientConf.getStatsIntervalSeconds(), 10);
+    }
+
+    @Test
+    public void testPulsarServicePulsarClientConfiguration() throws PulsarServerException {
+        Properties config = pulsar.getConfiguration().getProperties();
+        config.setProperty("brokerClient_operationTimeoutMs", "60000");
+        config.setProperty("brokerClient_statsIntervalSeconds", "10");
+        pulsar.getConfiguration().setBrokerClientAuthenticationParameters("sensitive");
+        ClientConfigurationData clientConf = ((PulsarClientImpl) pulsar.getClient()).getConfiguration();
+        Assert.assertEquals(clientConf.getOperationTimeoutMs(), 60000);
+        // Config should override internal default, which is 0.
+        Assert.assertEquals(clientConf.getStatsIntervalSeconds(), 10);
+        Assert.assertEquals(clientConf.getAuthParams(), "sensitive");
+    }
+
+    @Test
+    public void testBrokerServicePulsarClientConfiguration() {
+        // This data only needs to have the service url for this test.
+        ClusterData data = new ClusterData("http://localhost:8080");
+
+        // Set the configs and set some configs that won't apply
+        Properties config = pulsar.getConfiguration().getProperties();
+        config.setProperty("brokerClient_operationTimeoutMs", "60000");
+        config.setProperty("brokerClient_statsIntervalSeconds", "10");
+
+        PulsarClientImpl client = (PulsarClientImpl) pulsar.getBrokerService()
+                .getReplicationClient("test");
+        ClientConfigurationData clientConf = client.getConfiguration();
+        Assert.assertEquals(clientConf.getOperationTimeoutMs(), 60000);
+        // Config should override internal default, which is 0.
+        Assert.assertEquals(clientConf.getStatsIntervalSeconds(), 10);
+    }
+
+    @Test
+    public void testNamespaceServicePulsarClientConfiguration() {
+        // This data only needs to have the service url for this test.
+        ClusterData data = new ClusterData("http://localhost:8080");
+
+        // Set the configs and set some configs that won't apply
+        Properties config = pulsar.getConfiguration().getProperties();
+        config.setProperty("brokerClient_operationTimeoutMs", "60000");
+        config.setProperty("brokerClient_statsIntervalSeconds", "10");
+
+        PulsarClientImpl client = pulsar.getNamespaceService().getNamespaceClient(data);
+        ClientConfigurationData clientConf = client.getConfiguration();
+        Assert.assertEquals(clientConf.getOperationTimeoutMs(), 60000);
+        // Config should override internal default, which is 0.
+        Assert.assertEquals(clientConf.getStatsIntervalSeconds(), 10);
+    }
+
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarClientConfigurationOverrideTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarClientConfigurationOverrideTest.java
new file mode 100644
index 00000000000..4f885ecc46b
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarClientConfigurationOverrideTest.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.internal.PropertiesUtils;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.util.Map;
+
+public class PulsarClientConfigurationOverrideTest {
+    @Test
+    public void testFilterAndMapProperties() {
+        // Create a default config
+        ServiceConfiguration conf = new ServiceConfiguration();
+        conf.getProperties().setProperty("keepAliveIntervalSeconds", "15");
+        conf.getProperties().setProperty("brokerClient_keepAliveIntervalSeconds", "25");
+
+        // Apply the filtering and mapping logic
+        Map<String, Object> result = PropertiesUtils.filterAndMapProperties(conf.getProperties(), "brokerClient_");
+
+        // Ensure the results match expectations
+        Assert.assertEquals(result.size(), 1, "The filtered map should have one entry.");
+        Assert.assertNull(result.get("brokerClient_keepAliveIntervalSeconds"),
+                "The mapped prop should not be in the result.");
+        Assert.assertEquals(result.get("keepAliveIntervalSeconds"), "25", "The original value is overridden.");
+
+        // Create sample ClientBuilder
+        ClientBuilder builder = PulsarClient.builder();
+        Assert.assertEquals(
+                ((ClientBuilderImpl) builder).getClientConfigurationData().getKeepAliveIntervalSeconds(), 30);
+        // Note: this test would fail if any @Secret fields were set before the loadConf and the accessed afterwards.
+        builder.loadConf(result);
+        Assert.assertEquals(
+                ((ClientBuilderImpl) builder).getClientConfigurationData().getKeepAliveIntervalSeconds(), 25);
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java
index afe91a77e3e..af2bd957c8d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java
@@ -21,9 +21,7 @@ package org.apache.pulsar.websocket.proxy;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
-
 import java.util.Optional;
-
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.websocket.WebSocketService;
@@ -61,6 +59,9 @@ public class ProxyConfigurationTest extends ProducerConsumerBase {
     public void configTest(int numIoThreads, int connectionsPerBroker) throws Exception {
         config.setWebSocketNumIoThreads(numIoThreads);
         config.setWebSocketConnectionsPerBroker(connectionsPerBroker);
+        config.getProperties().setProperty("brokerClient_serviceUrl", "https://broker.com:8080");
+        config.setServiceUrl("http://localhost:8080");
+        config.getProperties().setProperty("brokerClient_requestTimeoutMs", "100");
         WebSocketService service = spy(new WebSocketService(config));
         doReturn(mockZooKeeperClientFactory).when(service).getZooKeeperClientFactory();
         service.start();
@@ -68,6 +69,9 @@ public class ProxyConfigurationTest extends ProducerConsumerBase {
         PulsarClientImpl client = (PulsarClientImpl) service.getPulsarClient();
         assertEquals(client.getConfiguration().getNumIoThreads(), numIoThreads);
         assertEquals(client.getConfiguration().getConnectionsPerBroker(), connectionsPerBroker);
+        assertEquals(client.getConfiguration().getServiceUrl(), "http://localhost:8080",
+                "brokerClient_ configs take precedence");
+        assertEquals(client.getConfiguration().getRequestTimeoutMs(), 100);
 
         service.close();
     }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java
index 9de2d39a7ca..c1c9b800db2 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java
@@ -37,6 +37,29 @@ public interface PulsarAdminBuilder {
      */
     PulsarAdmin build() throws PulsarClientException;
 
+    /**
+     * Load the configuration from provided <tt>config</tt> map.
+     *
+     * <p>Example:
+     *
+     * <pre>
+     * {@code
+     * Map<String, Object> config = new HashMap<>();
+     * config.put("serviceHttpUrl", "http://localhost:6650");
+     *
+     * PulsarAdminBuilder builder = ...;
+     * builder = builder.loadConf(config);
+     *
+     * PulsarAdmin client = builder.build();
+     * }
+     * </pre>
+     *
+     * @param config
+     *            configuration to load
+     * @return the client builder instance
+     */
+    PulsarAdminBuilder loadConf(Map<String, Object> config);
+
     /**
      * Create a copy of the current client builder.
      * <p/>
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java
index 146faaf29d7..7e552adfa49 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java
@@ -29,10 +29,11 @@ import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
 
 public class PulsarAdminBuilderImpl implements PulsarAdminBuilder {
 
-    protected final ClientConfigurationData conf;
+    protected ClientConfigurationData conf;
     private int connectTimeout = PulsarAdmin.DEFAULT_CONNECT_TIMEOUT_SECONDS;
     private int readTimeout = PulsarAdmin.DEFAULT_READ_TIMEOUT_SECONDS;
     private int requestTimeout = PulsarAdmin.DEFAULT_REQUEST_TIMEOUT_SECONDS;
@@ -61,6 +62,12 @@ public class PulsarAdminBuilderImpl implements PulsarAdminBuilder {
         return new PulsarAdminBuilderImpl(conf.clone());
     }
 
+    @Override
+    public PulsarAdminBuilder loadConf(Map<String, Object> config) {
+        conf = ConfigurationDataUtils.loadData(config, conf, ClientConfigurationData.class);
+        return this;
+    }
+
     @Override
     public PulsarAdminBuilder serviceHttpUrl(String serviceHttpUrl) {
         conf.setServiceUrl(serviceHttpUrl);
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PropertiesUtils.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PropertiesUtils.java
new file mode 100644
index 00000000000..4a418b1d515
--- /dev/null
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PropertiesUtils.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.internal;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Internal utility methods for filtering and mapping {@link Properties} objects.
+ */
+public class PropertiesUtils {
+
+    /**
+     * Filters the {@link Properties} object so that only properties with the configured prefix are retained,
+     * and then removes that prefix and puts the key value pairs into the result map.
+     * @param props - the properties object to filter
+     * @param prefix - the prefix to filter against and then remove for keys in the resulting map
+     * @return a map of properties
+     */
+    public static Map<String, Object> filterAndMapProperties(Properties props, String prefix) {
+        return filterAndMapProperties(props, prefix, "");
+    }
+
+    /**
+     * Filters the {@link Properties} object so that only properties with the configured prefix are retained,
+     * and then replaces the srcPrefix with the targetPrefix when putting the key value pairs in the resulting map.
+     * @param props - the properties object to filter
+     * @param srcPrefix - the prefix to filter against and then remove for keys in the resulting map
+     * @param targetPrefix - the prefix to add to keys in the result map
+     * @return a map of properties
+     */
+    public static Map<String, Object> filterAndMapProperties(Properties props, String srcPrefix, String targetPrefix) {
+        Map<String, Object> result = new HashMap<>();
+        int prefixLength = srcPrefix.length();
+        props.forEach((keyObject, value) -> {
+            if (!(keyObject instanceof String)) {
+                return;
+            }
+            String key = (String) keyObject;
+            if (key.startsWith(srcPrefix) && value != null) {
+                String truncatedKey = key.substring(prefixLength);
+                result.put(targetPrefix + truncatedKey, value);
+            }
+        });
+        return result;
+    }
+}
diff --git a/pulsar-functions/src/test/resources/test_worker_config.yml b/pulsar-functions/src/test/resources/test_worker_config.yml
index f2645f61e8d..36c762af45c 100644
--- a/pulsar-functions/src/test/resources/test_worker_config.yml
+++ b/pulsar-functions/src/test/resources/test_worker_config.yml
@@ -22,4 +22,7 @@ workerPort: 7654
 pulsarServiceUrl: pulsar://localhost:6650
 functionMetadataTopicName: test-function-metadata-topic
 numFunctionPackageReplicas: 3
+properties:
+  # Fake Bookkeeper Client config to be applied to the DLog Bookkeeper Client
+  bookkeeper_testKey: "fakeValue"
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index d0218b45790..b660ff64823 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -148,18 +148,19 @@ public class WorkerService {
                 this.brokerAdmin = WorkerUtils.getPulsarAdminClient(workerConfig.getPulsarWebServiceUrl(),
                     workerConfig.getBrokerClientAuthenticationPlugin(), workerConfig.getBrokerClientAuthenticationParameters(),
                     pulsarClientTlsTrustCertsFilePath, workerConfig.isTlsAllowInsecureConnection(),
-                    workerConfig.isTlsEnableHostnameVerification());
+                    workerConfig.isTlsEnableHostnameVerification(), workerConfig);
 
                 this.functionAdmin = WorkerUtils.getPulsarAdminClient(functionWebServiceUrl,
                     workerConfig.getBrokerClientAuthenticationPlugin(), workerConfig.getBrokerClientAuthenticationParameters(),
                     workerConfig.getTlsTrustCertsFilePath(), workerConfig.isTlsAllowInsecureConnection(),
-                    workerConfig.isTlsEnableHostnameVerification());
+                    workerConfig.isTlsEnableHostnameVerification(), workerConfig);
 
                 this.client = WorkerUtils.getPulsarClient(workerConfig.getPulsarServiceUrl(),
                         workerConfig.getBrokerClientAuthenticationPlugin(),
                         workerConfig.getBrokerClientAuthenticationParameters(),
                         workerConfig.isUseTls(), pulsarClientTlsTrustCertsFilePath,
-                        workerConfig.isTlsAllowInsecureConnection(), workerConfig.isTlsEnableHostnameVerification());
+                        workerConfig.isTlsAllowInsecureConnection(), workerConfig.isTlsEnableHostnameVerification(),
+                        workerConfig);
             } else {
                 this.brokerAdmin = WorkerUtils.getPulsarAdminClient(workerConfig.getPulsarWebServiceUrl());
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
index 81659ad8095..05eb809337f 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
@@ -38,6 +38,7 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.ReaderBuilder;
+import org.apache.pulsar.client.internal.PropertiesUtils;
 import org.apache.pulsar.common.conf.InternalConfigurationData;
 import org.apache.pulsar.common.policies.data.FunctionStats;
 import org.apache.pulsar.functions.proto.Function;
@@ -152,6 +153,13 @@ public final class WorkerUtils {
                         workerConfig.getBookkeeperClientAuthenticationParameters());
             }
         }
+        // Map arbitrary bookkeeper client configuration into DLog Config. Note that this only configures the
+        // bookie client.
+        PropertiesUtils.filterAndMapProperties(workerConfig.getProperties(), "bookkeeper_", "bkc.")
+                .forEach((key, value) -> {
+                    log.info("Applying DLog BookKeeper client configuration setting {}={}", key, value);
+                    conf.setProperty(key, value);
+                });
         return conf;
     }
 
@@ -184,12 +192,20 @@ public final class WorkerUtils {
     }
 
     public static PulsarAdmin getPulsarAdminClient(String pulsarWebServiceUrl) {
-        return getPulsarAdminClient(pulsarWebServiceUrl, null, null, null, null, null);
+        return getPulsarAdminClient(pulsarWebServiceUrl, null, null, null, null, null, null);
     }
 
     public static PulsarAdmin getPulsarAdminClient(String pulsarWebServiceUrl, String authPlugin, String authParams,
                                                    String tlsTrustCertsFilePath, Boolean allowTlsInsecureConnection,
                                                    Boolean enableTlsHostnameVerificationEnable) {
+        return getPulsarAdminClient(pulsarWebServiceUrl, authPlugin, authParams, tlsTrustCertsFilePath,
+                allowTlsInsecureConnection, enableTlsHostnameVerificationEnable, null);
+    }
+
+    public static PulsarAdmin getPulsarAdminClient(String pulsarWebServiceUrl, String authPlugin, String authParams,
+                                                   String tlsTrustCertsFilePath, Boolean allowTlsInsecureConnection,
+                                                   Boolean enableTlsHostnameVerificationEnable,
+                                                   WorkerConfig workerConfig) {
         log.info("Create Pulsar Admin to service url {}: "
                 + "authPlugin = {}, authParams = {}, "
                 + "tlsTrustCerts = {}, allowTlsInsecureConnector = {}, enableTlsHostnameVerification = {}",
@@ -197,6 +213,13 @@ public final class WorkerUtils {
             tlsTrustCertsFilePath, allowTlsInsecureConnection, enableTlsHostnameVerificationEnable);
         try {
             PulsarAdminBuilder adminBuilder = PulsarAdmin.builder().serviceHttpUrl(pulsarWebServiceUrl);
+            if (workerConfig != null) {
+                // Apply all arbitrary configuration. This must be called before setting any fields annotated as
+                // @Secret on the ClientConfigurationData object because of the way they are serialized.
+                // See https://github.com/apache/pulsar/issues/8509 for more information.
+                adminBuilder.loadConf(
+                        PropertiesUtils.filterAndMapProperties(workerConfig.getProperties(), "brokerClient_"));
+            }
             if (isNotBlank(authPlugin) && isNotBlank(authParams)) {
                 adminBuilder.authentication(authPlugin, authParams);
             }
@@ -209,6 +232,7 @@ public final class WorkerUtils {
             if (enableTlsHostnameVerificationEnable != null) {
                 adminBuilder.enableTlsHostnameVerification(enableTlsHostnameVerificationEnable);
             }
+
             return adminBuilder.build();
         } catch (PulsarClientException e) {
             log.error("Error creating pulsar admin client", e);
@@ -218,17 +242,33 @@ public final class WorkerUtils {
 
     public static PulsarClient getPulsarClient(String pulsarServiceUrl) {
         return getPulsarClient(pulsarServiceUrl, null, null, null,
-                null, null, null);
+                null, null, null, null);
     }
 
     public static PulsarClient getPulsarClient(String pulsarServiceUrl, String authPlugin, String authParams,
                                                Boolean useTls, String tlsTrustCertsFilePath,
                                                Boolean allowTlsInsecureConnection,
                                                Boolean enableTlsHostnameVerificationEnable) {
+        return getPulsarClient(pulsarServiceUrl, authPlugin, authParams, useTls, tlsTrustCertsFilePath,
+                allowTlsInsecureConnection, enableTlsHostnameVerificationEnable, null);
+    }
+
+    public static PulsarClient getPulsarClient(String pulsarServiceUrl, String authPlugin, String authParams,
+                                               Boolean useTls, String tlsTrustCertsFilePath,
+                                               Boolean allowTlsInsecureConnection,
+                                               Boolean enableTlsHostnameVerificationEnable,
+                                               WorkerConfig workerConfig) {
 
         try {
             ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(pulsarServiceUrl);
 
+            if (workerConfig != null) {
+                // Apply all arbitrary configuration. This must be called before setting any fields annotated as
+                // @Secret on the ClientConfigurationData object because of the way they are serialized.
+                // See https://github.com/apache/pulsar/issues/8509 for more information.
+                clientBuilder.loadConf(
+                        PropertiesUtils.filterAndMapProperties(workerConfig.getProperties(), "brokerClient_"));
+            }
             if (isNotBlank(authPlugin)
                     && isNotBlank(authParams)) {
                 clientBuilder.authentication(authPlugin, authParams);
@@ -245,7 +285,6 @@ public final class WorkerUtils {
             if (enableTlsHostnameVerificationEnable != null) {
                 clientBuilder.enableTlsHostnameVerification(enableTlsHostnameVerificationEnable);
             }
-
             return clientBuilder.build();
         } catch (PulsarClientException e) {
             log.error("Error creating pulsar client", e);
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerUtilsTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerUtilsTest.java
new file mode 100644
index 00000000000..042b505472d
--- /dev/null
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerUtilsTest.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker;
+
+import static org.testng.Assert.assertEquals;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.net.URL;
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.testng.annotations.Test;
+
+public class WorkerUtilsTest {
+
+
+    @Test
+    public void testDLogConfiguration() throws URISyntaxException, IOException {
+        // The config yml is seeded with a fake bookie config.
+        URL yamlUrl = getClass().getClassLoader().getResource("test_worker_config.yml");
+        WorkerConfig config = WorkerConfig.load(yamlUrl.toURI().getPath());
+
+        // Map the config.
+        DistributedLogConfiguration dlogConf = WorkerUtils.getDlogConf(config);
+
+        // Verify the outcome.
+        assertEquals(dlogConf.getString("bkc.testKey"), "fakeValue",
+                "The bookkeeper client config mapping should apply.");
+    }
+}
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index bdfb11a9f0e..3c73284e7ed 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -27,6 +27,7 @@ import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadLocalRandom;
@@ -47,6 +48,8 @@ import org.apache.pulsar.client.impl.ClientCnx;
 import org.apache.pulsar.client.impl.ConnectionPool;
 import org.apache.pulsar.client.impl.PulsarChannelInitializer;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
+import org.apache.pulsar.client.internal.PropertiesUtils;
 import org.apache.pulsar.common.api.AuthData;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.PulsarHandler;
@@ -524,10 +527,17 @@ public class ProxyConnection extends PulsarHandler {
     }
 
     ClientConfigurationData createClientConfiguration()
-            throws PulsarClientException.UnsupportedAuthenticationException {
-        ClientConfigurationData clientConf = new ClientConfigurationData();
-        clientConf.setServiceUrl(service.getServiceUrl());
+        throws PulsarClientException.UnsupportedAuthenticationException {
+        ClientConfigurationData initialConf = new ClientConfigurationData();
+        initialConf.setServiceUrl(service.getServiceUrl());
         ProxyConfiguration proxyConfig = service.getConfiguration();
+        // Apply all arbitrary configuration. This must be called before setting any fields annotated as
+        // @Secret on the ClientConfigurationData object because of the way they are serialized.
+        // See https://github.com/apache/pulsar/issues/8509 for more information.
+        Map<String, Object> overrides = PropertiesUtils
+                .filterAndMapProperties(proxyConfig.getProperties(), "brokerClient_");
+        ClientConfigurationData clientConf = ConfigurationDataUtils
+                .loadData(overrides, initialConf, ClientConfigurationData.class);
         if (proxyConfig.getBrokerClientAuthenticationPlugin() != null) {
             clientConf.setAuthentication(AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
                     proxyConfig.getBrokerClientAuthenticationParameters()));
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
index 7b7ba26f44e..c36ad98740a 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
@@ -40,6 +40,7 @@ import org.apache.pulsar.broker.cache.ConfigurationCacheService;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.internal.PropertiesUtils;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
@@ -186,6 +187,11 @@ public class WebSocketService implements Closeable {
                 .ioThreads(config.getWebSocketNumIoThreads()) //
                 .connectionsPerBroker(config.getWebSocketConnectionsPerBroker());
 
+        // Apply all arbitrary configuration. This must be called before setting any fields annotated as
+        // @Secret on the ClientConfigurationData object because of the way they are serialized.
+        // See https://github.com/apache/pulsar/issues/8509 for more information.
+        clientBuilder.loadConf(PropertiesUtils.filterAndMapProperties(config.getProperties(), "brokerClient_"));
+
         if (isNotBlank(config.getBrokerClientAuthenticationPlugin())
                 && isNotBlank(config.getBrokerClientAuthenticationParameters())) {
             clientBuilder.authentication(config.getBrokerClientAuthenticationPlugin(),
@@ -203,7 +209,6 @@ public class WebSocketService implements Closeable {
         } else {
             clientBuilder.serviceUrl(clusterData.getServiceUrl());
         }
-
         return clientBuilder.build();
     }
 
diff --git a/site2/docs/reference-configuration.md b/site2/docs/reference-configuration.md
index d03cb2eae9d..7c55b744e03 100644
--- a/site2/docs/reference-configuration.md
+++ b/site2/docs/reference-configuration.md
@@ -345,6 +345,15 @@ subscriptionExpirationTimeMinutes | How long to delete inactive subscriptions fr
 | preciseTopicPublishRateLimiterEnable | Enable precise topic publish rate limiting. | false |
 | lazyCursorRecovery | Whether to recover cursors lazily when trying to recover a managed ledger backing a persistent topic. It can improve write availability of topics. The caveat is now when recovered ledger is ready to write we're not sure if all old consumers' last mark delete position(ack position) can be recovered or not. So user can make the trade off or have custom logic in application to checkpoint consumer state.| false |  
 
+#### Configuration Override For Clients Internal to Broker
+
+It's possible to configure some clients by using the appropriate prefix.
+
+|Prefix|Description|
+|brokerClient_| Configure **all** the broker's Pulsar Clients and Pulsar Admin Clients. These configurations are applied after hard coded configuration and before the above brokerClient configurations named above.|
+|bookkeeper_| Configure the broker's bookkeeper clients used by managed ledgers and the BookkeeperPackagesStorage bookkeeper client. Takes precedence over most other configuration values.|
+
+Note: when running the function worker within the broker, these prefixed configurations do not apply to any of those clients. You must instead configure those clients using the `functions_worker.yml`.
 
 
 ## Client
@@ -682,6 +691,12 @@ The value of 0 disables message-byte dispatch-throttling.|0|
 |tlsKeyFilePath |||
 |tlsTrustCertsFilePath|||
 
+#### Configuration Override For Clients Internal to WebSocket
+
+It's possible to configure some clients by using the appropriate prefix.
+
+|Prefix|Description|
+|brokerClient_| Configure **all** the broker's Pulsar Clients. These configurations are applied after hard coded configuration and before the above brokerClient configurations named above.|
 
 ## Pulsar proxy
 
@@ -746,6 +761,13 @@ The [Pulsar proxy](concepts-architecture-overview.md#pulsar-proxy) can be config
 | tokenAudience | The token audience stands for this broker. The field `tokenAudienceClaim` of a valid token need contains this parameter.| |
 | proxyLogLevel | Set the Pulsar Proxy log level. <li> If the value is set to 0, no TCP channel information is logged. <li> If the value is set to 1, only the TCP channel information and command information (without message body) are parsed and logged. <li> If the value is set to 2, all TCP channel information, command information, and message body are parsed and logged. | 0 |
 
+#### Configuration Override For Clients Internal to Proxy
+
+It's possible to configure some clients by using the appropriate prefix.
+
+|Prefix|Description|
+|brokerClient_| Configure **all** the proxy's Pulsar Clients. These configurations are applied after hard coded configuration and before the above brokerClient configurations named above.|
+
 ## ZooKeeper
 
 ZooKeeper handles a broad range of essential configuration- and coordination-related tasks for Pulsar. The default configuration file for ZooKeeper is in the `conf/zookeeper.conf` file in your Pulsar installation. The following parameters are available: