You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/06/06 15:47:06 UTC
[pulsar] branch master updated: [Issue 4379] [Java Client] Build
auth from class and params in PulsarClientImpl (#4381)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 17d2b42 [Issue 4379] [Java Client] Build auth from class and params in PulsarClientImpl (#4381)
17d2b42 is described below
commit 17d2b4213ed401bc6dad14f4f76ef8ce270b0461
Author: Shivji Kumar Jha <sh...@gmail.com>
AuthorDate: Thu Jun 6 21:17:00 2019 +0530
[Issue 4379] [Java Client] Build auth from class and params in PulsarClientImpl (#4381)
* Flink client to accept all pulsar client conf
In this patch, we provide handles for flink connecotr to accept ClientConfigurationData, ProducerConfigurationData, ConsumerConfigurationData so flink client can:
1. accept all params of client, producer and consumer
2. Keep pace with pulsar-client
* Flink client to accept all pulsar client conf
Added test cases
* Removing commented code
* flink: construct auth when building pulsarsource
* fixed failing tests
* removed unused import
* Added builder defaults for lombok builder
Set Auth from class and params (if set) in PulsarClientImpl.java
* Remove @BUilder.default from attributes where no defaults exist
* Added tests for ClientConfiguration Data builders
* cosmetic changes in code
Co-Authored-By: Sijie Guo <gu...@gmail.com>
* fixing typo
* Removed test, not true anymore
* Removed lombok builders
* fixed the failing tests
* Because the authentication field is transient, it is not serialized. On desirialization then its null and desirialization crashes with NPE
---
.../pulsar/client/impl/PulsarClientImpl.java | 11 +++
.../client/impl/conf/ClientConfigurationData.java | 15 ++--
.../impl/conf/ConsumerConfigurationData.java | 1 -
.../impl/conf/ProducerConfigurationData.java | 1 -
.../client/impl/auth/AuthenticationTokenTest.java | 30 ++++++++
.../impl/conf/ConfigurationDataUtilsTest.java | 18 +++++
.../connectors/pulsar/PulsarSourceBuilder.java | 2 +-
.../pulsar/PulsarAvroOutputFormatTest.java | 50 ++++++-------
.../pulsar/PulsarCsvOutputFormatTest.java | 50 ++++++-------
.../pulsar/PulsarJsonOutputFormatTest.java | 50 ++++++-------
.../connectors/pulsar/PulsarOutputFormatTest.java | 61 +++++++---------
.../connectors/pulsar/PulsarAvroTableSinkTest.java | 11 ++-
.../connectors/pulsar/PulsarJsonTableSinkTest.java | 10 ++-
.../connectors/pulsar/PulsarSourceBuilderTest.java | 84 ++++++++++------------
14 files changed, 204 insertions(+), 190 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 57d1956..0e58796 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -44,6 +44,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Producer;
@@ -57,6 +58,7 @@ import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.schema.GenericSchema;
import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
+import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.impl.ConsumerImpl.SubscriptionMode;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
@@ -127,6 +129,7 @@ public class PulsarClientImpl implements PulsarClient {
throw new PulsarClientException.InvalidConfigurationException("Invalid client configuration");
}
this.eventLoopGroup = eventLoopGroup;
+ setAuth(conf);
this.conf = conf;
conf.getAuthentication().start();
this.cnxPool = cnxPool;
@@ -142,6 +145,14 @@ public class PulsarClientImpl implements PulsarClient {
state.set(State.Open);
}
+ private void setAuth(ClientConfigurationData conf) throws PulsarClientException {
+ if (StringUtils.isBlank(conf.getAuthPluginClassName()) || StringUtils.isBlank( conf.getAuthParams())) {
+ return;
+ }
+
+ conf.setAuthentication(AuthenticationFactory.create(conf.getAuthPluginClassName(), conf.getAuthParams()));
+ }
+
public ClientConfigurationData getConfiguration() {
return conf;
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
index 6195027..3ab8638 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
@@ -28,14 +28,12 @@ import org.apache.pulsar.client.api.ServiceUrlProvider;
import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import java.io.Serializable;
-import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* This is a simple holder of the client configuration values.
*/
@Data
-@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ClientConfigurationData implements Serializable, Cloneable {
@@ -47,10 +45,8 @@ public class ClientConfigurationData implements Serializable, Cloneable {
@JsonIgnore
private transient Authentication authentication = new AuthenticationDisabled();
- @JsonIgnore
- private transient String authPluginClassName;
- @JsonIgnore
- private transient Map<String, String> authParams;
+ private String authPluginClassName;
+ private String authParams;
private long operationTimeoutMs = 30000;
private long statsIntervalSeconds = 60;
@@ -74,6 +70,13 @@ public class ClientConfigurationData implements Serializable, Cloneable {
private long defaultBackoffIntervalNanos = TimeUnit.MILLISECONDS.toNanos(100);
private long maxBackoffIntervalNanos = TimeUnit.SECONDS.toNanos(30);
+ public Authentication getAuthentication() {
+ if (authentication == null) {
+ this.authentication = new AuthenticationDisabled();
+ }
+ return authentication;
+ }
+
public boolean isUseTls() {
if (useTls)
return true;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
index 3352ebb..8aca7b2 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
@@ -45,7 +45,6 @@ import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
@Data
-@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ConsumerConfigurationData<T> implements Serializable, Cloneable {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
index 32f5e61..7ec8f90 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
@@ -42,7 +42,6 @@ import com.google.common.collect.Sets;
import lombok.Data;
@Data
-@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ProducerConfigurationData implements Serializable, Cloneable {
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/AuthenticationTokenTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/AuthenticationTokenTest.java
index 6063a02..e3a94ee 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/AuthenticationTokenTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/AuthenticationTokenTest.java
@@ -29,7 +29,10 @@ import java.io.File;
import java.util.Collections;
import org.apache.commons.io.FileUtils;
+import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.testng.annotations.Test;
public class AuthenticationTokenTest {
@@ -55,6 +58,33 @@ public class AuthenticationTokenTest {
}
@Test
+ public void testAuthTokenClientConfig() throws Exception {
+ ClientConfigurationData clientConfig = new ClientConfigurationData();
+ clientConfig.setServiceUrl("pulsar://service-url");
+ clientConfig.setAuthPluginClassName(AuthenticationToken.class.getName());
+ clientConfig.setAuthParams("token-xyz");
+
+ PulsarClientImpl pulsarClient = new PulsarClientImpl(clientConfig);
+
+ Authentication authToken = pulsarClient.getConfiguration().getAuthentication();
+ assertEquals(authToken.getAuthMethodName(), "token");
+
+ AuthenticationDataProvider authData = authToken.getAuthData();
+ assertTrue(authData.hasDataFromCommand());
+ assertEquals(authData.getCommandData(), "token-xyz");
+
+ assertFalse(authData.hasDataForTls());
+ assertNull(authData.getTlsCertificates());
+ assertNull(authData.getTlsPrivateKey());
+
+ assertTrue(authData.hasDataForHttp());
+ assertEquals(authData.getHttpHeaders(),
+ Collections.singletonMap("Authorization", "Bearer token-xyz").entrySet());
+
+ authToken.close();
+ }
+
+ @Test
public void testAuthTokenConfig() throws Exception {
AuthenticationToken authToken = new AuthenticationToken();
authToken.configure("token:my-test-token-string");
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConfigurationDataUtilsTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConfigurationDataUtilsTest.java
index 83ed4e8..626c501 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConfigurationDataUtilsTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConfigurationDataUtilsTest.java
@@ -25,6 +25,9 @@ import static org.testng.Assert.fail;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
+
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.testng.annotations.Test;
/**
@@ -108,4 +111,19 @@ public class ConfigurationDataUtilsTest {
assertTrue(re.getCause() instanceof IOException);
}
}
+
+ @Test
+ public void testConfigBuilder() throws PulsarClientException {
+ ClientConfigurationData clientConfig = new ClientConfigurationData();
+ clientConfig.setServiceUrl("pulsar://unknown:6650");
+ clientConfig.setStatsIntervalSeconds(80);
+
+ PulsarClientImpl pulsarClient = new PulsarClientImpl(clientConfig);
+ assertTrue(pulsarClient != null, "Pulsar client built using config should not be null");
+
+ assertTrue(pulsarClient.getConfiguration().getServiceUrl().equals("pulsar://unknown:6650"));
+ assertEquals(pulsarClient.getConfiguration().getNumListenerThreads(), 1, "builder default not set properly");
+ assertEquals(pulsarClient.getConfiguration().getStatsIntervalSeconds(), 80,
+ "builder default should overrite if set explicitly");
+ }
}
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java
index f3227d9..d4a25ee 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java
@@ -290,7 +290,7 @@ public class PulsarSourceBuilder<T> {
private void setAuth() throws PulsarClientException{
if (StringUtils.isBlank(this.clientConfigurationData.getAuthPluginClassName())
- && this.clientConfigurationData.getAuthParams() == null || this.clientConfigurationData.getAuthParams().isEmpty())
+ || StringUtils.isBlank(this.clientConfigurationData.getAuthParams()))
return;
clientConfigurationData.setAuthentication(
diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormatTest.java b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormatTest.java
index eb667b0..736c417 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormatTest.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormatTest.java
@@ -61,65 +61,55 @@ public class PulsarAvroOutputFormatTest {
@Test(expectedExceptions = IllegalArgumentException.class)
public void testPulsarAvroOutputFormatConstructorV2WhenServiceUrlIsNull() {
- ClientConfigurationData clientConf = ClientConfigurationData.builder()
- .serviceUrl(null)
- .build();
+ ClientConfigurationData clientConf = new ClientConfigurationData();
+ clientConf.setServiceUrl(null);
- ProducerConfigurationData producerConf = ProducerConfigurationData.builder()
- .topicName("testTopic")
- .build();
+ ProducerConfigurationData producerConf = new ProducerConfigurationData();
+ producerConf.setTopicName("testTopic");
new PulsarAvroOutputFormat(clientConf, producerConf);
}
@Test(expectedExceptions = IllegalArgumentException.class)
public void testPulsarAvroOutputFormatConstructorV2WhenTopicNameIsNull() {
- ClientConfigurationData clientConf = ClientConfigurationData.builder()
- .serviceUrl("testServiceUrl")
- .build();
+ ClientConfigurationData clientConf = new ClientConfigurationData();
+ clientConf.setServiceUrl("testServiceUrl");
- ProducerConfigurationData producerConf = ProducerConfigurationData.builder()
- .topicName(null)
- .build();
+ ProducerConfigurationData producerConf = new ProducerConfigurationData();
+ producerConf.setTopicName(null);
new PulsarAvroOutputFormat(clientConf, producerConf);
}
@Test(expectedExceptions = IllegalArgumentException.class)
public void testPulsarAvroOutputFormatConstructorV2WhenTopicNameIsBlank() {
- ClientConfigurationData clientConf = ClientConfigurationData.builder()
- .serviceUrl("testServiceUrl")
- .build();
+ ClientConfigurationData clientConf = new ClientConfigurationData();
+ clientConf.setServiceUrl("testServiceUrl");
- ProducerConfigurationData producerConf = ProducerConfigurationData.builder()
- .topicName(StringUtils.EMPTY)
- .build();
+ ProducerConfigurationData producerConf = new ProducerConfigurationData();
+ producerConf.setTopicName(StringUtils.EMPTY);
new PulsarAvroOutputFormat(clientConf, producerConf);
}
@Test(expectedExceptions = IllegalArgumentException.class)
public void testPulsarAvroOutputFormatConstructorV2WhenServiceUrlIsBlank() {
- ClientConfigurationData clientConf = ClientConfigurationData.builder()
- .serviceUrl(StringUtils.EMPTY)
- .build();
+ ClientConfigurationData clientConf = new ClientConfigurationData();
+ clientConf.setServiceUrl(StringUtils.EMPTY);
- ProducerConfigurationData producerConf = ProducerConfigurationData.builder()
- .topicName(StringUtils.EMPTY)
- .build();
+ ProducerConfigurationData producerConf = new ProducerConfigurationData();
+ producerConf.setTopicName(StringUtils.EMPTY);
new PulsarAvroOutputFormat(clientConf, producerConf);
}
@Test
public void testPulsarAvroOutputFormatConstructorV2() {
- ClientConfigurationData clientConf = ClientConfigurationData.builder()
- .serviceUrl("testServiceUrl")
- .build();
+ ClientConfigurationData clientConf = new ClientConfigurationData();
+ clientConf.setServiceUrl("testServiceUrl");
- ProducerConfigurationData producerConf = ProducerConfigurationData.builder()
- .topicName("testTopic")
- .build();
+ ProducerConfigurationData producerConf = new ProducerConfigurationData();
+ producerConf.setTopicName("testTopic");
PulsarAvroOutputFormat pulsarAvroOutputFormat = new PulsarAvroOutputFormat(clientConf, producerConf);
assertNotNull(pulsarAvroOutputFormat);
diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormatTest.java b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormatTest.java
index a374053..713f867 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormatTest.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormatTest.java
@@ -60,65 +60,55 @@ public class PulsarCsvOutputFormatTest {
@Test(expectedExceptions = IllegalArgumentException.class)
public void testPulsarCsvOutputFormatConstructorV2WhenServiceUrlIsNull() {
- ClientConfigurationData clientConf = ClientConfigurationData.builder()
- .serviceUrl(null)
- .build();
+ ClientConfigurationData clientConf = new ClientConfigurationData();
+ clientConf.setServiceUrl(null);
- ProducerConfigurationData producerConf = ProducerConfigurationData.builder()
- .topicName("testTopic")
- .build();
+ ProducerConfigurationData producerConf = new ProducerConfigurationData();
+ producerConf.setTopicName("testTopic");
new PulsarAvroOutputFormat(clientConf, producerConf);
}
@Test(expectedExceptions = IllegalArgumentException.class)
public void testPulsarCsvOutputFormatConstructorV2WhenTopicNameIsNull() {
- ClientConfigurationData clientConf = ClientConfigurationData.builder()
- .serviceUrl("testServiceUrl")
- .build();
+ ClientConfigurationData clientConf = new ClientConfigurationData();
+ clientConf.setServiceUrl("testServiceUrl");
- ProducerConfigurationData producerConf = ProducerConfigurationData.builder()
- .topicName(null)
- .build();
+ ProducerConfigurationData producerConf = new ProducerConfigurationData();
+ producerConf.setTopicName(null);
new PulsarAvroOutputFormat(clientConf, producerConf);
}
@Test(expectedExceptions = IllegalArgumentException.class)
public void testPulsarCsvOutputFormatConstructorV2WhenTopicNameIsBlank() {
- ClientConfigurationData clientConf = ClientConfigurationData.builder()
- .serviceUrl("testServiceUrl")
- .build();
+ ClientConfigurationData clientConf = new ClientConfigurationData();
+ clientConf.setServiceUrl("testServiceUrl");
- ProducerConfigurationData producerConf = ProducerConfigurationData.builder()
- .topicName(StringUtils.EMPTY)
- .build();
+ ProducerConfigurationData producerConf = new ProducerConfigurationData();
+ producerConf.setTopicName(StringUtils.EMPTY);
new PulsarAvroOutputFormat(clientConf, producerConf);
}
@Test(expectedExceptions = IllegalArgumentException.class)
public void testPulsarCsvOutputFormatConstructorV2WhenServiceUrlIsBlank() {
- ClientConfigurationData clientConf = ClientConfigurationData.builder()
- .serviceUrl(StringUtils.EMPTY)
- .build();
+ ClientConfigurationData clientConf = new ClientConfigurationData();
+ clientConf.setServiceUrl(StringUtils.EMPTY);
- ProducerConfigurationData producerConf = ProducerConfigurationData.builder()
- .topicName("testTopic")
- .build();
+ ProducerConfigurationData producerConf = new ProducerConfigurationData();
+ producerConf.setTopicName("testTopic");
new PulsarAvroOutputFormat(clientConf, producerConf);
}
@Test
public void testPulsarCsvOutputFormatConstructorV2() {
- ClientConfigurationData clientConf = ClientConfigurationData.builder()
- .serviceUrl("testServiceUrl")
- .build();
+ ClientConfigurationData clientConf = new ClientConfigurationData();
+ clientConf.setServiceUrl("testServiceUrl");
- ProducerConfigurationData producerConf = ProducerConfigurationData.builder()
- .topicName("testTopic")
- .build();
+ ProducerConfigurationData producerConf = new ProducerConfigurationData();
+ producerConf.setTopicName("testTopic");
PulsarCsvOutputFormat pulsarCsvOutputFormat = new PulsarCsvOutputFormat(clientConf, producerConf);
assertNotNull(pulsarCsvOutputFormat);
diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormatTest.java b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormatTest.java
index 01edc9a..d45d9b1 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormatTest.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormatTest.java
@@ -60,65 +60,55 @@ public class PulsarJsonOutputFormatTest {
@Test(expectedExceptions = IllegalArgumentException.class)
public void testPulsarJsonOutputFormatConstructorV2WhenServiceUrlIsNull() {
- ClientConfigurationData clientConf = ClientConfigurationData.builder()
- .serviceUrl(null)
- .build();
+ ClientConfigurationData clientConf = new ClientConfigurationData();
+ clientConf.setServiceUrl(null);
- ProducerConfigurationData producerConf = ProducerConfigurationData.builder()
- .topicName("testTopic")
- .build();
+ ProducerConfigurationData producerConf = new ProducerConfigurationData();
+ producerConf.setTopicName("testTopic");
new PulsarAvroOutputFormat(clientConf, producerConf);
}
@Test(expectedExceptions = IllegalArgumentException.class)
public void testPulsarJsonROutputFormatConstructorV2WhenTopicNameIsNull() {
- ClientConfigurationData clientConf = ClientConfigurationData.builder()
- .serviceUrl("testServiceUrl")
- .build();
+ ClientConfigurationData clientConf = new ClientConfigurationData();
+ clientConf.setServiceUrl("testServiceUrl");
- ProducerConfigurationData producerConf = ProducerConfigurationData.builder()
- .topicName(null)
- .build();
+ ProducerConfigurationData producerConf = new ProducerConfigurationData();
+ producerConf.setTopicName(null);
new PulsarAvroOutputFormat(clientConf, producerConf);
}
@Test(expectedExceptions = IllegalArgumentException.class)
public void testPulsarJsonOutputFormatConstructorV2WhenTopicNameIsBlank() {
- ClientConfigurationData clientConf = ClientConfigurationData.builder()
- .serviceUrl("testServiceUrl")
- .build();
+ ClientConfigurationData clientConf = new ClientConfigurationData();
+ clientConf.setServiceUrl("testServiceUrl");
- ProducerConfigurationData producerConf = ProducerConfigurationData.builder()
- .topicName(StringUtils.EMPTY)
- .build();
+ ProducerConfigurationData producerConf = new ProducerConfigurationData();
+ producerConf.setTopicName(StringUtils.EMPTY);
new PulsarAvroOutputFormat(clientConf, producerConf);
}
@Test(expectedExceptions = IllegalArgumentException.class)
public void testPulsarJsonOutputFormatConstructorV2WhenServiceUrlIsBlank() {
- ClientConfigurationData clientConf = ClientConfigurationData.builder()
- .serviceUrl(StringUtils.EMPTY)
- .build();
+ ClientConfigurationData clientConf = new ClientConfigurationData();
+ clientConf.setServiceUrl(StringUtils.EMPTY);
- ProducerConfigurationData producerConf = ProducerConfigurationData.builder()
- .topicName("testTopic")
- .build();
+ ProducerConfigurationData producerConf = new ProducerConfigurationData();
+ producerConf.setTopicName("testTopic");
new PulsarAvroOutputFormat(clientConf, producerConf);
}
@Test
public void testPulsarJsonOutputFormatConstructorV2() {
- ClientConfigurationData clientConf = ClientConfigurationData.builder()
- .serviceUrl("testServiceUrl")
- .build();
+ ClientConfigurationData clientConf = new ClientConfigurationData();
+ clientConf.setServiceUrl("testServiceUrl");
- ProducerConfigurationData producerConf = ProducerConfigurationData.builder()
- .topicName("testTopic")
- .build();
+ ProducerConfigurationData producerConf = new ProducerConfigurationData();
+ producerConf.setTopicName("testTopic");
PulsarJsonOutputFormat pulsarJsonOutputFormat = new PulsarJsonOutputFormat(clientConf, producerConf);
assertNotNull(pulsarJsonOutputFormat);
diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormatTest.java b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormatTest.java
index 9c9a2fe..97b23b4 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormatTest.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormatTest.java
@@ -64,76 +64,65 @@ public class PulsarOutputFormatTest {
@Test(expectedExceptions = IllegalArgumentException.class)
public void testPulsarOutputFormatConstructorV2WhenServiceUrlIsNull() {
- ClientConfigurationData clientConf = ClientConfigurationData.builder()
- .serviceUrl(null)
- .build();
+ ClientConfigurationData clientConf = new ClientConfigurationData();
+ clientConf.setServiceUrl(null);
- ProducerConfigurationData producerConf = ProducerConfigurationData.builder()
- .topicName("testTopic")
- .build();
+ ProducerConfigurationData producerConf = new ProducerConfigurationData();
+ producerConf.setTopicName("testTopic");
new PulsarOutputFormat(clientConf, producerConf, text -> text.toString().getBytes());
}
@Test(expectedExceptions = IllegalArgumentException.class)
public void testPulsarOutputFormatConstructorV2WhenTopicNameIsNull() {
- ClientConfigurationData clientConf = ClientConfigurationData.builder()
- .serviceUrl("testServiceUrl")
- .build();
+ ClientConfigurationData clientConf = new ClientConfigurationData();
+ clientConf.setServiceUrl("testServiceUrl");
- ProducerConfigurationData producerConf = ProducerConfigurationData.builder()
- .topicName(null)
- .build();
+ ProducerConfigurationData producerConf = new ProducerConfigurationData();
+ producerConf.setTopicName(null);
new PulsarOutputFormat(clientConf, producerConf, text -> text.toString().getBytes());
}
@Test(expectedExceptions = IllegalArgumentException.class)
public void testPulsarOutputFormatConstructorV2WhenTopicNameIsBlank() {
- ClientConfigurationData clientConf = ClientConfigurationData.builder()
- .serviceUrl("testServiceUrl")
- .build();
+ ClientConfigurationData clientConf = new ClientConfigurationData();
+ clientConf.setServiceUrl("testServiceUrl");
- ProducerConfigurationData producerConf = ProducerConfigurationData.builder()
- .topicName(StringUtils.EMPTY)
- .build();
+ ProducerConfigurationData producerConf = new ProducerConfigurationData();
+ producerConf.setTopicName(StringUtils.EMPTY);
new PulsarOutputFormat(clientConf, producerConf, text -> text.toString().getBytes());
}
@Test(expectedExceptions = IllegalArgumentException.class)
public void testPulsarOutputFormatConstructorV2WhenServiceUrlIsBlank() {
- ClientConfigurationData clientConf = ClientConfigurationData.builder()
- .serviceUrl(StringUtils.EMPTY)
- .build();
+ ClientConfigurationData clientConf = new ClientConfigurationData();
+ clientConf.setServiceUrl(StringUtils.EMPTY);
- ProducerConfigurationData producerConf = ProducerConfigurationData.builder()
- .topicName("testTopic")
- .build();
+ ProducerConfigurationData producerConf = new ProducerConfigurationData();
+ producerConf.setTopicName("testTopic");
new PulsarOutputFormat(clientConf, producerConf, text -> text.toString().getBytes());
}
@Test(expectedExceptions = NullPointerException.class)
public void testPulsarOutputFormatConstructorV2WhenSerializationSchemaIsNull() {
- ClientConfigurationData clientConf = ClientConfigurationData.builder()
- .serviceUrl("testServiceUrl")
- .build();
- ProducerConfigurationData producerConf = ProducerConfigurationData.builder()
- .topicName("testTopic")
- .build();
+ ClientConfigurationData clientConf = new ClientConfigurationData();
+ clientConf.setServiceUrl("testServiceUrl");
+
+ ProducerConfigurationData producerConf = new ProducerConfigurationData();
+ producerConf.setTopicName("testTopic");
new PulsarOutputFormat(clientConf, producerConf, null);
}
@Test
public void testPulsarOutputFormatConstructorV2() {
- ClientConfigurationData clientConf = ClientConfigurationData.builder()
- .serviceUrl("testServiceUrl")
- .build();
+ ClientConfigurationData clientConf = new ClientConfigurationData();
+ clientConf.setServiceUrl("testServiceUrl");
- ProducerConfigurationData producerConf = ProducerConfigurationData.builder()
- .topicName("testTopic")
- .build();
+ ProducerConfigurationData producerConf = new ProducerConfigurationData();
+ producerConf.setTopicName("testTopic");
PulsarCsvOutputFormat pulsarCsvOutputFormat = new PulsarCsvOutputFormat(clientConf, producerConf);
assertNotNull(pulsarCsvOutputFormat);
diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java
index 024637b..7fd48c2 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java
@@ -91,12 +91,11 @@ public class PulsarAvroTableSinkTest {
private PulsarAvroTableSink spySink() throws Exception {
- ClientConfigurationData clientConf = ClientConfigurationData.builder()
- .serviceUrl(SERVICE_URL)
- .build();
- ProducerConfigurationData producerConf = ProducerConfigurationData.builder()
- .topicName(TOPIC_NAME)
- .build();
+ ClientConfigurationData clientConf = new ClientConfigurationData();
+ clientConf.setServiceUrl(SERVICE_URL);
+
+ ProducerConfigurationData producerConf = new ProducerConfigurationData();
+ producerConf.setTopicName(TOPIC_NAME);
PulsarAvroTableSink sink =
new PulsarAvroTableSink(clientConf, producerConf, ROUTING_KEY, NasaMission.class);
diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java
index 25755d2..668a8e5 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java
@@ -85,10 +85,16 @@ public class PulsarJsonTableSinkTest {
}
private PulsarJsonTableSink spySink() throws Exception {
+ ClientConfigurationData clientConfigurationData = new ClientConfigurationData();
+ clientConfigurationData.setServiceUrl(SERVICE_URL);
+
+ ProducerConfigurationData producerConfigurationData = new ProducerConfigurationData();
+ producerConfigurationData.setTopicName(TOPIC_NAME);
+
PulsarJsonTableSink sink = new PulsarJsonTableSink(
- ClientConfigurationData.builder().serviceUrl(SERVICE_URL).build(),
- ProducerConfigurationData.builder().topicName(TOPIC_NAME).build(),
+ clientConfigurationData, producerConfigurationData,
ROUTING_KEY);
+
FlinkPulsarProducer producer = Mockito.mock(FlinkPulsarProducer.class);
PowerMockito.whenNew(
FlinkPulsarProducer.class
diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilderTest.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilderTest.java
index e7fe78c..c89ad7e 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilderTest.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilderTest.java
@@ -61,12 +61,14 @@ public class PulsarSourceBuilderTest {
@Test
public void testBuildWithConfPojo() throws PulsarClientException {
- ClientConfigurationData clientConf = ClientConfigurationData.builder().serviceUrl("testServiceUrl").build();
- ConsumerConfigurationData consumerConf = ConsumerConfigurationData.builder()
- .topicNames(new HashSet<>(Arrays.asList("testTopic")))
- .subscriptionName("testSubscriptionName")
- .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
- .build();
+ ClientConfigurationData clientConf = new ClientConfigurationData();
+ clientConf.setServiceUrl("testServiceUrl");
+
+ ConsumerConfigurationData consumerConf = new ConsumerConfigurationData();
+ consumerConf.setTopicNames(new HashSet<>(Arrays.asList("testTopic")));
+ consumerConf.setSubscriptionName("testSubscriptionName");
+ consumerConf.setSubscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
+
SourceFunction sourceFunction = pulsarSourceBuilder
.pulsarAllClientConf(clientConf)
.pulsarAllConsumerConf(consumerConf)
@@ -160,11 +162,12 @@ public class PulsarSourceBuilderTest {
@Test(expectedExceptions = IllegalArgumentException.class)
public void testServiceUrlNullWithConfPojo() throws PulsarClientException {
- ClientConfigurationData clientConf = ClientConfigurationData.builder().serviceUrl(null).build();
- ConsumerConfigurationData consumerConf = ConsumerConfigurationData.builder()
- .topicNames(new HashSet<String>(Arrays.asList("testServiceUrl")))
- .subscriptionName("testSubscriptionName")
- .build();
+ ClientConfigurationData clientConf = new ClientConfigurationData();
+ clientConf.setServiceUrl(null);
+
+ ConsumerConfigurationData consumerConf = new ConsumerConfigurationData();
+ consumerConf.setTopicNames(new HashSet<String>(Arrays.asList("testServiceUrl")));
+ consumerConf.setSubscriptionName("testSubscriptionName");
pulsarSourceBuilder
.pulsarAllClientConf(clientConf)
@@ -174,11 +177,12 @@ public class PulsarSourceBuilderTest {
@Test(expectedExceptions = IllegalArgumentException.class)
public void testServiceUrlWithBlankWithConfPojo() throws PulsarClientException {
- ClientConfigurationData clientConf = ClientConfigurationData.builder().serviceUrl(StringUtils.EMPTY).build();
- ConsumerConfigurationData consumerConf = ConsumerConfigurationData.builder()
- .topicNames(new HashSet<String>(Arrays.asList("testTopic")))
- .subscriptionName("testSubscriptionName")
- .build();
+ ClientConfigurationData clientConf = new ClientConfigurationData();
+ clientConf.setServiceUrl(StringUtils.EMPTY);
+
+ ConsumerConfigurationData consumerConf = new ConsumerConfigurationData();
+ consumerConf.setTopicNames(new HashSet<String>(Arrays.asList("testTopic")));
+ consumerConf.setSubscriptionName("testSubscriptionName");
pulsarSourceBuilder
.pulsarAllClientConf(clientConf)
@@ -188,11 +192,11 @@ public class PulsarSourceBuilderTest {
@Test(expectedExceptions = IllegalArgumentException.class)
public void testTopicPatternWithNullWithConfPojo() throws PulsarClientException {
- ClientConfigurationData clientConf = ClientConfigurationData.builder().serviceUrl("testServiceUrl").build();
- ConsumerConfigurationData consumerConf = ConsumerConfigurationData.builder()
- .topicsPattern(null)
- .subscriptionName("testSubscriptionName")
- .build();
+ ClientConfigurationData clientConf = new ClientConfigurationData();
+ clientConf.setServiceUrl("testServiceUrl");
+ ConsumerConfigurationData consumerConf = new ConsumerConfigurationData();
+ consumerConf.setTopicsPattern(null);
+ consumerConf.setSubscriptionName("testSubscriptionName");
pulsarSourceBuilder
.pulsarAllClientConf(clientConf)
@@ -202,11 +206,12 @@ public class PulsarSourceBuilderTest {
@Test(expectedExceptions = IllegalArgumentException.class)
public void testSubscriptionNameWithNullWithConfPojo() throws PulsarClientException {
- ClientConfigurationData clientConf = ClientConfigurationData.builder().serviceUrl("testServiceUrl").build();
- ConsumerConfigurationData consumerConf = ConsumerConfigurationData.builder()
- .topicNames(new HashSet<String>(Arrays.asList("testTopic")))
- .subscriptionName(null)
- .build();
+ ClientConfigurationData clientConf = new ClientConfigurationData();
+ clientConf.setServiceUrl("testServiceUrl");
+
+ ConsumerConfigurationData consumerConf = new ConsumerConfigurationData();
+ consumerConf.setTopicNames(new HashSet<String>(Arrays.asList("testTopic")));
+ consumerConf.setSubscriptionName(null);
pulsarSourceBuilder
.pulsarAllClientConf(clientConf)
@@ -216,32 +221,17 @@ public class PulsarSourceBuilderTest {
@Test(expectedExceptions = IllegalArgumentException.class)
public void testSubscriptionNameWithBlankWithConfPojo() throws PulsarClientException {
- pulsarSourceBuilder.topic(null);
- ClientConfigurationData clientConf = ClientConfigurationData.builder().serviceUrl("testServiceUrl").build();
- ConsumerConfigurationData consumerConf = ConsumerConfigurationData.builder()
- .topicNames(new HashSet<String>(Arrays.asList("testTopic")))
- .subscriptionName(StringUtils.EMPTY)
- .build();
-
- pulsarSourceBuilder
- .pulsarAllClientConf(clientConf)
- .pulsarAllConsumerConf(consumerConf)
- .build();
- }
+ ClientConfigurationData clientConf = new ClientConfigurationData();
+ clientConf.setServiceUrl("testServiceUrl");
- @Test(expectedExceptions = IllegalArgumentException.class)
- public void testSubscriptionInitialPositionWithConfPojo() throws PulsarClientException {
- pulsarSourceBuilder.topic(null);
- ClientConfigurationData clientConf = ClientConfigurationData.builder().serviceUrl("testServiceUrl").build();
- ConsumerConfigurationData consumerConf = ConsumerConfigurationData.builder()
- .topicNames(new HashSet<String>(Arrays.asList("testTopic")))
- .subscriptionName("testSubscriptionName")
- .subscriptionInitialPosition(null)
- .build();
+ ConsumerConfigurationData consumerConf = new ConsumerConfigurationData();
+ consumerConf.setTopicNames(new HashSet<String>(Arrays.asList("testTopic")));
+ consumerConf.setSubscriptionName(StringUtils.EMPTY);
pulsarSourceBuilder
.pulsarAllClientConf(clientConf)
.pulsarAllConsumerConf(consumerConf)
.build();
}
+
}