You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/06/06 15:47:06 UTC

[pulsar] branch master updated: [Issue 4379] [Java Client] Build auth from class and params in PulsarClientImpl (#4381)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 17d2b42  [Issue 4379] [Java Client] Build auth from class and params in PulsarClientImpl (#4381)
17d2b42 is described below

commit 17d2b4213ed401bc6dad14f4f76ef8ce270b0461
Author: Shivji Kumar Jha <sh...@gmail.com>
AuthorDate: Thu Jun 6 21:17:00 2019 +0530

    [Issue 4379] [Java Client] Build auth from class and params in PulsarClientImpl (#4381)
    
    * Flink client to accept all pulsar client conf
    
    In this patch, we provide handles for flink connecotr to accept ClientConfigurationData, ProducerConfigurationData, ConsumerConfigurationData so flink client can:
    1. accept all params of client, producer and consumer
    2. Keep pace with pulsar-client
    
    * Flink client to accept all pulsar client conf
    
    Added test cases
    
    * Removing commented code
    
    * flink: construct auth when building pulsarsource
    
    * fixed failing tests
    
    * removed unused import
    
    * Added builder defaults for lombok builder
    Set Auth from class and params (if set) in PulsarClientImpl.java
    
    * Remove @BUilder.default from attributes where no defaults exist
    
    * Added tests for ClientConfiguration Data builders
    
    * cosmetic changes in code
    
    Co-Authored-By: Sijie Guo <gu...@gmail.com>
    
    * fixing typo
    
    * Removed test, not true anymore
    
    * Removed lombok builders
    
    * fixed the failing tests
    
    * Because the authentication field is transient, it is not serialized. On desirialization then its null and desirialization crashes with NPE
---
 .../pulsar/client/impl/PulsarClientImpl.java       | 11 +++
 .../client/impl/conf/ClientConfigurationData.java  | 15 ++--
 .../impl/conf/ConsumerConfigurationData.java       |  1 -
 .../impl/conf/ProducerConfigurationData.java       |  1 -
 .../client/impl/auth/AuthenticationTokenTest.java  | 30 ++++++++
 .../impl/conf/ConfigurationDataUtilsTest.java      | 18 +++++
 .../connectors/pulsar/PulsarSourceBuilder.java     |  2 +-
 .../pulsar/PulsarAvroOutputFormatTest.java         | 50 ++++++-------
 .../pulsar/PulsarCsvOutputFormatTest.java          | 50 ++++++-------
 .../pulsar/PulsarJsonOutputFormatTest.java         | 50 ++++++-------
 .../connectors/pulsar/PulsarOutputFormatTest.java  | 61 +++++++---------
 .../connectors/pulsar/PulsarAvroTableSinkTest.java | 11 ++-
 .../connectors/pulsar/PulsarJsonTableSinkTest.java | 10 ++-
 .../connectors/pulsar/PulsarSourceBuilderTest.java | 84 ++++++++++------------
 14 files changed, 204 insertions(+), 190 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 57d1956..0e58796 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -44,6 +44,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.Producer;
@@ -57,6 +58,7 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.schema.GenericSchema;
 import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
+import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.impl.ConsumerImpl.SubscriptionMode;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
@@ -127,6 +129,7 @@ public class PulsarClientImpl implements PulsarClient {
             throw new PulsarClientException.InvalidConfigurationException("Invalid client configuration");
         }
         this.eventLoopGroup = eventLoopGroup;
+        setAuth(conf);
         this.conf = conf;
         conf.getAuthentication().start();
         this.cnxPool = cnxPool;
@@ -142,6 +145,14 @@ public class PulsarClientImpl implements PulsarClient {
         state.set(State.Open);
     }
 
+    private void setAuth(ClientConfigurationData conf) throws PulsarClientException {
+        if (StringUtils.isBlank(conf.getAuthPluginClassName()) || StringUtils.isBlank( conf.getAuthParams())) {
+            return;
+        }
+
+        conf.setAuthentication(AuthenticationFactory.create(conf.getAuthPluginClassName(), conf.getAuthParams()));
+    }
+
     public ClientConfigurationData getConfiguration() {
         return conf;
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
index 6195027..3ab8638 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
@@ -28,14 +28,12 @@ import org.apache.pulsar.client.api.ServiceUrlProvider;
 import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
 
 import java.io.Serializable;
-import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 /**
  * This is a simple holder of the client configuration values.
  */
 @Data
-@Builder
 @NoArgsConstructor
 @AllArgsConstructor
 public class ClientConfigurationData implements Serializable, Cloneable {
@@ -47,10 +45,8 @@ public class ClientConfigurationData implements Serializable, Cloneable {
 
     @JsonIgnore
     private transient Authentication authentication = new AuthenticationDisabled();
-    @JsonIgnore
-    private transient String authPluginClassName;
-    @JsonIgnore
-    private transient Map<String, String> authParams;
+    private String authPluginClassName;
+    private String authParams;
 
     private long operationTimeoutMs = 30000;
     private long statsIntervalSeconds = 60;
@@ -74,6 +70,13 @@ public class ClientConfigurationData implements Serializable, Cloneable {
     private long defaultBackoffIntervalNanos = TimeUnit.MILLISECONDS.toNanos(100);
     private long maxBackoffIntervalNanos = TimeUnit.SECONDS.toNanos(30);
 
+    public Authentication getAuthentication() {
+        if (authentication == null) {
+            this.authentication = new AuthenticationDisabled();
+        }
+        return authentication;
+    }
+
     public boolean isUseTls() {
         if (useTls)
             return true;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
index 3352ebb..8aca7b2 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
@@ -45,7 +45,6 @@ import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 
 @Data
-@Builder
 @NoArgsConstructor
 @AllArgsConstructor
 public class ConsumerConfigurationData<T> implements Serializable, Cloneable {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
index 32f5e61..7ec8f90 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
@@ -42,7 +42,6 @@ import com.google.common.collect.Sets;
 import lombok.Data;
 
 @Data
-@Builder
 @NoArgsConstructor
 @AllArgsConstructor
 public class ProducerConfigurationData implements Serializable, Cloneable {
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/AuthenticationTokenTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/AuthenticationTokenTest.java
index 6063a02..e3a94ee 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/AuthenticationTokenTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/AuthenticationTokenTest.java
@@ -29,7 +29,10 @@ import java.io.File;
 import java.util.Collections;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.testng.annotations.Test;
 
 public class AuthenticationTokenTest {
@@ -55,6 +58,33 @@ public class AuthenticationTokenTest {
     }
 
     @Test
+    public void testAuthTokenClientConfig() throws Exception {
+        ClientConfigurationData clientConfig = new ClientConfigurationData();
+        clientConfig.setServiceUrl("pulsar://service-url");
+        clientConfig.setAuthPluginClassName(AuthenticationToken.class.getName());
+        clientConfig.setAuthParams("token-xyz");
+
+        PulsarClientImpl pulsarClient = new PulsarClientImpl(clientConfig);
+
+        Authentication authToken = pulsarClient.getConfiguration().getAuthentication();
+        assertEquals(authToken.getAuthMethodName(), "token");
+
+        AuthenticationDataProvider authData = authToken.getAuthData();
+        assertTrue(authData.hasDataFromCommand());
+        assertEquals(authData.getCommandData(), "token-xyz");
+
+        assertFalse(authData.hasDataForTls());
+        assertNull(authData.getTlsCertificates());
+        assertNull(authData.getTlsPrivateKey());
+
+        assertTrue(authData.hasDataForHttp());
+        assertEquals(authData.getHttpHeaders(),
+                Collections.singletonMap("Authorization", "Bearer token-xyz").entrySet());
+
+        authToken.close();
+    }
+
+    @Test
     public void testAuthTokenConfig() throws Exception {
         AuthenticationToken authToken = new AuthenticationToken();
         authToken.configure("token:my-test-token-string");
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConfigurationDataUtilsTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConfigurationDataUtilsTest.java
index 83ed4e8..626c501 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConfigurationDataUtilsTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConfigurationDataUtilsTest.java
@@ -25,6 +25,9 @@ import static org.testng.Assert.fail;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
+
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.testng.annotations.Test;
 
 /**
@@ -108,4 +111,19 @@ public class ConfigurationDataUtilsTest {
             assertTrue(re.getCause() instanceof IOException);
         }
     }
+
+    @Test
+    public void testConfigBuilder() throws PulsarClientException {
+        ClientConfigurationData clientConfig = new ClientConfigurationData();
+        clientConfig.setServiceUrl("pulsar://unknown:6650");
+        clientConfig.setStatsIntervalSeconds(80);
+
+        PulsarClientImpl pulsarClient = new PulsarClientImpl(clientConfig);
+        assertTrue(pulsarClient != null, "Pulsar client built using config should not be null");
+
+        assertTrue(pulsarClient.getConfiguration().getServiceUrl().equals("pulsar://unknown:6650"));
+        assertEquals(pulsarClient.getConfiguration().getNumListenerThreads(), 1, "builder default not set properly");
+        assertEquals(pulsarClient.getConfiguration().getStatsIntervalSeconds(), 80,
+                "builder default should overrite if set explicitly");
+    }
 }
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java
index f3227d9..d4a25ee 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java
@@ -290,7 +290,7 @@ public class PulsarSourceBuilder<T> {
 
     private void setAuth() throws PulsarClientException{
         if (StringUtils.isBlank(this.clientConfigurationData.getAuthPluginClassName())
-                && this.clientConfigurationData.getAuthParams() == null || this.clientConfigurationData.getAuthParams().isEmpty())
+                || StringUtils.isBlank(this.clientConfigurationData.getAuthParams()))
             return;
 
         clientConfigurationData.setAuthentication(
diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormatTest.java b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormatTest.java
index eb667b0..736c417 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormatTest.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormatTest.java
@@ -61,65 +61,55 @@ public class PulsarAvroOutputFormatTest {
 
     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testPulsarAvroOutputFormatConstructorV2WhenServiceUrlIsNull() {
-        ClientConfigurationData clientConf = ClientConfigurationData.builder()
-                .serviceUrl(null)
-                .build();
+        ClientConfigurationData clientConf = new ClientConfigurationData();
+        clientConf.setServiceUrl(null);
 
-        ProducerConfigurationData producerConf = ProducerConfigurationData.builder()
-                .topicName("testTopic")
-                .build();
+        ProducerConfigurationData producerConf = new ProducerConfigurationData();
+        producerConf.setTopicName("testTopic");
 
         new PulsarAvroOutputFormat(clientConf, producerConf);
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testPulsarAvroOutputFormatConstructorV2WhenTopicNameIsNull() {
-        ClientConfigurationData clientConf = ClientConfigurationData.builder()
-                .serviceUrl("testServiceUrl")
-                .build();
+        ClientConfigurationData clientConf = new ClientConfigurationData();
+        clientConf.setServiceUrl("testServiceUrl");
 
-        ProducerConfigurationData producerConf = ProducerConfigurationData.builder()
-                .topicName(null)
-                .build();
+        ProducerConfigurationData producerConf = new ProducerConfigurationData();
+        producerConf.setTopicName(null);
 
         new PulsarAvroOutputFormat(clientConf, producerConf);
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testPulsarAvroOutputFormatConstructorV2WhenTopicNameIsBlank() {
-        ClientConfigurationData clientConf = ClientConfigurationData.builder()
-                .serviceUrl("testServiceUrl")
-                .build();
+        ClientConfigurationData clientConf = new ClientConfigurationData();
+        clientConf.setServiceUrl("testServiceUrl");
 
-        ProducerConfigurationData producerConf = ProducerConfigurationData.builder()
-                .topicName(StringUtils.EMPTY)
-                .build();
+        ProducerConfigurationData producerConf = new ProducerConfigurationData();
+        producerConf.setTopicName(StringUtils.EMPTY);
 
         new PulsarAvroOutputFormat(clientConf, producerConf);
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testPulsarAvroOutputFormatConstructorV2WhenServiceUrlIsBlank() {
-        ClientConfigurationData clientConf = ClientConfigurationData.builder()
-                .serviceUrl(StringUtils.EMPTY)
-                .build();
+        ClientConfigurationData clientConf = new ClientConfigurationData();
+        clientConf.setServiceUrl(StringUtils.EMPTY);
 
-        ProducerConfigurationData producerConf = ProducerConfigurationData.builder()
-                .topicName(StringUtils.EMPTY)
-                .build();
+        ProducerConfigurationData producerConf = new ProducerConfigurationData();
+        producerConf.setTopicName(StringUtils.EMPTY);
 
         new PulsarAvroOutputFormat(clientConf, producerConf);
     }
 
     @Test
     public void testPulsarAvroOutputFormatConstructorV2() {
-        ClientConfigurationData clientConf = ClientConfigurationData.builder()
-                .serviceUrl("testServiceUrl")
-                .build();
+        ClientConfigurationData clientConf = new ClientConfigurationData();
+        clientConf.setServiceUrl("testServiceUrl");
 
-        ProducerConfigurationData producerConf = ProducerConfigurationData.builder()
-                .topicName("testTopic")
-                .build();
+        ProducerConfigurationData producerConf = new ProducerConfigurationData();
+        producerConf.setTopicName("testTopic");
 
         PulsarAvroOutputFormat pulsarAvroOutputFormat = new PulsarAvroOutputFormat(clientConf, producerConf);
         assertNotNull(pulsarAvroOutputFormat);
diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormatTest.java b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormatTest.java
index a374053..713f867 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormatTest.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormatTest.java
@@ -60,65 +60,55 @@ public class PulsarCsvOutputFormatTest {
 
     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testPulsarCsvOutputFormatConstructorV2WhenServiceUrlIsNull() {
-        ClientConfigurationData clientConf = ClientConfigurationData.builder()
-                .serviceUrl(null)
-                .build();
+        ClientConfigurationData clientConf = new ClientConfigurationData();
+        clientConf.setServiceUrl(null);
 
-        ProducerConfigurationData producerConf = ProducerConfigurationData.builder()
-                .topicName("testTopic")
-                .build();
+        ProducerConfigurationData producerConf = new ProducerConfigurationData();
+        producerConf.setTopicName("testTopic");
 
         new PulsarAvroOutputFormat(clientConf, producerConf);
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testPulsarCsvOutputFormatConstructorV2WhenTopicNameIsNull() {
-        ClientConfigurationData clientConf = ClientConfigurationData.builder()
-                .serviceUrl("testServiceUrl")
-                .build();
+        ClientConfigurationData clientConf = new ClientConfigurationData();
+        clientConf.setServiceUrl("testServiceUrl");
 
-        ProducerConfigurationData producerConf = ProducerConfigurationData.builder()
-                .topicName(null)
-                .build();
+        ProducerConfigurationData producerConf = new ProducerConfigurationData();
+        producerConf.setTopicName(null);
 
         new PulsarAvroOutputFormat(clientConf, producerConf);
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testPulsarCsvOutputFormatConstructorV2WhenTopicNameIsBlank() {
-        ClientConfigurationData clientConf = ClientConfigurationData.builder()
-                .serviceUrl("testServiceUrl")
-                .build();
+        ClientConfigurationData clientConf = new ClientConfigurationData();
+        clientConf.setServiceUrl("testServiceUrl");
 
-        ProducerConfigurationData producerConf = ProducerConfigurationData.builder()
-                .topicName(StringUtils.EMPTY)
-                .build();
+        ProducerConfigurationData producerConf = new ProducerConfigurationData();
+        producerConf.setTopicName(StringUtils.EMPTY);
 
         new PulsarAvroOutputFormat(clientConf, producerConf);
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testPulsarCsvOutputFormatConstructorV2WhenServiceUrlIsBlank() {
-        ClientConfigurationData clientConf = ClientConfigurationData.builder()
-                .serviceUrl(StringUtils.EMPTY)
-                .build();
+        ClientConfigurationData clientConf = new ClientConfigurationData();
+        clientConf.setServiceUrl(StringUtils.EMPTY);
 
-        ProducerConfigurationData producerConf = ProducerConfigurationData.builder()
-                .topicName("testTopic")
-                .build();
+        ProducerConfigurationData producerConf = new ProducerConfigurationData();
+        producerConf.setTopicName("testTopic");
 
         new PulsarAvroOutputFormat(clientConf, producerConf);
     }
 
     @Test
     public void testPulsarCsvOutputFormatConstructorV2() {
-        ClientConfigurationData clientConf = ClientConfigurationData.builder()
-                .serviceUrl("testServiceUrl")
-                .build();
+        ClientConfigurationData clientConf = new ClientConfigurationData();
+        clientConf.setServiceUrl("testServiceUrl");
 
-        ProducerConfigurationData producerConf = ProducerConfigurationData.builder()
-                .topicName("testTopic")
-                .build();
+        ProducerConfigurationData producerConf = new ProducerConfigurationData();
+        producerConf.setTopicName("testTopic");
 
         PulsarCsvOutputFormat pulsarCsvOutputFormat = new PulsarCsvOutputFormat(clientConf, producerConf);
         assertNotNull(pulsarCsvOutputFormat);
diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormatTest.java b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormatTest.java
index 01edc9a..d45d9b1 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormatTest.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormatTest.java
@@ -60,65 +60,55 @@ public class PulsarJsonOutputFormatTest {
 
     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testPulsarJsonOutputFormatConstructorV2WhenServiceUrlIsNull() {
-        ClientConfigurationData clientConf = ClientConfigurationData.builder()
-                .serviceUrl(null)
-                .build();
+        ClientConfigurationData clientConf = new ClientConfigurationData();
+        clientConf.setServiceUrl(null);
 
-        ProducerConfigurationData producerConf = ProducerConfigurationData.builder()
-                .topicName("testTopic")
-                .build();
+        ProducerConfigurationData producerConf = new ProducerConfigurationData();
+        producerConf.setTopicName("testTopic");
 
         new PulsarAvroOutputFormat(clientConf, producerConf);
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testPulsarJsonROutputFormatConstructorV2WhenTopicNameIsNull() {
-        ClientConfigurationData clientConf = ClientConfigurationData.builder()
-                .serviceUrl("testServiceUrl")
-                .build();
+        ClientConfigurationData clientConf = new ClientConfigurationData();
+        clientConf.setServiceUrl("testServiceUrl");
 
-        ProducerConfigurationData producerConf = ProducerConfigurationData.builder()
-                .topicName(null)
-                .build();
+        ProducerConfigurationData producerConf = new ProducerConfigurationData();
+        producerConf.setTopicName(null);
 
         new PulsarAvroOutputFormat(clientConf, producerConf);
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testPulsarJsonOutputFormatConstructorV2WhenTopicNameIsBlank() {
-        ClientConfigurationData clientConf = ClientConfigurationData.builder()
-                .serviceUrl("testServiceUrl")
-                .build();
+        ClientConfigurationData clientConf = new ClientConfigurationData();
+        clientConf.setServiceUrl("testServiceUrl");
 
-        ProducerConfigurationData producerConf = ProducerConfigurationData.builder()
-                .topicName(StringUtils.EMPTY)
-                .build();
+        ProducerConfigurationData producerConf = new ProducerConfigurationData();
+        producerConf.setTopicName(StringUtils.EMPTY);
 
         new PulsarAvroOutputFormat(clientConf, producerConf);
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testPulsarJsonOutputFormatConstructorV2WhenServiceUrlIsBlank() {
-        ClientConfigurationData clientConf = ClientConfigurationData.builder()
-                .serviceUrl(StringUtils.EMPTY)
-                .build();
+        ClientConfigurationData clientConf = new ClientConfigurationData();
+        clientConf.setServiceUrl(StringUtils.EMPTY);
 
-        ProducerConfigurationData producerConf = ProducerConfigurationData.builder()
-                .topicName("testTopic")
-                .build();
+        ProducerConfigurationData producerConf = new ProducerConfigurationData();
+        producerConf.setTopicName("testTopic");
 
         new PulsarAvroOutputFormat(clientConf, producerConf);
     }
 
     @Test
     public void testPulsarJsonOutputFormatConstructorV2() {
-        ClientConfigurationData clientConf = ClientConfigurationData.builder()
-                .serviceUrl("testServiceUrl")
-                .build();
+        ClientConfigurationData clientConf = new ClientConfigurationData();
+        clientConf.setServiceUrl("testServiceUrl");
 
-        ProducerConfigurationData producerConf = ProducerConfigurationData.builder()
-                .topicName("testTopic")
-                .build();
+        ProducerConfigurationData producerConf = new ProducerConfigurationData();
+        producerConf.setTopicName("testTopic");
 
         PulsarJsonOutputFormat pulsarJsonOutputFormat = new PulsarJsonOutputFormat(clientConf, producerConf);
         assertNotNull(pulsarJsonOutputFormat);
diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormatTest.java b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormatTest.java
index 9c9a2fe..97b23b4 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormatTest.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormatTest.java
@@ -64,76 +64,65 @@ public class PulsarOutputFormatTest {
 
     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testPulsarOutputFormatConstructorV2WhenServiceUrlIsNull() {
-        ClientConfigurationData clientConf = ClientConfigurationData.builder()
-                .serviceUrl(null)
-                .build();
+        ClientConfigurationData clientConf = new ClientConfigurationData();
+        clientConf.setServiceUrl(null);
 
-        ProducerConfigurationData producerConf = ProducerConfigurationData.builder()
-                .topicName("testTopic")
-                .build();
+        ProducerConfigurationData producerConf = new ProducerConfigurationData();
+        producerConf.setTopicName("testTopic");
 
         new PulsarOutputFormat(clientConf, producerConf, text -> text.toString().getBytes());
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testPulsarOutputFormatConstructorV2WhenTopicNameIsNull() {
-        ClientConfigurationData clientConf = ClientConfigurationData.builder()
-                .serviceUrl("testServiceUrl")
-                .build();
+        ClientConfigurationData clientConf = new ClientConfigurationData();
+        clientConf.setServiceUrl("testServiceUrl");
 
-        ProducerConfigurationData producerConf = ProducerConfigurationData.builder()
-                .topicName(null)
-                .build();
+        ProducerConfigurationData producerConf = new ProducerConfigurationData();
+        producerConf.setTopicName(null);
 
         new PulsarOutputFormat(clientConf, producerConf, text -> text.toString().getBytes());
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testPulsarOutputFormatConstructorV2WhenTopicNameIsBlank() {
-        ClientConfigurationData clientConf = ClientConfigurationData.builder()
-                .serviceUrl("testServiceUrl")
-                .build();
+        ClientConfigurationData clientConf = new ClientConfigurationData();
+        clientConf.setServiceUrl("testServiceUrl");
 
-        ProducerConfigurationData producerConf = ProducerConfigurationData.builder()
-                .topicName(StringUtils.EMPTY)
-                .build();
+        ProducerConfigurationData producerConf = new ProducerConfigurationData();
+        producerConf.setTopicName(StringUtils.EMPTY);
 
         new PulsarOutputFormat(clientConf, producerConf, text -> text.toString().getBytes());
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testPulsarOutputFormatConstructorV2WhenServiceUrlIsBlank() {
-        ClientConfigurationData clientConf = ClientConfigurationData.builder()
-                .serviceUrl(StringUtils.EMPTY)
-                .build();
+        ClientConfigurationData clientConf = new ClientConfigurationData();
+        clientConf.setServiceUrl(StringUtils.EMPTY);
 
-        ProducerConfigurationData producerConf = ProducerConfigurationData.builder()
-                .topicName("testTopic")
-                .build();
+        ProducerConfigurationData producerConf = new ProducerConfigurationData();
+        producerConf.setTopicName("testTopic");
 
         new PulsarOutputFormat(clientConf, producerConf, text -> text.toString().getBytes());
     }
 
     @Test(expectedExceptions = NullPointerException.class)
     public void testPulsarOutputFormatConstructorV2WhenSerializationSchemaIsNull() {
-        ClientConfigurationData clientConf = ClientConfigurationData.builder()
-                .serviceUrl("testServiceUrl")
-                .build();
-        ProducerConfigurationData producerConf = ProducerConfigurationData.builder()
-                .topicName("testTopic")
-                .build();
+        ClientConfigurationData clientConf = new ClientConfigurationData();
+        clientConf.setServiceUrl("testServiceUrl");
+
+        ProducerConfigurationData producerConf = new ProducerConfigurationData();
+        producerConf.setTopicName("testTopic");
         new PulsarOutputFormat(clientConf, producerConf, null);
     }
 
     @Test
     public void testPulsarOutputFormatConstructorV2() {
-        ClientConfigurationData clientConf = ClientConfigurationData.builder()
-                .serviceUrl("testServiceUrl")
-                .build();
+        ClientConfigurationData clientConf = new ClientConfigurationData();
+        clientConf.setServiceUrl("testServiceUrl");
 
-        ProducerConfigurationData producerConf = ProducerConfigurationData.builder()
-                .topicName("testTopic")
-                .build();
+        ProducerConfigurationData producerConf = new ProducerConfigurationData();
+        producerConf.setTopicName("testTopic");
 
         PulsarCsvOutputFormat pulsarCsvOutputFormat = new PulsarCsvOutputFormat(clientConf, producerConf);
         assertNotNull(pulsarCsvOutputFormat);
diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java
index 024637b..7fd48c2 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java
@@ -91,12 +91,11 @@ public class PulsarAvroTableSinkTest {
 
     private PulsarAvroTableSink spySink() throws Exception {
 
-        ClientConfigurationData clientConf = ClientConfigurationData.builder()
-                .serviceUrl(SERVICE_URL)
-                .build();
-        ProducerConfigurationData producerConf = ProducerConfigurationData.builder()
-                .topicName(TOPIC_NAME)
-                .build();
+        ClientConfigurationData clientConf = new ClientConfigurationData();
+        clientConf.setServiceUrl(SERVICE_URL);
+
+        ProducerConfigurationData producerConf = new ProducerConfigurationData();
+        producerConf.setTopicName(TOPIC_NAME);
 
         PulsarAvroTableSink sink =
                 new PulsarAvroTableSink(clientConf, producerConf, ROUTING_KEY, NasaMission.class);
diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java
index 25755d2..668a8e5 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java
@@ -85,10 +85,16 @@ public class PulsarJsonTableSinkTest {
     }
 
     private PulsarJsonTableSink spySink() throws Exception {
+        ClientConfigurationData clientConfigurationData = new ClientConfigurationData();
+        clientConfigurationData.setServiceUrl(SERVICE_URL);
+
+        ProducerConfigurationData producerConfigurationData = new ProducerConfigurationData();
+        producerConfigurationData.setTopicName(TOPIC_NAME);
+
         PulsarJsonTableSink sink = new PulsarJsonTableSink(
-                ClientConfigurationData.builder().serviceUrl(SERVICE_URL).build(),
-                ProducerConfigurationData.builder().topicName(TOPIC_NAME).build(),
+                clientConfigurationData, producerConfigurationData,
                 ROUTING_KEY);
+
         FlinkPulsarProducer producer = Mockito.mock(FlinkPulsarProducer.class);
         PowerMockito.whenNew(
                 FlinkPulsarProducer.class
diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilderTest.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilderTest.java
index e7fe78c..c89ad7e 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilderTest.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilderTest.java
@@ -61,12 +61,14 @@ public class PulsarSourceBuilderTest {
 
     @Test
     public void testBuildWithConfPojo() throws PulsarClientException {
-        ClientConfigurationData clientConf = ClientConfigurationData.builder().serviceUrl("testServiceUrl").build();
-        ConsumerConfigurationData consumerConf = ConsumerConfigurationData.builder()
-                .topicNames(new HashSet<>(Arrays.asList("testTopic")))
-                .subscriptionName("testSubscriptionName")
-                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
-                .build();
+        ClientConfigurationData clientConf = new ClientConfigurationData();
+        clientConf.setServiceUrl("testServiceUrl");
+
+        ConsumerConfigurationData consumerConf = new ConsumerConfigurationData();
+        consumerConf.setTopicNames(new HashSet<>(Arrays.asList("testTopic")));
+        consumerConf.setSubscriptionName("testSubscriptionName");
+        consumerConf.setSubscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
+
         SourceFunction sourceFunction = pulsarSourceBuilder
                 .pulsarAllClientConf(clientConf)
                 .pulsarAllConsumerConf(consumerConf)
@@ -160,11 +162,12 @@ public class PulsarSourceBuilderTest {
 
     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testServiceUrlNullWithConfPojo() throws PulsarClientException {
-        ClientConfigurationData clientConf = ClientConfigurationData.builder().serviceUrl(null).build();
-        ConsumerConfigurationData consumerConf = ConsumerConfigurationData.builder()
-                .topicNames(new HashSet<String>(Arrays.asList("testServiceUrl")))
-                .subscriptionName("testSubscriptionName")
-                .build();
+        ClientConfigurationData clientConf = new ClientConfigurationData();
+        clientConf.setServiceUrl(null);
+
+        ConsumerConfigurationData consumerConf = new ConsumerConfigurationData();
+        consumerConf.setTopicNames(new HashSet<String>(Arrays.asList("testServiceUrl")));
+        consumerConf.setSubscriptionName("testSubscriptionName");
 
         pulsarSourceBuilder
                 .pulsarAllClientConf(clientConf)
@@ -174,11 +177,12 @@ public class PulsarSourceBuilderTest {
 
     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testServiceUrlWithBlankWithConfPojo() throws PulsarClientException {
-        ClientConfigurationData clientConf = ClientConfigurationData.builder().serviceUrl(StringUtils.EMPTY).build();
-        ConsumerConfigurationData consumerConf = ConsumerConfigurationData.builder()
-                .topicNames(new HashSet<String>(Arrays.asList("testTopic")))
-                .subscriptionName("testSubscriptionName")
-                .build();
+        ClientConfigurationData clientConf = new ClientConfigurationData();
+        clientConf.setServiceUrl(StringUtils.EMPTY);
+
+        ConsumerConfigurationData consumerConf = new ConsumerConfigurationData();
+        consumerConf.setTopicNames(new HashSet<String>(Arrays.asList("testTopic")));
+        consumerConf.setSubscriptionName("testSubscriptionName");
 
         pulsarSourceBuilder
                 .pulsarAllClientConf(clientConf)
@@ -188,11 +192,11 @@ public class PulsarSourceBuilderTest {
 
     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testTopicPatternWithNullWithConfPojo() throws PulsarClientException {
-        ClientConfigurationData clientConf = ClientConfigurationData.builder().serviceUrl("testServiceUrl").build();
-        ConsumerConfigurationData consumerConf = ConsumerConfigurationData.builder()
-                .topicsPattern(null)
-                .subscriptionName("testSubscriptionName")
-                .build();
+        ClientConfigurationData clientConf = new ClientConfigurationData();
+        clientConf.setServiceUrl("testServiceUrl");
+        ConsumerConfigurationData consumerConf = new ConsumerConfigurationData();
+        consumerConf.setTopicsPattern(null);
+        consumerConf.setSubscriptionName("testSubscriptionName");
 
         pulsarSourceBuilder
                 .pulsarAllClientConf(clientConf)
@@ -202,11 +206,12 @@ public class PulsarSourceBuilderTest {
 
     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testSubscriptionNameWithNullWithConfPojo() throws PulsarClientException {
-        ClientConfigurationData clientConf = ClientConfigurationData.builder().serviceUrl("testServiceUrl").build();
-        ConsumerConfigurationData consumerConf = ConsumerConfigurationData.builder()
-                .topicNames(new HashSet<String>(Arrays.asList("testTopic")))
-                .subscriptionName(null)
-                .build();
+        ClientConfigurationData clientConf = new ClientConfigurationData();
+        clientConf.setServiceUrl("testServiceUrl");
+
+        ConsumerConfigurationData consumerConf = new ConsumerConfigurationData();
+        consumerConf.setTopicNames(new HashSet<String>(Arrays.asList("testTopic")));
+        consumerConf.setSubscriptionName(null);
 
         pulsarSourceBuilder
                 .pulsarAllClientConf(clientConf)
@@ -216,32 +221,17 @@ public class PulsarSourceBuilderTest {
 
     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testSubscriptionNameWithBlankWithConfPojo() throws PulsarClientException {
-        pulsarSourceBuilder.topic(null);
-        ClientConfigurationData clientConf = ClientConfigurationData.builder().serviceUrl("testServiceUrl").build();
-        ConsumerConfigurationData consumerConf = ConsumerConfigurationData.builder()
-                .topicNames(new HashSet<String>(Arrays.asList("testTopic")))
-                .subscriptionName(StringUtils.EMPTY)
-                .build();
-
-        pulsarSourceBuilder
-                .pulsarAllClientConf(clientConf)
-                .pulsarAllConsumerConf(consumerConf)
-                .build();
-    }
+        ClientConfigurationData clientConf = new ClientConfigurationData();
+        clientConf.setServiceUrl("testServiceUrl");
 
-    @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testSubscriptionInitialPositionWithConfPojo() throws PulsarClientException {
-        pulsarSourceBuilder.topic(null);
-        ClientConfigurationData clientConf = ClientConfigurationData.builder().serviceUrl("testServiceUrl").build();
-        ConsumerConfigurationData consumerConf = ConsumerConfigurationData.builder()
-                .topicNames(new HashSet<String>(Arrays.asList("testTopic")))
-                .subscriptionName("testSubscriptionName")
-                .subscriptionInitialPosition(null)
-                .build();
+        ConsumerConfigurationData consumerConf = new ConsumerConfigurationData();
+        consumerConf.setTopicNames(new HashSet<String>(Arrays.asList("testTopic")));
+        consumerConf.setSubscriptionName(StringUtils.EMPTY);
 
         pulsarSourceBuilder
                 .pulsarAllClientConf(clientConf)
                 .pulsarAllConsumerConf(consumerConf)
                 .build();
     }
+
 }