You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/09/09 08:16:41 UTC

[pulsar] 08/09: [pulsar-client]Add a optional params scope for pulsar oauth2 client (#11931)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b7c2c19859fd945725706585c5f227ff19392ca0
Author: Guangning E <gu...@apache.org>
AuthorDate: Thu Sep 9 08:55:45 2021 +0800

    [pulsar-client]Add a optional params scope for pulsar oauth2 client (#11931)
    
    ### Motivation
    
    In some scenarios (e.g. Azure cloud), an optional scope parameter is required when the client exchanges tokens with the server. This PR adds that parameter. To ensure compatibility, when the user does not fill in this parameter, all behavior is the same as before.
    
    ### Modifications
    
    * Add an optional `scope` parameter that is sent when exchanging client credentials for a token
    
    ### Verifying this change
    
    - [x] Make sure that the change passes the CI checks.
    
    (cherry picked from commit ac5114f8944784972b831438f8c7e0cbd57db4e5)
---
 .../auth/oauth2/AuthenticationFactoryOAuth2.java   |  18 ++++
 .../impl/auth/oauth2/ClientCredentialsFlow.java    |   9 +-
 .../protocol/ClientCredentialsExchangeRequest.java |   3 +
 .../impl/auth/oauth2/protocol/TokenClient.java     |  54 +++++++---
 .../impl/auth/oauth2/protocol/TokenClientTest.java | 116 +++++++++++++++++++++
 5 files changed, 182 insertions(+), 18 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java
index 54da5287d..707fcaf 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java
@@ -37,10 +37,28 @@ public final class AuthenticationFactoryOAuth2 {
      * @return an Authentication object
      */
     public static Authentication clientCredentials(URL issuerUrl, URL credentialsUrl, String audience) {
+        return clientCredentials(issuerUrl, credentialsUrl, audience, null);
+    }
+
+    /**
+     * Authenticate with client credentials.
+     *
+     * @param issuerUrl the issuer URL
+     * @param credentialsUrl the credentials URL
+     * @param audience the audience identifier
+     * @param scope An optional field. The value of the scope parameter is expressed as a list of space-delimited,
+     *              case-sensitive strings.  The strings are defined by the authorization server.
+     *              If the value contains multiple space-delimited strings, their order does not matter,
+     *              and each string adds an additional access range to the requested scope.
+     *              From here: https://datatracker.ietf.org/doc/html/rfc6749#section-4.4.2
+     * @return an Authentication object
+     */
+    public static Authentication clientCredentials(URL issuerUrl, URL credentialsUrl, String audience, String scope) {
         ClientCredentialsFlow flow = ClientCredentialsFlow.builder()
                 .issuerUrl(issuerUrl)
                 .privateKey(credentialsUrl.toExternalForm())
                 .audience(audience)
+                .scope(scope)
                 .build();
         return new AuthenticationOAuth2(flow, Clock.systemDefaultZone());
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java
index 8d82cc2..b011e85 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java
@@ -46,21 +46,24 @@ class ClientCredentialsFlow extends FlowBase {
     public static final String CONFIG_PARAM_ISSUER_URL = "issuerUrl";
     public static final String CONFIG_PARAM_AUDIENCE = "audience";
     public static final String CONFIG_PARAM_KEY_FILE = "privateKey";
+    public static final String CONFIG_PARAM_SCOPE = "scope";
 
     private static final long serialVersionUID = 1L;
 
     private final String audience;
     private final String privateKey;
+    private final String scope;
 
     private transient ClientCredentialsExchanger exchanger;
 
     private boolean initialized = false;
 
     @Builder
-    public ClientCredentialsFlow(URL issuerUrl, String audience, String privateKey) {
+    public ClientCredentialsFlow(URL issuerUrl, String audience, String privateKey, String scope) {
         super(issuerUrl);
         this.audience = audience;
         this.privateKey = privateKey;
+        this.scope = scope;
     }
 
     @Override
@@ -87,6 +90,7 @@ class ClientCredentialsFlow extends FlowBase {
                 .clientId(keyFile.getClientId())
                 .clientSecret(keyFile.getClientSecret())
                 .audience(this.audience)
+                .scope(this.scope)
                 .build();
         TokenResult tr;
         if (!initialized) {
@@ -116,10 +120,13 @@ class ClientCredentialsFlow extends FlowBase {
         URL issuerUrl = parseParameterUrl(params, CONFIG_PARAM_ISSUER_URL);
         String audience = parseParameterString(params, CONFIG_PARAM_AUDIENCE);
         String privateKeyUrl = parseParameterString(params, CONFIG_PARAM_KEY_FILE);
+        // This is an optional parameter
+        String scope = params.get(CONFIG_PARAM_SCOPE);
         return ClientCredentialsFlow.builder()
                 .issuerUrl(issuerUrl)
                 .audience(audience)
                 .privateKey(privateKeyUrl)
+                .scope(scope)
                 .build();
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchangeRequest.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchangeRequest.java
index 7c14296..2d37bb5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchangeRequest.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchangeRequest.java
@@ -39,4 +39,7 @@ public class ClientCredentialsExchangeRequest {
 
     @JsonProperty("audience")
     private String audience;
+
+    @JsonProperty("scope")
+    private String scope;
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java
index 9151fc3..f8667e8 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java
@@ -27,6 +27,7 @@ import java.util.TreeMap;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.PulsarVersion;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.asynchttpclient.AsyncHttpClient;
@@ -47,15 +48,22 @@ public class TokenClient implements ClientCredentialsExchanger {
     private final AsyncHttpClient httpClient;
 
     public TokenClient(URL tokenUrl) {
-        this.tokenUrl = tokenUrl;
+        this(tokenUrl, null);
+    }
 
-        DefaultAsyncHttpClientConfig.Builder confBuilder = new DefaultAsyncHttpClientConfig.Builder();
-        confBuilder.setFollowRedirect(true);
-        confBuilder.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT_IN_SECONDS * 1000);
-        confBuilder.setReadTimeout(DEFAULT_READ_TIMEOUT_IN_SECONDS * 1000);
-        confBuilder.setUserAgent(String.format("Pulsar-Java-v%s", PulsarVersion.getVersion()));
-        AsyncHttpClientConfig config = confBuilder.build();
-        httpClient = new DefaultAsyncHttpClient(config);
+    TokenClient(URL tokenUrl, AsyncHttpClient httpClient) {
+        if (httpClient == null) {
+            DefaultAsyncHttpClientConfig.Builder confBuilder = new DefaultAsyncHttpClientConfig.Builder();
+            confBuilder.setFollowRedirect(true);
+            confBuilder.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT_IN_SECONDS * 1000);
+            confBuilder.setReadTimeout(DEFAULT_READ_TIMEOUT_IN_SECONDS * 1000);
+            confBuilder.setUserAgent(String.format("Pulsar-Java-v%s", PulsarVersion.getVersion()));
+            AsyncHttpClientConfig config = confBuilder.build();
+            this.httpClient = new DefaultAsyncHttpClient(config);
+        } else {
+            this.httpClient = httpClient;
+        }
+        this.tokenUrl = tokenUrl;
     }
 
     @Override
@@ -64,6 +72,23 @@ public class TokenClient implements ClientCredentialsExchanger {
     }
 
     /**
+     * Builds the URL-encoded form body for a token request.
+     * @param bodyMap the form parameters to send, keyed by parameter name.
+     * @return the entries as UTF-8 URL-encoded {@code key=value} pairs joined with {@code &}.
+     */
+    String buildClientCredentialsBody(Map<String, String> bodyMap) {
+        return bodyMap.entrySet().stream()
+                .map(e -> {
+                    try {
+                        return URLEncoder.encode(e.getKey(), "UTF-8") + '=' + URLEncoder.encode(e.getValue(), "UTF-8");
+                    } catch (UnsupportedEncodingException e1) {
+                        throw new RuntimeException(e1);
+                    }
+                })
+                .collect(Collectors.joining("&"));
+    }
+
+    /**
      * Performs a token exchange using client credentials.
      * @param req the client credentials request details.
      * @return a token result
@@ -76,15 +101,10 @@ public class TokenClient implements ClientCredentialsExchanger {
         bodyMap.put("client_id", req.getClientId());
         bodyMap.put("client_secret", req.getClientSecret());
         bodyMap.put("audience", req.getAudience());
-        String body = bodyMap.entrySet().stream()
-                .map(e -> {
-                    try {
-                        return URLEncoder.encode(e.getKey(), "UTF-8") + '=' + URLEncoder.encode(e.getValue(), "UTF-8");
-                    } catch (UnsupportedEncodingException e1) {
-                        throw new RuntimeException(e1);
-                    }
-                })
-                .collect(Collectors.joining("&"));
+        if (!StringUtils.isBlank(req.getScope())) {
+            bodyMap.put("scope", req.getScope());
+        }
+        String body = buildClientCredentialsBody(bodyMap);
 
         try {
 
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClientTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClientTest.java
new file mode 100644
index 0000000..1617359
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClientTest.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2.protocol;
+
+import com.google.gson.Gson;
+import org.asynchttpclient.BoundRequestBuilder;
+import org.asynchttpclient.DefaultAsyncHttpClient;
+import org.asynchttpclient.ListenableFuture;
+import org.asynchttpclient.Response;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutionException;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Token client exchange token mock test.
+ */
+public class TokenClientTest {
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void exchangeClientCredentialsSuccessByScopeTest() throws
+            IOException, TokenExchangeException, ExecutionException, InterruptedException {
+        DefaultAsyncHttpClient defaultAsyncHttpClient = mock(DefaultAsyncHttpClient.class);
+        URL url = new URL("http://localhost");
+        TokenClient tokenClient = new TokenClient(url, defaultAsyncHttpClient);
+        Map<String, String> bodyMap = new TreeMap<>();
+        ClientCredentialsExchangeRequest request = ClientCredentialsExchangeRequest.builder()
+                .audience("test-audience")
+                .clientId("test-client-id")
+                .clientSecret("test-client-secret")
+                .scope("test-scope")
+                .build();
+        bodyMap.put("grant_type", "client_credentials");
+        bodyMap.put("client_id", request.getClientId());
+        bodyMap.put("client_secret", request.getClientSecret());
+        bodyMap.put("audience", request.getAudience());
+        bodyMap.put("scope", request.getScope());
+        String body = tokenClient.buildClientCredentialsBody(bodyMap);
+        BoundRequestBuilder boundRequestBuilder = mock(BoundRequestBuilder.class);
+        Response response = mock(Response.class);
+        ListenableFuture<Response> listenableFuture = mock(ListenableFuture.class);
+        when(defaultAsyncHttpClient.preparePost(url.toString())).thenReturn(boundRequestBuilder);
+        when(boundRequestBuilder.setHeader("Accept", "application/json")).thenReturn(boundRequestBuilder);
+        when(boundRequestBuilder.setHeader("Content-Type", "application/x-www-form-urlencoded")).thenReturn(boundRequestBuilder);
+        when(boundRequestBuilder.setBody(body)).thenReturn(boundRequestBuilder);
+        when(boundRequestBuilder.execute()).thenReturn(listenableFuture);
+        when(listenableFuture.get()).thenReturn(response);
+        when(response.getStatusCode()).thenReturn(200);
+        TokenResult tokenResult = new TokenResult();
+        tokenResult.setAccessToken("test-access-token");
+        tokenResult.setIdToken("test-id");
+        when(response.getResponseBodyAsBytes()).thenReturn(new Gson().toJson(tokenResult).getBytes());
+        TokenResult tr = tokenClient.exchangeClientCredentials(request);
+        Assert.assertNotNull(tr);
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void exchangeClientCredentialsSuccessByNoScopeTest() throws
+            IOException, TokenExchangeException, ExecutionException, InterruptedException {
+        DefaultAsyncHttpClient defaultAsyncHttpClient = mock(DefaultAsyncHttpClient.class);
+        URL url = new URL("http://localhost");
+        TokenClient tokenClient = new TokenClient(url, defaultAsyncHttpClient);
+        Map<String, String> bodyMap = new TreeMap<>();
+        ClientCredentialsExchangeRequest request = ClientCredentialsExchangeRequest.builder()
+                .audience("test-audience")
+                .clientId("test-client-id")
+                .clientSecret("test-client-secret")
+                .build();
+        bodyMap.put("grant_type", "client_credentials");
+        bodyMap.put("client_id", request.getClientId());
+        bodyMap.put("client_secret", request.getClientSecret());
+        bodyMap.put("audience", request.getAudience());
+        String body = tokenClient.buildClientCredentialsBody(bodyMap);
+        BoundRequestBuilder boundRequestBuilder = mock(BoundRequestBuilder.class);
+        Response response = mock(Response.class);
+        ListenableFuture<Response> listenableFuture = mock(ListenableFuture.class);
+        when(defaultAsyncHttpClient.preparePost(url.toString())).thenReturn(boundRequestBuilder);
+        when(boundRequestBuilder.setHeader("Accept", "application/json")).thenReturn(boundRequestBuilder);
+        when(boundRequestBuilder.setHeader("Content-Type", "application/x-www-form-urlencoded")).thenReturn(boundRequestBuilder);
+        when(boundRequestBuilder.setBody(body)).thenReturn(boundRequestBuilder);
+        when(boundRequestBuilder.execute()).thenReturn(listenableFuture);
+        when(listenableFuture.get()).thenReturn(response);
+        when(response.getStatusCode()).thenReturn(200);
+        TokenResult tokenResult = new TokenResult();
+        tokenResult.setAccessToken("test-access-token");
+        tokenResult.setIdToken("test-id");
+        when(response.getResponseBodyAsBytes()).thenReturn(new Gson().toJson(tokenResult).getBytes());
+        TokenResult tr = tokenClient.exchangeClientCredentials(request);
+        Assert.assertNotNull(tr);
+    }
+}