You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2020/05/19 04:53:48 UTC

[pulsar] branch master updated: [pulsar-storm] Fix: Authentication is failing with storm adapter (#6782)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d5c418  [pulsar-storm] Fix: Authentication is failing with storm adapter (#6782)
1d5c418 is described below

commit 1d5c418ccf3f2dc0844e735e5ff4bf0dd2f5738f
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Mon May 18 21:53:36 2020 -0700

    [pulsar-storm] Fix: Authentication is failing with storm adapter (#6782)
    
    ### Motivation
    #4284 made [Authentication](https://github.com/apache/pulsar/blob/master/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Authentication.java#L33) a transient field of `ClientConfigurationData`. `Authentication` is a serializable class because ClientConfig is used by the Storm and Spark adapters to pass client configuration, and Storm serializes and deserializes spouts and bolts while executing them in a topology. Now that `Authentication` is a transient field, Storm always des [...]
    
    ### Modification
    Keep the `Authentication` field of `ClientConfigurationData` serializable (it is no longer declared `transient`).
    
    ### Result
    It fixes pulsar-storm with authentication enabled.
---
 .../java/org/apache/pulsar/client/impl/ClientBuilderImpl.java    | 6 ++++++
 .../java/org/apache/pulsar/client/impl/PulsarClientImpl.java     | 9 +++++++--
 .../apache/pulsar/client/impl/auth/AuthenticationDisabled.java   | 1 +
 .../org/apache/pulsar/client/impl/auth/AuthenticationToken.java  | 3 ++-
 .../apache/pulsar/client/impl/conf/ClientConfigurationData.java  | 4 +++-
 .../streaming/connectors/pulsar/CachedPulsarClientTest.java      | 4 ++--
 6 files changed, 21 insertions(+), 6 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
index 3283166..8d3fdbe 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
@@ -109,6 +109,9 @@ public class ClientBuilderImpl implements ClientBuilder {
     @Override
     public ClientBuilder authentication(String authPluginClassName, String authParamsString)
             throws UnsupportedAuthenticationException {
+        conf.setAuthPluginClassName(authPluginClassName);
+        conf.setAuthParams(authParamsString);
+        conf.setAuthParamMap(null);
         conf.setAuthentication(AuthenticationFactory.create(authPluginClassName, authParamsString));
         return this;
     }
@@ -116,6 +119,9 @@ public class ClientBuilderImpl implements ClientBuilder {
     @Override
     public ClientBuilder authentication(String authPluginClassName, Map<String, String> authParams)
             throws UnsupportedAuthenticationException {
+        conf.setAuthPluginClassName(authPluginClassName);
+        conf.setAuthParamMap(authParams);
+        conf.setAuthParams(null);
         conf.setAuthentication(AuthenticationFactory.create(authPluginClassName, authParams));
         return this;
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 91ca682..bcb0f51 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -151,11 +151,16 @@ public class PulsarClientImpl implements PulsarClient {
     }
 
     private void setAuth(ClientConfigurationData conf) throws PulsarClientException {
-        if (StringUtils.isBlank(conf.getAuthPluginClassName()) || StringUtils.isBlank( conf.getAuthParams())) {
+        if (StringUtils.isBlank(conf.getAuthPluginClassName())
+                || (StringUtils.isBlank(conf.getAuthParams()) && conf.getAuthParamMap() == null)) {
             return;
         }
 
-        conf.setAuthentication(AuthenticationFactory.create(conf.getAuthPluginClassName(), conf.getAuthParams()));
+        if (StringUtils.isNotBlank(conf.getAuthParams())) {
+            conf.setAuthentication(AuthenticationFactory.create(conf.getAuthPluginClassName(), conf.getAuthParams()));
+        } else if (conf.getAuthParamMap() != null) {
+            conf.setAuthentication(AuthenticationFactory.create(conf.getAuthPluginClassName(), conf.getAuthParamMap()));
+        }
     }
 
     public ClientConfigurationData getConfiguration() {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDisabled.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDisabled.java
index c74a38e..5ee7bcb 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDisabled.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDisabled.java
@@ -29,6 +29,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
 public class AuthenticationDisabled implements Authentication, EncodedAuthenticationParameterSupport {
 
     protected final AuthenticationDataProvider nullData = new AuthenticationDataNull();
+    public static final AuthenticationDisabled INSTANCE = new AuthenticationDisabled();
     /**
      *
      */
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationToken.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationToken.java
index bd13401..4ae3d3b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationToken.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationToken.java
@@ -38,7 +38,8 @@ import org.apache.pulsar.client.api.PulsarClientException;
  */
 public class AuthenticationToken implements Authentication, EncodedAuthenticationParameterSupport {
 
-    private Supplier<String> tokenSupplier;
+    private static final long serialVersionUID = 1L;
+    private transient Supplier<String> tokenSupplier;
 
     public AuthenticationToken() {
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
index 0cf496a..980630c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
@@ -30,6 +30,7 @@ import org.apache.pulsar.client.api.ServiceUrlProvider;
 import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
 
 import java.io.Serializable;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -46,9 +47,10 @@ public class ClientConfigurationData implements Serializable, Cloneable {
     private transient ServiceUrlProvider serviceUrlProvider;
 
     @JsonIgnore
-    private transient Authentication authentication = new AuthenticationDisabled();
+    private Authentication authentication = AuthenticationDisabled.INSTANCE;
     private String authPluginClassName;
     private String authParams;
+    private Map<String, String> authParamMap;
 
     private long operationTimeoutMs = 30000;
     private long statsIntervalSeconds = 60;
diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/CachedPulsarClientTest.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/CachedPulsarClientTest.java
index 39cdca1..0c81c4a 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/CachedPulsarClientTest.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/CachedPulsarClientTest.java
@@ -27,7 +27,7 @@ import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.mockito.Mockito;
 import org.powermock.api.mockito.PowerMockito;
-import org.testng.annotations.BeforeTest;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 /**
@@ -37,7 +37,7 @@ public class CachedPulsarClientTest {
 
     private static final String SERVICE_URL = "pulsar://localhost:6650";
 
-    @BeforeTest
+    @BeforeMethod
     public void clearCache() {
         CachedPulsarClient.clear();
     }