You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ap...@apache.org on 2021/02/05 23:58:59 UTC

[incubator-pinot] 01/01: add optional http basic auth to pinot broker

This is an automated email from the ASF dual-hosted git repository.

apucher pushed a commit to branch basic-auth-broker
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 9c4bb4558884b47c3f1936357b7d37b8fb6737fe
Author: Alexander Pucher <al...@alexpucher.com>
AuthorDate: Fri Feb 5 15:58:34 2021 -0800

    add optional http basic auth to pinot broker
---
 .../pinot/broker/api/HttpRequesterIdentity.java    |  26 ++++
 .../broker/api/resources/PinotClientRequest.java   |  33 +++--
 .../broker/BasicAuthAccessControlFactory.java      | 139 +++++++++++++++++++++
 .../broker/broker/BasicAuthAccessControlTest.java  | 121 ++++++++++++++++++
 .../api/resources/PinotQueryResource.java          |  16 ++-
 5 files changed, 324 insertions(+), 11 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/api/HttpRequesterIdentity.java b/pinot-broker/src/main/java/org/apache/pinot/broker/api/HttpRequesterIdentity.java
new file mode 100644
index 0000000..5c5d931
--- /dev/null
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/api/HttpRequesterIdentity.java
@@ -0,0 +1,26 @@
+package org.apache.pinot.broker.api;
+
+import com.google.common.collect.Multimap;
+import java.security.Principal;
+
+
+/**
+ * {@link RequesterIdentity} that carries the HTTP-level details of a request: its headers and the URL of the
+ * endpoint it was sent to. Plain mutable holder; both properties are {@code null} until set.
+ */
+public class HttpRequesterIdentity extends RequesterIdentity {
+  private String _endpointUrl;
+  private Multimap<String, String> _httpHeaders;
+
+  /**
+   * @return the request headers as (header name, header value) pairs, or {@code null} if none were set
+   */
+  public Multimap<String, String> getHttpHeaders() {
+    return _httpHeaders;
+  }
+
+  /**
+   * @return the URL of the endpoint the request was sent to, or {@code null} if it was not set
+   */
+  public String getEndpointUrl() {
+    return _endpointUrl;
+  }
+
+  /**
+   * @param httpHeaders the request headers; the multimap is stored as-is, not copied
+   */
+  public void setHttpHeaders(Multimap<String, String> httpHeaders) {
+    _httpHeaders = httpHeaders;
+  }
+
+  /**
+   * @param endpointUrl the URL of the endpoint the request was sent to
+   */
+  public void setEndpointUrl(String endpointUrl) {
+    _endpointUrl = endpointUrl;
+  }
+}
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
index 290de8c..65d2bf6 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
@@ -20,6 +20,8 @@ package org.apache.pinot.broker.api.resources;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiParam;
@@ -34,8 +36,10 @@ import javax.ws.rs.QueryParam;
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.container.Suspended;
+import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
+import org.apache.pinot.broker.api.HttpRequesterIdentity;
 import org.apache.pinot.broker.api.RequestStatistics;
 import org.apache.pinot.broker.requesthandler.BrokerRequestHandler;
 import org.apache.pinot.common.metrics.BrokerMeter;
@@ -70,7 +74,7 @@ public class PinotClientRequest {
       @ApiParam(value = "Query", required = true) @QueryParam("bql") String query,
       @ApiParam(value = "Trace enabled") @QueryParam(Request.TRACE) String traceEnabled,
       @ApiParam(value = "Debug options") @QueryParam(Request.DEBUG_OPTIONS) String debugOptions,
-      @Suspended AsyncResponse asyncResponse) {
+      @Suspended AsyncResponse asyncResponse, @Context org.glassfish.grizzly.http.server.Request requestContext) {
     try {
       ObjectNode requestJson = JsonUtils.newObjectNode();
       requestJson.put(Request.PQL, query);
@@ -80,7 +84,7 @@ public class PinotClientRequest {
       if (debugOptions != null) {
         requestJson.put(Request.DEBUG_OPTIONS, debugOptions);
       }
-      BrokerResponse brokerResponse = requestHandler.handleRequest(requestJson, null, new RequestStatistics());
+      BrokerResponse brokerResponse = requestHandler.handleRequest(requestJson, makeHttpIdentity(requestContext), new RequestStatistics());
       asyncResponse.resume(brokerResponse.toJsonString());
     } catch (Exception e) {
       LOGGER.error("Caught exception while processing GET request", e);
@@ -95,10 +99,11 @@ public class PinotClientRequest {
   @Path("query")
   @ApiOperation(value = "Querying pinot")
   @ApiResponses(value = {@ApiResponse(code = 200, message = "Query response"), @ApiResponse(code = 500, message = "Internal Server Error")})
-  public void processQueryPost(String query, @Suspended AsyncResponse asyncResponse) {
+  public void processQueryPost(String query, @Suspended AsyncResponse asyncResponse,
+      @Context org.glassfish.grizzly.http.server.Request requestContext) {
     try {
       JsonNode requestJson = JsonUtils.stringToJsonNode(query);
-      BrokerResponse brokerResponse = requestHandler.handleRequest(requestJson, null, new RequestStatistics());
+      BrokerResponse brokerResponse = requestHandler.handleRequest(requestJson, makeHttpIdentity(requestContext), new RequestStatistics());
       asyncResponse.resume(brokerResponse);
     } catch (Exception e) {
       LOGGER.error("Caught exception while processing POST request", e);
@@ -116,7 +121,7 @@ public class PinotClientRequest {
   public void processSqlQueryGet(@ApiParam(value = "Query", required = true) @QueryParam("sql") String query,
       @ApiParam(value = "Trace enabled") @QueryParam(Request.TRACE) String traceEnabled,
       @ApiParam(value = "Debug options") @QueryParam(Request.DEBUG_OPTIONS) String debugOptions,
-      @Suspended AsyncResponse asyncResponse) {
+      @Suspended AsyncResponse asyncResponse, @Context org.glassfish.grizzly.http.server.Request requestContext) {
     try {
       ObjectNode requestJson = JsonUtils.newObjectNode();
       requestJson.put(Request.SQL, query);
@@ -128,7 +133,7 @@ public class PinotClientRequest {
       if (debugOptions != null) {
         requestJson.put(Request.DEBUG_OPTIONS, debugOptions);
       }
-      BrokerResponse brokerResponse = requestHandler.handleRequest(requestJson, null, new RequestStatistics());
+      BrokerResponse brokerResponse = requestHandler.handleRequest(requestJson, makeHttpIdentity(requestContext), new RequestStatistics());
       asyncResponse.resume(brokerResponse.toJsonString());
     } catch (Exception e) {
       LOGGER.error("Caught exception while processing GET request", e);
@@ -143,7 +148,8 @@ public class PinotClientRequest {
   @Path("query/sql")
   @ApiOperation(value = "Querying pinot using sql")
   @ApiResponses(value = {@ApiResponse(code = 200, message = "Query response"), @ApiResponse(code = 500, message = "Internal Server Error")})
-  public void processSqlQueryPost(String query, @Suspended AsyncResponse asyncResponse) {
+  public void processSqlQueryPost(String query, @Suspended AsyncResponse asyncResponse,
+      @Context org.glassfish.grizzly.http.server.Request requestContext) {
     try {
       JsonNode requestJson = JsonUtils.stringToJsonNode(query);
       if (!requestJson.has(Request.SQL)) {
@@ -152,7 +158,7 @@ public class PinotClientRequest {
       String queryOptions = constructSqlQueryOptions();
       // the only query options as of now are sql related. do not allow any custom query options in sql endpoint
       ObjectNode sqlRequestJson = ((ObjectNode) requestJson).put(Request.QUERY_OPTIONS, queryOptions);
-      BrokerResponse brokerResponse = requestHandler.handleRequest(sqlRequestJson, null, new RequestStatistics());
+      BrokerResponse brokerResponse = requestHandler.handleRequest(sqlRequestJson, makeHttpIdentity(requestContext), new RequestStatistics());
       asyncResponse.resume(brokerResponse.toJsonString());
     } catch (Exception e) {
       LOGGER.error("Caught exception while processing POST request", e);
@@ -165,4 +171,15 @@ public class PinotClientRequest {
     return Request.QueryOptionKey.GROUP_BY_MODE + "=" + Request.SQL + ";" + Request.QueryOptionKey.RESPONSE_FORMAT + "="
         + Request.SQL;
   }
+
+  /**
+   * Captures the headers and the request URL of an incoming HTTP request in an {@link HttpRequesterIdentity}.
+   *
+   * @param context the Grizzly request being served
+   * @return identity holding every (header name, header value) pair of the request plus its request URL
+   */
+  private static HttpRequesterIdentity makeHttpIdentity(org.glassfish.grizzly.http.server.Request context) {
+    HttpRequesterIdentity identity = new HttpRequesterIdentity();
+
+    // a header may occur several times, so every value is recorded under its header name
+    Multimap<String, String> headersByName = ArrayListMultimap.create();
+    for (String headerName : context.getHeaderNames()) {
+      for (String headerValue : context.getHeaders(headerName)) {
+        headersByName.put(headerName, headerValue);
+      }
+    }
+    identity.setHttpHeaders(headersByName);
+
+    String requestUrl = context.getRequestURL().toString();
+    identity.setEndpointUrl(requestUrl);
+
+    return identity;
+  }
 }
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BasicAuthAccessControlFactory.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BasicAuthAccessControlFactory.java
new file mode 100644
index 0000000..4557cc5
--- /dev/null
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BasicAuthAccessControlFactory.java
@@ -0,0 +1,139 @@
+package org.apache.pinot.broker.broker;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.broker.api.AccessControl;
+import org.apache.pinot.broker.api.HttpRequesterIdentity;
+import org.apache.pinot.broker.api.RequesterIdentity;
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.spi.env.PinotConfiguration;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+
+/**
+ * Basic Authentication based on http headers. Configured via the "pinot.broker.access.control" family of properties.
+ *
+ * <pre>
+ *     Example:
+ *     pinot.broker.access.control.principals=admin123,user456
+ *     pinot.broker.access.control.principals.admin123.password=verysecret
+ *     pinot.broker.access.control.principals.user456.password=kindasecret
+ *     pinot.broker.access.control.principals.user456.tables=stuff,lessImportantStuff
+ * </pre>
+ *
+ * A principal without a "tables" property, or with "tables" set to "*", may query every table. Whitespace around
+ * principal names and table names is ignored.
+ */
+public class BasicAuthAccessControlFactory extends AccessControlFactory {
+  private static final String PRINCIPALS = "principals";
+  private static final String PASSWORD = "password";
+  private static final String TABLES = "tables";
+  private static final String TABLES_ALL = "*";
+
+  private static final String HEADER_AUTHORIZATION = "authorization";
+
+  private AccessControl _accessControl;
+
+  public BasicAuthAccessControlFactory() {
+    // left blank
+  }
+
+  /**
+   * Reads the principals from the configuration and builds the access control returned by {@link #create()}.
+   *
+   * @param configuration configuration holding the "principals" property and the per-principal sub-properties
+   * @throws IllegalArgumentException if no principals are configured, a principal name is blank, a principal has
+   *         no password, or a principal's "tables" property names no table at all
+   */
+  @Override
+  public void init(PinotConfiguration configuration) {
+    String principalNames = configuration.getProperty(PRINCIPALS);
+    Preconditions.checkArgument(StringUtils.isNotBlank(principalNames), "must provide principals");
+
+    List<BasicAuthPrincipal> principals = Arrays.stream(principalNames.split(","))
+        .map(rawName -> parsePrincipal(configuration, rawName))
+        .collect(Collectors.toList());
+
+    _accessControl = new BasicAuthAccessControl(principals);
+  }
+
+  /**
+   * @return the access control built by the last call to {@link #init(PinotConfiguration)}, or {@code null} if
+   *         {@code init} has not been called
+   */
+  @Override
+  public AccessControl create() {
+    return _accessControl;
+  }
+
+  /**
+   * Builds one principal from its (untrimmed) name and its "password" / "tables" sub-properties.
+   */
+  private static BasicAuthPrincipal parsePrincipal(PinotConfiguration configuration, String rawName) {
+    String name = rawName.trim();
+    Preconditions.checkArgument(StringUtils.isNotBlank(name), "%s is not a valid name", name);
+
+    String password = configuration.getProperty(String.format("%s.%s.%s", PRINCIPALS, name, PASSWORD));
+    Preconditions.checkArgument(StringUtils.isNotBlank(password), "must provide a password for %s", name);
+
+    String tableNames = configuration.getProperty(String.format("%s.%s.%s", PRINCIPALS, name, TABLES));
+
+    return new BasicAuthPrincipal(name, toToken(name, password), parseTables(name, tableNames));
+  }
+
+  /**
+   * Turns a comma-separated table list into a set of trimmed table names. A blank list or "*" yields the empty
+   * set, which {@link BasicAuthAccessControl} treats as "no table restrictions".
+   */
+  private static Set<String> parseTables(String name, String tableNames) {
+    if (StringUtils.isBlank(tableNames) || TABLES_ALL.equals(tableNames.trim())) {
+      return Collections.emptySet();
+    }
+
+    Set<String> tables = Arrays.stream(tableNames.split(",")).map(String::trim).filter(StringUtils::isNotBlank)
+        .collect(Collectors.toSet());
+
+    // an empty set means "unrestricted"; a list such as "," must not silently widen access
+    Preconditions.checkArgument(!tables.isEmpty(), "must provide valid tables for %s", name);
+    return tables;
+  }
+
+  /**
+   * Access Control using header-based basic http authentication
+   */
+  private static class BasicAuthAccessControl implements AccessControl {
+    // principals keyed by their normalized "Basic ..." token
+    private final Map<String, BasicAuthPrincipal> _principals;
+
+    public BasicAuthAccessControl(Collection<BasicAuthPrincipal> principals) {
+      _principals = principals.stream().collect(Collectors.toMap(BasicAuthPrincipal::getToken, p -> p));
+    }
+
+    /**
+     * Grants access if any authorization header matches a configured principal and that principal is either
+     * unrestricted or allowed to query the table targeted by the request.
+     *
+     * @throws IllegalArgumentException if the identity is not an {@link HttpRequesterIdentity}
+     */
+    @Override
+    public boolean hasAccess(RequesterIdentity requesterIdentity, BrokerRequest brokerRequest) {
+      Preconditions.checkArgument(requesterIdentity instanceof HttpRequesterIdentity, "HttpRequesterIdentity required");
+      HttpRequesterIdentity identity = (HttpRequesterIdentity) requesterIdentity;
+
+      if (identity.getHttpHeaders() == null) {
+        // no headers at all? reject
+        return false;
+      }
+
+      // HTTP header names are case-insensitive, so match the name regardless of the case it was stored in
+      Optional<BasicAuthPrincipal> principalOpt = identity.getHttpHeaders().entries().stream()
+          .filter(header -> HEADER_AUTHORIZATION.equalsIgnoreCase(header.getKey()))
+          .map(header -> normalizeToken(header.getValue()))
+          .filter(Objects::nonNull)
+          .map(_principals::get)
+          .filter(Objects::nonNull)
+          .findFirst();
+
+      if (!principalOpt.isPresent()) {
+        // no matching token? reject
+        return false;
+      }
+
+      BasicAuthPrincipal principal = principalOpt.get();
+      if (principal.getTables().isEmpty()) {
+        // no table restrictions? accept
+        return true;
+      }
+
+      if (brokerRequest == null || brokerRequest.getQuerySource() == null) {
+        // target table unknown? a table-restricted principal is rejected
+        return false;
+      }
+
+      return principal.getTables().contains(brokerRequest.getQuerySource().getTableName());
+    }
+  }
+
+  /**
+   * Container object for basic auth principal
+   */
+  private static class BasicAuthPrincipal {
+    private final String _name;
+    private final String _token;
+    private final Set<String> _tables;
+
+    public BasicAuthPrincipal(String name, String token, Set<String> tables) {
+      _name = name;
+      _token = token;
+      _tables = tables;
+    }
+
+    public String getName() {
+      return _name;
+    }
+
+    /**
+     * @return the tables this principal may query; empty means no table restrictions
+     */
+    public Set<String> getTables() {
+      return _tables;
+    }
+
+    /**
+     * @return the normalized "Basic ..." authorization header value identifying this principal
+     */
+    public String getToken() {
+      return _token;
+    }
+  }
+
+  /**
+   * Builds the normalized authorization header value ("Basic " + base64 of "name:password") for a principal.
+   * The credentials are encoded as UTF-8 so the token does not depend on the platform default charset.
+   */
+  private static String toToken(String name, String password) {
+    byte[] credentials = String.format("%s:%s", name, password).getBytes(java.nio.charset.StandardCharsets.UTF_8);
+    return normalizeToken(String.format("Basic %s", Base64.getEncoder().encodeToString(credentials)));
+  }
+
+  /**
+   * Strips surrounding whitespace and every '=' (base64 padding) so that padded and unpadded tokens compare equal.
+   *
+   * @return the normalized token, or {@code null} if the token is {@code null}
+   */
+  private static String normalizeToken(String token) {
+    if (token == null) {
+      return null;
+    }
+    return token.trim().replace("=", "");
+  }
+}
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/BasicAuthAccessControlTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/BasicAuthAccessControlTest.java
new file mode 100644
index 0000000..bf1fedb
--- /dev/null
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/BasicAuthAccessControlTest.java
@@ -0,0 +1,121 @@
+package org.apache.pinot.broker.broker;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+import org.apache.pinot.broker.api.AccessControl;
+import org.apache.pinot.broker.api.HttpRequesterIdentity;
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.request.QuerySource;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Tests the access control produced by {@link BasicAuthAccessControlFactory} with two principals: "admin" without
+ * table restrictions and "user" restricted to the table "lessImportantStuff".
+ */
+public class BasicAuthAccessControlTest {
+    private static final String TOKEN_USER = "Basic dXNlcjpzZWNyZXQ"; // user:secret
+    private static final String TOKEN_ADMIN = "Basic YWRtaW46dmVyeXNlY3JldA"; // admin:verysecret
+
+    private static final String HEADER_AUTHORIZATION = "authorization";
+
+    private AccessControl _accessControl;
+
+    @BeforeClass
+    public void setup() {
+        Map<String, Object> properties = new HashMap<>();
+        properties.put("principals", "admin,user");
+        properties.put("principals.admin.password", "verysecret");
+        properties.put("principals.user.password", "secret");
+        properties.put("principals.user.tables", "lessImportantStuff");
+
+        AccessControlFactory factory = new BasicAuthAccessControlFactory();
+        factory.init(new PinotConfiguration(properties));
+
+        _accessControl = factory.create();
+    }
+
+    /** A missing identity is a usage error, not a denied request. */
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testNullEntity() {
+        _accessControl.hasAccess(null, null);
+    }
+
+    /** A request without any authorization header is denied. */
+    @Test
+    public void testNullToken() {
+        Multimap<String, String> noHeaders = ArrayListMultimap.create();
+
+        Assert.assertFalse(_accessControl.hasAccess(identityWithHeaders(noHeaders), null));
+    }
+
+    /** The restricted user may query its own table. */
+    @Test
+    public void testAllow() {
+        boolean granted = _accessControl.hasAccess(identityWithToken(TOKEN_USER), requestForTable("lessImportantStuff"));
+
+        Assert.assertTrue(granted);
+    }
+
+    /** The restricted user may not query any other table. */
+    @Test
+    public void testDeny() {
+        boolean granted = _accessControl.hasAccess(identityWithToken(TOKEN_USER), requestForTable("veryImportantStuff"));
+
+        Assert.assertFalse(granted);
+    }
+
+    /** The unrestricted admin may query a table the user cannot. */
+    @Test
+    public void testAllowAll() {
+        boolean granted = _accessControl.hasAccess(identityWithToken(TOKEN_ADMIN), requestForTable("veryImportantStuff"));
+
+        Assert.assertTrue(granted);
+    }
+
+    /** Surrounding whitespace and base64 padding in the header value do not affect matching. */
+    @Test
+    public void testNormalizeToken() {
+        String paddedToken = "  " + TOKEN_USER + "== ";
+
+        boolean granted = _accessControl.hasAccess(identityWithToken(paddedToken), requestForTable("lessImportantStuff"));
+
+        Assert.assertTrue(granted);
+    }
+
+    /** Creates an identity whose only header is an authorization header with the given value. */
+    private static HttpRequesterIdentity identityWithToken(String token) {
+        Multimap<String, String> headers = ArrayListMultimap.create();
+        headers.put(HEADER_AUTHORIZATION, token);
+        return identityWithHeaders(headers);
+    }
+
+    /** Creates an identity carrying exactly the given headers. */
+    private static HttpRequesterIdentity identityWithHeaders(Multimap<String, String> headers) {
+        HttpRequesterIdentity identity = new HttpRequesterIdentity();
+        identity.setHttpHeaders(headers);
+        return identity;
+    }
+
+    /** Creates a broker request whose query source targets the given table. */
+    private static BrokerRequest requestForTable(String tableName) {
+        QuerySource source = new QuerySource();
+        source.setTableName(tableName);
+
+        BrokerRequest request = new BrokerRequest();
+        request.setQuerySource(source);
+        return request;
+    }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
index dfac60a..31e4969 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
@@ -29,11 +29,13 @@ import java.io.OutputStream;
 import java.net.HttpURLConnection;
 import java.net.URL;
 import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Random;
 import java.util.Set;
+import java.util.stream.Collectors;
 import javax.inject.Inject;
 import javax.ws.rs.GET;
 import javax.ws.rs.POST;
@@ -41,6 +43,8 @@ import javax.ws.rs.Path;
 import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MultivaluedMap;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.pinot.common.Utils;
 import org.apache.pinot.common.exception.QueryException;
@@ -210,7 +214,13 @@ public class PinotQueryResource {
     String url = getQueryURL(protocol, hostNameWithPrefix.substring(hostNameWithPrefix.indexOf("_") + 1),
         String.valueOf(port), querySyntax);
     ObjectNode requestJson = getRequestJson(query, traceEnabled, queryOptions, querySyntax);
-    return sendRequestRaw(url, query, requestJson);
+
+    Map<String, String> headers = httpHeaders.getRequestHeaders().entrySet().stream()
+        .filter(entry -> !entry.getValue().isEmpty())
+        .map(entry -> Pair.of(entry.getKey(), entry.getValue().get(0)))
+        .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+
+    return sendRequestRaw(url, query, requestJson, headers);
   }
 
   private ObjectNode getRequestJson(String query, String traceEnabled, String queryOptions, String querySyntax) {
@@ -325,10 +335,10 @@ public class PinotQueryResource {
     }
   }
 
-  public String sendRequestRaw(String url, String query, ObjectNode requestJson) {
+  public String sendRequestRaw(String url, String query, ObjectNode requestJson, Map<String, String> headers) {
     try {
       final long startTime = System.currentTimeMillis();
-      final String pinotResultString = sendPostRaw(url, requestJson.toString(), null);
+      final String pinotResultString = sendPostRaw(url, requestJson.toString(), headers);
 
       final long queryTime = System.currentTimeMillis() - startTime;
       LOGGER.info("Query: " + query + " Time: " + queryTime);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org