You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/05/18 12:27:11 UTC

[pulsar] branch master updated: [Issue 4283][flink] construct auth when building pulsar source (#4284)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 353ca73  [Issue 4283][flink] construct auth when building pulsar source (#4284)
353ca73 is described below

commit 353ca73b11124f3df58934f4f496d370545d8a05
Author: Shivji Kumar Jha <sh...@gmail.com>
AuthorDate: Sat May 18 17:57:04 2019 +0530

    [Issue 4283][flink] construct auth when building pulsar source (#4284)
    
    Fixes #4283
    
    ### Motivation
    
    pulsar flink connector now uses ClientConfigData to instantiate pulsar client. Pulsar client composes Authentication which can not be serialized. In an environment with Auth there is no way to set auth in pulsar client.
    
    ### Modifications
    
    Keep auth params away from persistence and serialization. Construct auth when building pulsar source
---
 .../client/impl/conf/ClientConfigurationData.java   | 20 ++++++++++++--------
 .../connectors/pulsar/FlinkPulsarProducer.java      |  1 -
 .../connectors/pulsar/PulsarSourceBuilder.java      | 21 ++++++++++++++++++++-
 .../connectors/pulsar/PulsarSourceBuilderTest.java  | 19 ++++++++++---------
 4 files changed, 42 insertions(+), 19 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
index 3c7190b..673a5d7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
@@ -18,19 +18,18 @@
  */
 package org.apache.pulsar.client.impl.conf;
 
-import java.io.Serializable;
-import java.util.concurrent.TimeUnit;
-
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
+import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.ServiceUrlProvider;
 import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
 
-import com.fasterxml.jackson.annotation.JsonIgnore;
-
-import lombok.Data;
+import java.io.Serializable;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 /**
  * This is a simple holder of the client configuration values.
@@ -44,10 +43,15 @@ public class ClientConfigurationData implements Serializable, Cloneable {
 
     private String serviceUrl;
     @JsonIgnore
-    private ServiceUrlProvider serviceUrlProvider;
+    private transient ServiceUrlProvider serviceUrlProvider;
 
     @JsonIgnore
-    private Authentication authentication = new AuthenticationDisabled();
+    private transient Authentication authentication = new AuthenticationDisabled();
+    @JsonIgnore
+    private transient String authPluginClassName;
+    @JsonIgnore
+    private transient Map<String, String> authParams;
+
     private long operationTimeoutMs = 30000;
     private long statsIntervalSeconds = 60;
 
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
index 29cfe0e..5afba67 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
@@ -22,7 +22,6 @@ import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 import java.util.function.Function;
-import java.util.Map;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.functions.RuntimeContext;
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java
index 4521add..f3227d9 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java
@@ -27,6 +27,7 @@ import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 
@@ -268,7 +269,7 @@ public class PulsarSourceBuilder<T> {
     }
 
 
-    public SourceFunction<T> build() {
+    public SourceFunction<T> build() throws PulsarClientException{
         Preconditions.checkArgument(StringUtils.isNotBlank(this.clientConfigurationData.getServiceUrl()),
                 "a service url is required");
         Preconditions.checkArgument((this.consumerConfigurationData.getTopicNames() != null &&
@@ -277,9 +278,27 @@ public class PulsarSourceBuilder<T> {
                 "At least one topic or topics pattern is required");
         Preconditions.checkArgument(StringUtils.isNotBlank(this.consumerConfigurationData.getSubscriptionName()),
                 "a subscription name is required");
+
+        setTransientFields();
+
         return new PulsarConsumerSource<>(this);
     }
 
+    private void setTransientFields() throws PulsarClientException {
+        setAuth();
+    }
+
+    private void setAuth() throws PulsarClientException{
+        if (StringUtils.isBlank(this.clientConfigurationData.getAuthPluginClassName())
+                && this.clientConfigurationData.getAuthParams() == null || this.clientConfigurationData.getAuthParams().isEmpty())
+            return;
+
+        clientConfigurationData.setAuthentication(
+                AuthenticationFactory.create(
+                        this.clientConfigurationData.getAuthPluginClassName(),
+                        this.clientConfigurationData.getAuthParams()));
+    }
+
     /**
      * Creates a PulsarSourceBuilder.
      *
diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilderTest.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilderTest.java
index cc12ba4..e7fe78c 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilderTest.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilderTest.java
@@ -22,6 +22,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
@@ -47,7 +48,7 @@ public class PulsarSourceBuilderTest {
     }
 
     @Test
-    public void testBuild() {
+    public void testBuild() throws PulsarClientException {
         SourceFunction sourceFunction = pulsarSourceBuilder
                 .serviceUrl("testServiceUrl")
                 .topic("testTopic")
@@ -59,7 +60,7 @@ public class PulsarSourceBuilderTest {
 
 
     @Test
-    public void testBuildWithConfPojo() {
+    public void testBuildWithConfPojo() throws PulsarClientException {
         ClientConfigurationData clientConf = ClientConfigurationData.builder().serviceUrl("testServiceUrl").build();
         ConsumerConfigurationData consumerConf = ConsumerConfigurationData.builder()
                 .topicNames(new HashSet<>(Arrays.asList("testTopic")))
@@ -74,7 +75,7 @@ public class PulsarSourceBuilderTest {
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testBuildWithoutSettingRequiredProperties() {
+    public void testBuildWithoutSettingRequiredProperties() throws PulsarClientException {
         pulsarSourceBuilder.build();
     }
 
@@ -158,7 +159,7 @@ public class PulsarSourceBuilderTest {
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testServiceUrlNullWithConfPojo() {
+    public void testServiceUrlNullWithConfPojo() throws PulsarClientException {
         ClientConfigurationData clientConf = ClientConfigurationData.builder().serviceUrl(null).build();
         ConsumerConfigurationData consumerConf = ConsumerConfigurationData.builder()
                 .topicNames(new HashSet<String>(Arrays.asList("testServiceUrl")))
@@ -172,7 +173,7 @@ public class PulsarSourceBuilderTest {
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testServiceUrlWithBlankWithConfPojo() {
+    public void testServiceUrlWithBlankWithConfPojo() throws PulsarClientException {
         ClientConfigurationData clientConf = ClientConfigurationData.builder().serviceUrl(StringUtils.EMPTY).build();
         ConsumerConfigurationData consumerConf = ConsumerConfigurationData.builder()
                 .topicNames(new HashSet<String>(Arrays.asList("testTopic")))
@@ -186,7 +187,7 @@ public class PulsarSourceBuilderTest {
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testTopicPatternWithNullWithConfPojo() {
+    public void testTopicPatternWithNullWithConfPojo() throws PulsarClientException {
         ClientConfigurationData clientConf = ClientConfigurationData.builder().serviceUrl("testServiceUrl").build();
         ConsumerConfigurationData consumerConf = ConsumerConfigurationData.builder()
                 .topicsPattern(null)
@@ -200,7 +201,7 @@ public class PulsarSourceBuilderTest {
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testSubscriptionNameWithNullWithConfPojo() {
+    public void testSubscriptionNameWithNullWithConfPojo() throws PulsarClientException {
         ClientConfigurationData clientConf = ClientConfigurationData.builder().serviceUrl("testServiceUrl").build();
         ConsumerConfigurationData consumerConf = ConsumerConfigurationData.builder()
                 .topicNames(new HashSet<String>(Arrays.asList("testTopic")))
@@ -214,7 +215,7 @@ public class PulsarSourceBuilderTest {
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testSubscriptionNameWithBlankWithConfPojo() {
+    public void testSubscriptionNameWithBlankWithConfPojo() throws PulsarClientException {
         pulsarSourceBuilder.topic(null);
         ClientConfigurationData clientConf = ClientConfigurationData.builder().serviceUrl("testServiceUrl").build();
         ConsumerConfigurationData consumerConf = ConsumerConfigurationData.builder()
@@ -229,7 +230,7 @@ public class PulsarSourceBuilderTest {
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testSubscriptionInitialPositionWithConfPojo() {
+    public void testSubscriptionInitialPositionWithConfPojo() throws PulsarClientException {
         pulsarSourceBuilder.topic(null);
         ClientConfigurationData clientConf = ClientConfigurationData.builder().serviceUrl("testServiceUrl").build();
         ConsumerConfigurationData consumerConf = ConsumerConfigurationData.builder()