You are viewing a plain text version of this content. The canonical link for it is here [link target not preserved in this plain-text copy].
Posted to commits@pulsar.apache.org by xi...@apache.org on 2023/02/25 11:31:03 UTC

[pulsar] branch branch-2.10 updated: [fix][client] Set authentication when using loadConf in client and admin client (#18358)

This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 9160ee6ad9f [fix][client] Set authentication when using loadConf in client and admin client (#18358)
9160ee6ad9f is described below

commit 9160ee6ad9f24bfeb40c9d6b34b8d2959feda137
Author: Chris Bono <cb...@vmware.com>
AuthorDate: Sat Feb 4 14:03:23 2023 -0600

    [fix][client] Set authentication when using loadConf in client and admin client (#18358)
    
    (cherry picked from commit 0f72a822dd8aa21fa3d86bb6f62ea4482fc3b907)
---
 .../admin/internal/PulsarAdminBuilderImpl.java     |  20 +++
 .../client/admin/internal/PulsarAdminImpl.java     |  17 +-
 .../admin/internal/PulsarAdminBuilderImplTest.java | 185 ++++++++++++++++++++-
 .../client/admin/internal/PulsarAdminImplTest.java |  55 ++++++
 .../pulsar/client/impl/ClientBuilderImpl.java      |  22 ++-
 .../pulsar/client/impl/PulsarClientImpl.java       |  16 --
 .../client/impl/conf/ClientConfigurationData.java  |  11 +-
 .../pulsar/client/impl/ClientBuilderImplTest.java  | 170 +++++++++++++++++++
 .../client/impl/auth/AuthenticationTokenTest.java  |   5 +-
 .../impl/conf/ClientConfigurationDataTest.java     |  22 +--
 10 files changed, 464 insertions(+), 59 deletions(-)

diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java
index d86b9e73457..e4aa66a4216 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.admin.internal;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminBuilder;
 import org.apache.pulsar.client.api.Authentication;
@@ -66,6 +67,7 @@ public class PulsarAdminBuilderImpl implements PulsarAdminBuilder {
     @Override
     public PulsarAdminBuilder loadConf(Map<String, Object> config) {
         conf = ConfigurationDataUtils.loadData(config, conf, ClientConfigurationData.class);
+        setAuthenticationFromPropsIfAvailable(conf);
         return this;
     }
 
@@ -95,6 +97,24 @@ public class PulsarAdminBuilderImpl implements PulsarAdminBuilder {
         return this;
     }
 
+    private void setAuthenticationFromPropsIfAvailable(ClientConfigurationData clientConfig) {
+        String authPluginClass = clientConfig.getAuthPluginClassName();
+        String authParams = clientConfig.getAuthParams();
+        Map<String, String> authParamMap = clientConfig.getAuthParamMap();
+        if (StringUtils.isBlank(authPluginClass) || (StringUtils.isBlank(authParams) && authParamMap == null)) {
+            return;
+        }
+        try {
+            if (StringUtils.isNotBlank(authParams)) {
+                authentication(authPluginClass, authParams);
+            } else if (authParamMap != null) {
+                authentication(authPluginClass, authParamMap);
+            }
+        } catch (UnsupportedAuthenticationException ex) {
+            throw new RuntimeException("Failed to create authentication: " + ex.getMessage(), ex);
+        }
+    }
+
     @Override
     public PulsarAdminBuilder tlsTrustCertsFilePath(String tlsTrustCertsFilePath) {
         conf.setTlsTrustCertsFilePath(tlsTrustCertsFilePath);
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java
index 427ab6d1aff..a9c54c25a39 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java
@@ -159,12 +159,9 @@ public class PulsarAdminImpl implements PulsarAdmin {
         this.requestTimeoutUnit = requestTimeoutUnit;
         this.clientConfigData = clientConfigData;
         this.auth = clientConfigData != null ? clientConfigData.getAuthentication() : new AuthenticationDisabled();
-        LOG.debug("created: serviceUrl={}, authMethodName={}", serviceUrl,
-                auth != null ? auth.getAuthMethodName() : null);
+        LOG.debug("created: serviceUrl={}, authMethodName={}", serviceUrl, auth.getAuthMethodName());
 
-        if (auth != null) {
-            auth.start();
-        }
+        this.auth.start();
 
         if (clientConfigData != null && StringUtils.isBlank(clientConfigData.getServiceUrl())) {
             clientConfigData.setServiceUrl(serviceUrl);
@@ -240,7 +237,7 @@ public class PulsarAdminImpl implements PulsarAdmin {
      * This client object can be used to perform many subsquent API calls
      *
      * @param serviceUrl
-     *            the Pulsar service URL (eg. "http://my-broker.example.com:8080")
+     *            the Pulsar service URL (eg. 'http://my-broker.example.com:8080')
      * @param auth
      *            the Authentication object to be used to talk with Pulsar
      * @deprecated Since 2.0. Use {@link #builder()} to construct a new {@link PulsarAdmin} instance.
@@ -262,7 +259,7 @@ public class PulsarAdminImpl implements PulsarAdmin {
      * This client object can be used to perform many subsquent API calls
      *
      * @param serviceUrl
-     *            the Pulsar URL (eg. "http://my-broker.example.com:8080")
+     *            the Pulsar URL (eg. 'http://my-broker.example.com:8080')
      * @param authPluginClassName
      *            name of the Authentication-Plugin you want to use
      * @param authParamsString
@@ -281,7 +278,7 @@ public class PulsarAdminImpl implements PulsarAdmin {
      * This client object can be used to perform many subsquent API calls
      *
      * @param serviceUrl
-     *            the Pulsar URL (eg. "http://my-broker.example.com:8080")
+     *            the Pulsar URL (eg. 'http://my-broker.example.com:8080')
      * @param authPluginClassName
      *            name of the Authentication-Plugin you want to use
      * @param authParams
@@ -479,9 +476,7 @@ public class PulsarAdminImpl implements PulsarAdmin {
     @Override
     public void close() {
         try {
-            if (auth != null) {
-                auth.close();
-            }
+            auth.close();
         } catch (IOException e) {
             LOG.error("Failed to close the authentication service", e);
         }
diff --git a/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImplTest.java b/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImplTest.java
index 1ea45401eec..d278a187690 100644
--- a/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImplTest.java
+++ b/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImplTest.java
@@ -18,21 +18,190 @@
  */
 package org.apache.pulsar.client.admin.internal;
 
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import lombok.SneakyThrows;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.apache.pulsar.client.api.EncodedAuthenticationParameterSupport;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.testng.Assert;
 import org.testng.annotations.Test;
 
+/**
+ * Unit tests for {@link PulsarAdminBuilder}.
+ */
 public class PulsarAdminBuilderImplTest {
 
+    private static final String MOCK_AUTH_SECRET_PLUGIN_CLASS = MockAuthenticationSecret.class.getName();
+
+    private static final String AUTH_PLUGIN_CLASS_PROP = "authPluginClassName";
+
+    private static final String AUTH_PARAMS_PROP = "authParams";
+
+    private static final String AUTH_PARAM_MAP_PROP = "authParamMap";
+
+    @Test
+    public void testBuildFailsWhenServiceUrlNotSet() {
+        assertThatIllegalArgumentException().isThrownBy(() -> PulsarAdmin.builder().build())
+                        .withMessageContaining("Service URL needs to be specified");
+    }
+
+    @Test
+    public void testLoadConfSetsAuthUsingAuthParamsProp() {
+        Map<String, Object> confProps = new HashMap<>();
+        confProps.put(AUTH_PLUGIN_CLASS_PROP, MOCK_AUTH_SECRET_PLUGIN_CLASS);
+        confProps.put(AUTH_PARAMS_PROP, secretAuthParams("pass1"));
+        Authentication auth = createAdminAndGetAuth(confProps);
+        assertAuthWithSecret(auth, "pass1");
+    }
+
+    @Test
+    public void testLoadConfSetsAuthUsingAuthParamMapProp() {
+        Map<String, Object> confProps = new HashMap<>();
+        confProps.put(AUTH_PLUGIN_CLASS_PROP, MOCK_AUTH_SECRET_PLUGIN_CLASS);
+        confProps.put(AUTH_PARAM_MAP_PROP, secretAuthParamMap("pass1"));
+        Authentication auth = createAdminAndGetAuth(confProps);
+        assertAuthWithSecret(auth, "pass1");
+    }
+
+    @Test
+    public void testLoadConfSetsAuthUsingAuthParamsPropWhenBothPropsAvailable() {
+        Map<String, Object> confProps = new HashMap<>();
+        confProps.put(AUTH_PLUGIN_CLASS_PROP, MOCK_AUTH_SECRET_PLUGIN_CLASS);
+        confProps.put(AUTH_PARAMS_PROP, secretAuthParams("pass1"));
+        confProps.put(AUTH_PARAM_MAP_PROP, secretAuthParamMap("pass2"));
+        Authentication auth = createAdminAndGetAuth(confProps);
+        assertAuthWithSecret(auth, "pass1");
+    }
+
+    private void assertAuthWithSecret(Authentication authentication, String secret) {
+        assertThat(authentication).isInstanceOfSatisfying(MockAuthenticationSecret.class,
+                (auth) -> assertThat(auth.getSecret()).isEqualTo(secret));
+    }
+
+    @Test
+    public void testLoadConfAuthNotSetWhenNoPropsAvailable() {
+        Authentication auth = createAdminAndGetAuth(Collections.emptyMap());
+        assertThatAuthIsNotSet(auth);
+    }
+
+    @Test
+    public void testLoadConfAuthNotSetWhenEmptyAuthParamsSpecified() {
+        Map<String, Object> confProps = new HashMap<>();
+        confProps.put(AUTH_PLUGIN_CLASS_PROP, MOCK_AUTH_SECRET_PLUGIN_CLASS);
+        confProps.put(AUTH_PARAMS_PROP, "");
+        Authentication auth = createAdminAndGetAuth(confProps);
+        assertThatAuthIsNotSet(auth);
+    }
+
+    @Test
+    public void testLoadConfAuthNotSetWhenNullAuthParamsSpecified() {
+        Map<String, Object> confProps = new HashMap<>();
+        confProps.put(AUTH_PLUGIN_CLASS_PROP, MOCK_AUTH_SECRET_PLUGIN_CLASS);
+        confProps.put(AUTH_PARAMS_PROP, null);
+        Authentication auth = createAdminAndGetAuth(confProps);
+        assertThatAuthIsNotSet(auth);
+    }
+
+    @Test
+    public void testLoadConfAuthNotSetWhenNullParamMapSpecified() {
+        Map<String, Object> confProps = new HashMap<>();
+        confProps.put(AUTH_PLUGIN_CLASS_PROP, MOCK_AUTH_SECRET_PLUGIN_CLASS);
+        confProps.put(AUTH_PARAM_MAP_PROP, null);
+        Authentication auth = createAdminAndGetAuth(confProps);
+        assertThatAuthIsNotSet(auth);
+    }
+
     @Test
-    public void testAdminBuilderWithServiceUrlNotSet() throws PulsarClientException {
-        try{
-            PulsarAdmin.builder().build();
-            fail();
-        } catch (IllegalArgumentException exception) {
-            assertEquals("Service URL needs to be specified", exception.getMessage());
+    public void testLoadConfAuthNotSetWhenOnlyPluginClassNameAvailable() {
+        Map<String, Object> confProps = new HashMap<>();
+        confProps.put(AUTH_PLUGIN_CLASS_PROP, MOCK_AUTH_SECRET_PLUGIN_CLASS);
+        Authentication auth = createAdminAndGetAuth(confProps);
+        assertThatAuthIsNotSet(auth);
+    }
+
+    @Test
+    public void testLoadConfAuthNotSetWhenOnlyAuthParamsAvailable() {
+        Map<String, Object> confProps = new HashMap<>();
+        confProps.put(AUTH_PARAMS_PROP, secretAuthParams("pass1"));
+        Authentication auth = createAdminAndGetAuth(confProps);
+        assertThatAuthIsNotSet(auth);
+    }
+
+    @Test
+    public void testLoadConfAuthNotSetWhenOnlyAuthParamMapAvailable() {
+        Map<String, Object> confProps = new HashMap<>();
+        confProps.put(AUTH_PARAM_MAP_PROP, secretAuthParamMap("pass2"));
+        Authentication auth = createAdminAndGetAuth(confProps);
+        assertThatAuthIsNotSet(auth);
+    }
+
+    private void assertThatAuthIsNotSet(Authentication authentication) {
+        // getAuthentication() returns disabled when null
+        assertThat(authentication).isInstanceOf(AuthenticationDisabled.class);
+    }
+
+    @SneakyThrows
+    private Authentication createAdminAndGetAuth(Map<String, Object> confProps) {
+        try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl("http://localhost:8080").loadConf(confProps).build()) {
+            return ((PulsarAdminImpl)admin).auth;
+        }
+    }
+
+    private String secretAuthParams(String secret) {
+        return String.format("{\"secret\":\"%s\"}", secret);
+    }
+
+    private Map<String, String> secretAuthParamMap(String secret) {
+        return Collections.singletonMap("secret", secret);
+    }
+
+    static public class MockAuthenticationSecret implements Authentication, EncodedAuthenticationParameterSupport {
+
+        private String secret;
+
+        @Override
+        public String getAuthMethodName() {
+            return "mock-secret";
+        }
+
+        @Override
+        public AuthenticationDataProvider getAuthData() throws PulsarClientException {
+            return null;
+        }
+
+        @Override
+        public void configure(Map<String, String> authParams) {
+            configure(new Gson().toJson(authParams));
+        }
+
+        @Override
+        public void configure(String encodedAuthParamString) {
+            JsonObject params = new Gson().fromJson(encodedAuthParamString, JsonObject.class);
+            secret = params.get("secret").getAsString();
+        }
+
+        @Override
+        public void start() throws PulsarClientException {
+        }
+
+        @Override
+        public void close() throws IOException {
+        }
+
+        public String getSecret() {
+            return secret;
         }
     }
 }
diff --git a/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/PulsarAdminImplTest.java b/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/PulsarAdminImplTest.java
new file mode 100644
index 00000000000..5cb69f367ec
--- /dev/null
+++ b/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/PulsarAdminImplTest.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.admin.internal;
+
+import lombok.SneakyThrows;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.testng.annotations.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Unit tests for {@link PulsarAdminImpl}.
+ */
+public class PulsarAdminImplTest {
+
+    @Test
+    public void testAuthDisabledWhenAuthNotSpecifiedAnywhere() {
+        assertThat(createAdminAndGetAuth(new ClientConfigurationData()))
+                .isInstanceOf(AuthenticationDisabled.class);
+    }
+
+    @Test
+    public void testAuthFromConfUsedWhenConfHasAuth() {
+        Authentication auth = mock(Authentication.class);
+        ClientConfigurationData conf = new ClientConfigurationData();
+        conf.setAuthentication(auth);
+        assertThat(createAdminAndGetAuth(conf)).isSameAs(auth);
+    }
+
+    @SneakyThrows
+    private Authentication createAdminAndGetAuth(ClientConfigurationData conf) {
+        try (PulsarAdminImpl admin = new PulsarAdminImpl("http://localhost:8080", conf)) {
+            return admin.auth;
+        }
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
index bb7c3cb539d..79369dc4c67 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
@@ -78,8 +78,8 @@ public class ClientBuilderImpl implements ClientBuilder {
 
     @Override
     public ClientBuilder loadConf(Map<String, Object> config) {
-        conf = ConfigurationDataUtils.loadData(
-            config, conf, ClientConfigurationData.class);
+        conf = ConfigurationDataUtils.loadData(config, conf, ClientConfigurationData.class);
+        setAuthenticationFromPropsIfAvailable(conf);
         return this;
     }
 
@@ -139,6 +139,24 @@ public class ClientBuilderImpl implements ClientBuilder {
         return this;
     }
 
+    private void setAuthenticationFromPropsIfAvailable(ClientConfigurationData clientConfig) {
+        String authPluginClass = clientConfig.getAuthPluginClassName();
+        String authParams = clientConfig.getAuthParams();
+        Map<String, String> authParamMap = clientConfig.getAuthParamMap();
+        if (StringUtils.isBlank(authPluginClass) || (StringUtils.isBlank(authParams) && authParamMap == null)) {
+            return;
+        }
+        try {
+            if (StringUtils.isNotBlank(authParams)) {
+                authentication(authPluginClass, authParams);
+            } else if (authParamMap != null) {
+                authentication(authPluginClass, authParamMap);
+            }
+        } catch (UnsupportedAuthenticationException ex) {
+            throw new RuntimeException("Failed to create authentication: " + ex.getMessage(), ex);
+        }
+    }
+
     @Override
     public ClientBuilder operationTimeout(int operationTimeout, TimeUnit unit) {
         checkArgument(operationTimeout >= 0, "operationTimeout needs to be >= 0");
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 294e73cc673..a47fdefb182 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -48,9 +48,7 @@ import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import lombok.Builder;
 import lombok.Getter;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.Authentication;
-import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.Producer;
@@ -185,7 +183,6 @@ public class PulsarClientImpl implements PulsarClient {
             if (conf == null || isBlank(conf.getServiceUrl())) {
                 throw new PulsarClientException.InvalidConfigurationException("Invalid client configuration");
             }
-            setAuth(conf);
             this.conf = conf;
             clientClock = conf.getClock();
             conf.getAuthentication().start();
@@ -231,19 +228,6 @@ public class PulsarClientImpl implements PulsarClient {
         }
     }
 
-    private void setAuth(ClientConfigurationData conf) throws PulsarClientException {
-        if (StringUtils.isBlank(conf.getAuthPluginClassName())
-                || (StringUtils.isBlank(conf.getAuthParams()) && conf.getAuthParamMap() == null)) {
-            return;
-        }
-
-        if (StringUtils.isNotBlank(conf.getAuthParams())) {
-            conf.setAuthentication(AuthenticationFactory.create(conf.getAuthPluginClassName(), conf.getAuthParams()));
-        } else if (conf.getAuthParamMap() != null) {
-            conf.setAuthentication(AuthenticationFactory.create(conf.getAuthPluginClassName(), conf.getAuthParamMap()));
-        }
-    }
-
     public ClientConfigurationData getConfiguration() {
         return conf;
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
index 6d99a9fa986..f468dd43f45 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
@@ -330,16 +330,19 @@ public class ClientConfigurationData implements Serializable, Cloneable {
     @Secret
     private String socks5ProxyPassword;
 
+    /**
+     * Gets the authentication settings for the client.
+     *
+     * @return authentication settings for the client or {@link AuthenticationDisabled} when auth has not been specified
+     */
     public Authentication getAuthentication() {
-        if (authentication == null) {
-            this.authentication = AuthenticationDisabled.INSTANCE;
-        }
-        return authentication;
+        return this.authentication != null ? this.authentication : AuthenticationDisabled.INSTANCE;
     }
 
     public void setAuthentication(Authentication authentication) {
         this.authentication = authentication;
     }
+
     public boolean isUseTls() {
         if (useTls) {
             return true;
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientBuilderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientBuilderImplTest.java
index 8b129dfac3c..dfd7e7cf5ba 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientBuilderImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientBuilderImplTest.java
@@ -18,13 +18,33 @@
  */
 package org.apache.pulsar.client.impl;
 
+import static org.assertj.core.api.Assertions.assertThat;
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import lombok.SneakyThrows;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.apache.pulsar.client.api.EncodedAuthenticationParameterSupport;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.ServiceUrlProvider;
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
 import org.testng.annotations.Test;
 
 public class ClientBuilderImplTest {
 
+    private static final String MOCK_AUTH_SECRET_PLUGIN_CLASS = MockAuthenticationSecret.class.getName();
+
+    private static final String AUTH_PLUGIN_CLASS_PROP = "authPluginClassName";
+
+    private static final String AUTH_PARAMS_PROP = "authParams";
+
+    private static final String AUTH_PARAM_MAP_PROP = "authParamMap";
+
     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testClientBuilderWithServiceUrlAndServiceUrlProviderNotSet() throws PulsarClientException {
         PulsarClient.builder().build();
@@ -81,4 +101,154 @@ public class ClientBuilderImplTest {
     }
 
 
+    // Tests for loadConf and authentication
+
+    @Test
+    public void testLoadConfSetsAuthUsingAuthParamsProp() {
+        Map<String, Object> confProps = new HashMap<>();
+        confProps.put(AUTH_PLUGIN_CLASS_PROP, MOCK_AUTH_SECRET_PLUGIN_CLASS);
+        confProps.put(AUTH_PARAMS_PROP, secretAuthParams("pass1"));
+        Authentication auth = createClientAndGetAuth(confProps);
+        assertAuthWithSecret(auth, "pass1");
+    }
+
+    @Test
+    public void testLoadConfSetsAuthUsingAuthParamMapProp() {
+        Map<String, Object> confProps = new HashMap<>();
+        confProps.put(AUTH_PLUGIN_CLASS_PROP, MOCK_AUTH_SECRET_PLUGIN_CLASS);
+        confProps.put(AUTH_PARAM_MAP_PROP, secretAuthParamMap("pass1"));
+        Authentication auth = createClientAndGetAuth(confProps);
+        assertAuthWithSecret(auth, "pass1");
+    }
+
+    @Test
+    public void testLoadConfSetsAuthUsingAuthParamsPropWhenBothPropsAvailable() {
+        Map<String, Object> confProps = new HashMap<>();
+        confProps.put(AUTH_PLUGIN_CLASS_PROP, MOCK_AUTH_SECRET_PLUGIN_CLASS);
+        confProps.put(AUTH_PARAMS_PROP, secretAuthParams("pass1"));
+        confProps.put(AUTH_PARAM_MAP_PROP, secretAuthParamMap("pass2"));
+        Authentication auth = createClientAndGetAuth(confProps);
+        assertAuthWithSecret(auth, "pass1");
+    }
+
+    private void assertAuthWithSecret(Authentication authentication, String secret) {
+        assertThat(authentication).isInstanceOfSatisfying(MockAuthenticationSecret.class,
+                (auth) -> assertThat(auth.getSecret()).isEqualTo(secret));
+    }
+
+    @Test
+    public void testLoadConfAuthNotSetWhenNoPropsAvailable() {
+        Authentication auth = createClientAndGetAuth(Collections.emptyMap());
+        assertThatAuthIsNotSet(auth);
+    }
+
+    @Test
+    public void testLoadConfAuthNotSetWhenEmptyAuthParamsSpecified() {
+        Map<String, Object> confProps = new HashMap<>();
+        confProps.put(AUTH_PLUGIN_CLASS_PROP, MOCK_AUTH_SECRET_PLUGIN_CLASS);
+        confProps.put(AUTH_PARAMS_PROP, "");
+        Authentication auth = createClientAndGetAuth(confProps);
+        assertThatAuthIsNotSet(auth);
+    }
+
+    @Test
+    public void testLoadConfAuthNotSetWhenNullAuthParamsSpecified() {
+        Map<String, Object> confProps = new HashMap<>();
+        confProps.put(AUTH_PLUGIN_CLASS_PROP, MOCK_AUTH_SECRET_PLUGIN_CLASS);
+        confProps.put(AUTH_PARAMS_PROP, null);
+        Authentication auth = createClientAndGetAuth(confProps);
+        assertThatAuthIsNotSet(auth);
+    }
+
+    @Test
+    public void testLoadConfAuthNotSetWhenNullParamMapSpecified() {
+        Map<String, Object> confProps = new HashMap<>();
+        confProps.put(AUTH_PLUGIN_CLASS_PROP, MOCK_AUTH_SECRET_PLUGIN_CLASS);
+        confProps.put(AUTH_PARAM_MAP_PROP, null);
+        Authentication auth = createClientAndGetAuth(confProps);
+        assertThatAuthIsNotSet(auth);
+    }
+
+    @Test
+    public void testLoadConfAuthNotSetWhenOnlyPluginClassNameAvailable() {
+        Map<String, Object> confProps = new HashMap<>();
+        confProps.put(AUTH_PLUGIN_CLASS_PROP, MOCK_AUTH_SECRET_PLUGIN_CLASS);
+        Authentication auth = createClientAndGetAuth(confProps);
+        assertThatAuthIsNotSet(auth);
+    }
+
+    @Test
+    public void testLoadConfAuthNotSetWhenOnlyAuthParamsAvailable() {
+        Map<String, Object> confProps = new HashMap<>();
+        confProps.put(AUTH_PARAMS_PROP, secretAuthParams("pass1"));
+        Authentication auth = createClientAndGetAuth(confProps);
+        assertThatAuthIsNotSet(auth);
+    }
+
+    @Test
+    public void testLoadConfAuthNotSetWhenOnlyAuthParamMapAvailable() {
+        Map<String, Object> confProps = new HashMap<>();
+        confProps.put(AUTH_PARAM_MAP_PROP, secretAuthParamMap("pass2"));
+        Authentication auth = createClientAndGetAuth(confProps);
+        assertThatAuthIsNotSet(auth);
+    }
+
+    private void assertThatAuthIsNotSet(Authentication authentication) {
+        // getAuthentication() returns disabled when null
+        assertThat(authentication).isInstanceOf(AuthenticationDisabled.class);
+    }
+
+    @SneakyThrows
+    private Authentication createClientAndGetAuth(Map<String, Object> confProps) {
+        try (PulsarClient client = PulsarClient.builder().serviceUrl("http://localhost:8080").loadConf(confProps).build()) {
+            return ((PulsarClientImpl)client).conf.getAuthentication();
+        }
+    }
+
+    private String secretAuthParams(String secret) {
+        return String.format("{\"secret\":\"%s\"}", secret);
+    }
+
+    private Map<String, String> secretAuthParamMap(String secret) {
+        return Collections.singletonMap("secret", secret);
+    }
+
+    static public class MockAuthenticationSecret implements Authentication, EncodedAuthenticationParameterSupport {
+
+        private String secret;
+
+        @Override
+        public String getAuthMethodName() {
+            return "mock-secret";
+        }
+
+        @Override
+        public AuthenticationDataProvider getAuthData() throws PulsarClientException {
+            return null;
+        }
+
+        @Override
+        public void configure(Map<String, String> authParams) {
+            configure(new Gson().toJson(authParams));
+        }
+
+        @Override
+        public void configure(String encodedAuthParamString) {
+            JsonObject params = new Gson().fromJson(encodedAuthParamString, JsonObject.class);
+            secret = params.get("secret").getAsString();
+        }
+
+        @Override
+        public void start() throws PulsarClientException {
+        }
+
+        @Override
+        public void close() throws IOException {
+        }
+
+        public String getSecret() {
+            return secret;
+        }
+    }
+
 }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/AuthenticationTokenTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/AuthenticationTokenTest.java
index d5d42c9686b..704b5955170 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/AuthenticationTokenTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/AuthenticationTokenTest.java
@@ -31,6 +31,7 @@ import java.util.function.Supplier;
 import org.apache.commons.io.FileUtils;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.testng.annotations.Test;
@@ -61,8 +62,8 @@ public class AuthenticationTokenTest {
     public void testAuthTokenClientConfig() throws Exception {
         ClientConfigurationData clientConfig = new ClientConfigurationData();
         clientConfig.setServiceUrl("pulsar://service-url");
-        clientConfig.setAuthPluginClassName(AuthenticationToken.class.getName());
-        clientConfig.setAuthParams("token-xyz");
+        clientConfig.setAuthentication(AuthenticationFactory.create(
+                AuthenticationToken.class.getName(), "token-xyz"));
 
         PulsarClientImpl pulsarClient = new PulsarClientImpl(clientConfig);
 
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ClientConfigurationDataTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ClientConfigurationDataTest.java
index c817ec996d4..e63f0ab0d90 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ClientConfigurationDataTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ClientConfigurationDataTest.java
@@ -19,38 +19,28 @@
 
 package org.apache.pulsar.client.impl.conf;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.ObjectWriter;
 import com.fasterxml.jackson.databind.SerializationFeature;
 import org.apache.pulsar.client.impl.auth.AuthenticationToken;
-import org.testng.Assert;
 import org.testng.annotations.Test;
 
 /**
- * Unit test {@link ClientConfigurationData}.
+ * Unit tests for {@link ClientConfigurationData}.
  */
 public class ClientConfigurationDataTest {
 
-    private final ObjectWriter w;
-
-    {
-        ObjectMapper m = new ObjectMapper();
-        m.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
-        w = m.writer();
-    }
-
-
     @Test
     public void testDoNotPrintSensitiveInfo() throws JsonProcessingException {
         ClientConfigurationData clientConfigurationData = new ClientConfigurationData();
         clientConfigurationData.setTlsTrustStorePassword("xxxx");
         clientConfigurationData.setSocks5ProxyPassword("yyyy");
         clientConfigurationData.setAuthentication(new AuthenticationToken("zzzz"));
-        String s = w.writeValueAsString(clientConfigurationData);
-        Assert.assertFalse(s.contains("xxxx"));
-        Assert.assertFalse(s.contains("yyyy"));
-        Assert.assertFalse(s.contains("zzzz"));
+        ObjectMapper objectMapper = new ObjectMapper();
+        objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
+        String serializedConf = objectMapper.writeValueAsString(clientConfigurationData);
+        assertThat(serializedConf).doesNotContain("xxxx", "yyyy", "zzzz");
     }
 
 }