You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ap...@apache.org on 2021/02/05 23:58:59 UTC
[incubator-pinot] 01/01: add optional http basic auth to pinot
broker
This is an automated email from the ASF dual-hosted git repository.
apucher pushed a commit to branch basic-auth-broker
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 9c4bb4558884b47c3f1936357b7d37b8fb6737fe
Author: Alexander Pucher <al...@alexpucher.com>
AuthorDate: Fri Feb 5 15:58:34 2021 -0800
add optional http basic auth to pinot broker
---
.../pinot/broker/api/HttpRequesterIdentity.java | 26 ++++
.../broker/api/resources/PinotClientRequest.java | 33 +++--
.../broker/BasicAuthAccessControlFactory.java | 139 +++++++++++++++++++++
.../broker/broker/BasicAuthAccessControlTest.java | 121 ++++++++++++++++++
.../api/resources/PinotQueryResource.java | 16 ++-
5 files changed, 324 insertions(+), 11 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/api/HttpRequesterIdentity.java b/pinot-broker/src/main/java/org/apache/pinot/broker/api/HttpRequesterIdentity.java
new file mode 100644
index 0000000..5c5d931
--- /dev/null
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/api/HttpRequesterIdentity.java
@@ -0,0 +1,26 @@
+package org.apache.pinot.broker.api;
+
+import com.google.common.collect.Multimap;
+import java.security.Principal;
+
+
+public class HttpRequesterIdentity extends RequesterIdentity {
+ private Multimap<String, String> _httpHeaders;
+ private String _endpointUrl;
+
+ public Multimap<String, String> getHttpHeaders() {
+ return _httpHeaders;
+ }
+
+ public void setHttpHeaders(Multimap<String, String> httpHeaders) {
+ _httpHeaders = httpHeaders;
+ }
+
+ public String getEndpointUrl() {
+ return _endpointUrl;
+ }
+
+ public void setEndpointUrl(String endpointUrl) {
+ _endpointUrl = endpointUrl;
+ }
+}
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
index 290de8c..65d2bf6 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
@@ -20,6 +20,8 @@ package org.apache.pinot.broker.api.resources;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
@@ -34,8 +36,10 @@ import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
+import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
+import org.apache.pinot.broker.api.HttpRequesterIdentity;
import org.apache.pinot.broker.api.RequestStatistics;
import org.apache.pinot.broker.requesthandler.BrokerRequestHandler;
import org.apache.pinot.common.metrics.BrokerMeter;
@@ -70,7 +74,7 @@ public class PinotClientRequest {
@ApiParam(value = "Query", required = true) @QueryParam("bql") String query,
@ApiParam(value = "Trace enabled") @QueryParam(Request.TRACE) String traceEnabled,
@ApiParam(value = "Debug options") @QueryParam(Request.DEBUG_OPTIONS) String debugOptions,
- @Suspended AsyncResponse asyncResponse) {
+ @Suspended AsyncResponse asyncResponse, @Context org.glassfish.grizzly.http.server.Request requestContext) {
try {
ObjectNode requestJson = JsonUtils.newObjectNode();
requestJson.put(Request.PQL, query);
@@ -80,7 +84,7 @@ public class PinotClientRequest {
if (debugOptions != null) {
requestJson.put(Request.DEBUG_OPTIONS, debugOptions);
}
- BrokerResponse brokerResponse = requestHandler.handleRequest(requestJson, null, new RequestStatistics());
+ BrokerResponse brokerResponse = requestHandler.handleRequest(requestJson, makeHttpIdentity(requestContext), new RequestStatistics());
asyncResponse.resume(brokerResponse.toJsonString());
} catch (Exception e) {
LOGGER.error("Caught exception while processing GET request", e);
@@ -95,10 +99,11 @@ public class PinotClientRequest {
@Path("query")
@ApiOperation(value = "Querying pinot")
@ApiResponses(value = {@ApiResponse(code = 200, message = "Query response"), @ApiResponse(code = 500, message = "Internal Server Error")})
- public void processQueryPost(String query, @Suspended AsyncResponse asyncResponse) {
+ public void processQueryPost(String query, @Suspended AsyncResponse asyncResponse,
+ @Context org.glassfish.grizzly.http.server.Request requestContext) {
try {
JsonNode requestJson = JsonUtils.stringToJsonNode(query);
- BrokerResponse brokerResponse = requestHandler.handleRequest(requestJson, null, new RequestStatistics());
+ BrokerResponse brokerResponse = requestHandler.handleRequest(requestJson, makeHttpIdentity(requestContext), new RequestStatistics());
asyncResponse.resume(brokerResponse);
} catch (Exception e) {
LOGGER.error("Caught exception while processing POST request", e);
@@ -116,7 +121,7 @@ public class PinotClientRequest {
public void processSqlQueryGet(@ApiParam(value = "Query", required = true) @QueryParam("sql") String query,
@ApiParam(value = "Trace enabled") @QueryParam(Request.TRACE) String traceEnabled,
@ApiParam(value = "Debug options") @QueryParam(Request.DEBUG_OPTIONS) String debugOptions,
- @Suspended AsyncResponse asyncResponse) {
+ @Suspended AsyncResponse asyncResponse, @Context org.glassfish.grizzly.http.server.Request requestContext) {
try {
ObjectNode requestJson = JsonUtils.newObjectNode();
requestJson.put(Request.SQL, query);
@@ -128,7 +133,7 @@ public class PinotClientRequest {
if (debugOptions != null) {
requestJson.put(Request.DEBUG_OPTIONS, debugOptions);
}
- BrokerResponse brokerResponse = requestHandler.handleRequest(requestJson, null, new RequestStatistics());
+ BrokerResponse brokerResponse = requestHandler.handleRequest(requestJson, makeHttpIdentity(requestContext), new RequestStatistics());
asyncResponse.resume(brokerResponse.toJsonString());
} catch (Exception e) {
LOGGER.error("Caught exception while processing GET request", e);
@@ -143,7 +148,8 @@ public class PinotClientRequest {
@Path("query/sql")
@ApiOperation(value = "Querying pinot using sql")
@ApiResponses(value = {@ApiResponse(code = 200, message = "Query response"), @ApiResponse(code = 500, message = "Internal Server Error")})
- public void processSqlQueryPost(String query, @Suspended AsyncResponse asyncResponse) {
+ public void processSqlQueryPost(String query, @Suspended AsyncResponse asyncResponse,
+ @Context org.glassfish.grizzly.http.server.Request requestContext) {
try {
JsonNode requestJson = JsonUtils.stringToJsonNode(query);
if (!requestJson.has(Request.SQL)) {
@@ -152,7 +158,7 @@ public class PinotClientRequest {
String queryOptions = constructSqlQueryOptions();
// the only query options as of now are sql related. do not allow any custom query options in sql endpoint
ObjectNode sqlRequestJson = ((ObjectNode) requestJson).put(Request.QUERY_OPTIONS, queryOptions);
- BrokerResponse brokerResponse = requestHandler.handleRequest(sqlRequestJson, null, new RequestStatistics());
+ BrokerResponse brokerResponse = requestHandler.handleRequest(sqlRequestJson, makeHttpIdentity(requestContext), new RequestStatistics());
asyncResponse.resume(brokerResponse.toJsonString());
} catch (Exception e) {
LOGGER.error("Caught exception while processing POST request", e);
@@ -165,4 +171,15 @@ public class PinotClientRequest {
return Request.QueryOptionKey.GROUP_BY_MODE + "=" + Request.SQL + ";" + Request.QueryOptionKey.RESPONSE_FORMAT + "="
+ Request.SQL;
}
+
+ private static HttpRequesterIdentity makeHttpIdentity(org.glassfish.grizzly.http.server.Request context) {
+ Multimap<String, String> headers = ArrayListMultimap.create();
+ context.getHeaderNames().forEach(key -> context.getHeaders(key).forEach(value -> headers.put(key, value)));
+
+ HttpRequesterIdentity identity = new HttpRequesterIdentity();
+ identity.setHttpHeaders(headers);
+ identity.setEndpointUrl(context.getRequestURL().toString());
+
+ return identity;
+ }
}
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BasicAuthAccessControlFactory.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BasicAuthAccessControlFactory.java
new file mode 100644
index 0000000..4557cc5
--- /dev/null
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BasicAuthAccessControlFactory.java
@@ -0,0 +1,139 @@
+package org.apache.pinot.broker.broker;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.broker.api.AccessControl;
+import org.apache.pinot.broker.api.HttpRequesterIdentity;
+import org.apache.pinot.broker.api.RequesterIdentity;
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.spi.env.PinotConfiguration;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+
+/**
+ * Basic Authentication based on http headers. Configured via the "pinot.broker.access.control" family of properties.
+ *
+ * <pre>
+ * Example:
+ * pinot.broker.access.control.principals=admin123,user456
+ * pinot.broker.access.control.principals.admin123.password=verysecret
+ * pinot.broker.access.control.principals.user456.password=kindasecret
+ * pinot.broker.access.control.principals.user456.tables=stuff,lessImportantStuff
+ * </pre>
+ */
+public class BasicAuthAccessControlFactory extends AccessControlFactory {
+ private static final String PRINCIPALS = "principals";
+ private static final String PASSWORD = "password";
+ private static final String TABLES = "tables";
+ private static final String TABLES_ALL = "*";
+
+ private static final String HEADER_AUTHORIZATION = "authorization";
+
+ private AccessControl _accessControl;
+
+ public BasicAuthAccessControlFactory() {
+ // left blank
+ }
+
+ public void init(PinotConfiguration configuration) {
+ String principalNames = configuration.getProperty(PRINCIPALS);
+ Preconditions.checkArgument(StringUtils.isNotBlank(principalNames), "must provide principals");
+
+ List<BasicAuthPrincipal> principals = Arrays.stream(principalNames.split(",")).map(rawName -> {
+ String name = rawName.trim();
+ Preconditions.checkArgument(StringUtils.isNotBlank(name), "%s is not a valid name", name);
+
+ String password = configuration.getProperty(String.format("%s.%s.%s", PRINCIPALS, name, PASSWORD));
+ Preconditions.checkArgument(StringUtils.isNotBlank(password), "must provide a password for %s", name);
+
+ Set<String> tables = new HashSet<>();
+ String tableNames = configuration.getProperty(String.format("%s.%s.%s", PRINCIPALS, name, TABLES));
+ if (StringUtils.isNotBlank(tableNames) && !TABLES_ALL.equals(tableNames)) {
+ tables.addAll(Arrays.asList(tableNames.split(",")));
+ }
+
+ return new BasicAuthPrincipal(name, toToken(name, password), tables);
+ }).collect(Collectors.toList());
+
+ _accessControl = new BasicAuthAccessControl(principals);
+ }
+
+ public AccessControl create() {
+ return _accessControl;
+ }
+
+ /**
+ * Access Control using header-based basic http authentication
+ */
+ private static class BasicAuthAccessControl implements AccessControl {
+ private final Map<String, BasicAuthPrincipal> _principals;
+
+ public BasicAuthAccessControl(Collection<BasicAuthPrincipal> principals) {
+ this._principals = principals.stream().collect(Collectors.toMap(BasicAuthPrincipal::getToken, p -> p));
+ }
+
+ @Override
+ public boolean hasAccess(RequesterIdentity requesterIdentity, BrokerRequest brokerRequest) {
+ Preconditions.checkArgument(requesterIdentity instanceof HttpRequesterIdentity, "HttpRequesterIdentity required");
+ HttpRequesterIdentity identity = (HttpRequesterIdentity) requesterIdentity;
+
+ Collection<String> tokens = identity.getHttpHeaders().get(HEADER_AUTHORIZATION);
+ Optional<BasicAuthPrincipal> principalOpt = tokens.stream().map(BasicAuthAccessControlFactory::normalizeToken)
+ .map(_principals::get).filter(Objects::nonNull).findFirst();
+
+ if (!principalOpt.isPresent()) {
+ // no matching token? reject
+ return false;
+ }
+
+ BasicAuthPrincipal principal = principalOpt.get();
+ if (principal.getTables().isEmpty()) {
+ // no table restrictions? accept
+ return true;
+ }
+
+ return principal.getTables().contains(brokerRequest.getQuerySource().getTableName());
+ }
+ }
+
+ /**
+ * Container object for basic auth principal
+ */
+ private static class BasicAuthPrincipal {
+ private final String _name;
+ private final String _token;
+ private final Set<String> _tables;
+
+ public BasicAuthPrincipal(String name, String token, Set<String> tables) {
+ this._name = name;
+ this._token = token;
+ this._tables = tables;
+ }
+
+ public String getName() {
+ return _name;
+ }
+
+ public Set<String> getTables() {
+ return _tables;
+ }
+
+ public String getToken() {
+ return _token;
+ }
+ }
+
+ private static String toToken(String name, String password) {
+ String identifier = String.format("%s:%s", name, password);
+ return normalizeToken(String.format("Basic %s", Base64.getEncoder().encodeToString(identifier.getBytes())));
+ }
+
+ private static String normalizeToken(String token) {
+ if (token == null) {
+ return null;
+ }
+ return token.trim().replace("=", "");
+ }
+}
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/BasicAuthAccessControlTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/BasicAuthAccessControlTest.java
new file mode 100644
index 0000000..bf1fedb
--- /dev/null
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/BasicAuthAccessControlTest.java
@@ -0,0 +1,121 @@
+package org.apache.pinot.broker.broker;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+import org.apache.pinot.broker.api.AccessControl;
+import org.apache.pinot.broker.api.HttpRequesterIdentity;
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.request.QuerySource;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class BasicAuthAccessControlTest {
+ private static final String TOKEN_USER = "Basic dXNlcjpzZWNyZXQ"; // user:secret
+ private static final String TOKEN_ADMIN = "Basic YWRtaW46dmVyeXNlY3JldA"; // admin:verysecret
+
+ private static final String HEADER_AUTHORIZATION = "authorization";
+
+ private AccessControl _accessControl;
+
+ @BeforeClass
+ public void setup() {
+ Map<String, Object> config = new HashMap<>();
+ config.put("principals", "admin,user");
+ config.put("principals.admin.password", "verysecret");
+ config.put("principals.user.password", "secret");
+ config.put("principals.user.tables", "lessImportantStuff");
+
+ AccessControlFactory factory = new BasicAuthAccessControlFactory();
+ factory.init(new PinotConfiguration(config));
+
+ _accessControl = factory.create();
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testNullEntity() {
+ _accessControl.hasAccess(null, null);
+ }
+
+ @Test
+ public void testNullToken() {
+ Multimap<String, String> headers = ArrayListMultimap.create();
+
+ HttpRequesterIdentity identity = new HttpRequesterIdentity();
+ identity.setHttpHeaders(headers);
+
+ Assert.assertFalse(_accessControl.hasAccess(identity, null));
+ }
+
+ @Test
+ public void testAllow() {
+ Multimap<String, String> headers = ArrayListMultimap.create();
+ headers.put(HEADER_AUTHORIZATION, TOKEN_USER);
+
+ HttpRequesterIdentity identity = new HttpRequesterIdentity();
+ identity.setHttpHeaders(headers);
+
+ QuerySource source = new QuerySource();
+ source.setTableName("lessImportantStuff");
+
+ BrokerRequest request = new BrokerRequest();
+ request.setQuerySource(source);
+
+ Assert.assertTrue(_accessControl.hasAccess(identity, request));
+ }
+
+ @Test
+ public void testDeny() {
+ Multimap<String, String> headers = ArrayListMultimap.create();
+ headers.put(HEADER_AUTHORIZATION, TOKEN_USER);
+
+ HttpRequesterIdentity identity = new HttpRequesterIdentity();
+ identity.setHttpHeaders(headers);
+
+ QuerySource source = new QuerySource();
+ source.setTableName("veryImportantStuff");
+
+ BrokerRequest request = new BrokerRequest();
+ request.setQuerySource(source);
+
+ Assert.assertFalse(_accessControl.hasAccess(identity, request));
+ }
+
+ @Test
+ public void testAllowAll() {
+ Multimap<String, String> headers = ArrayListMultimap.create();
+ headers.put(HEADER_AUTHORIZATION, TOKEN_ADMIN);
+
+ HttpRequesterIdentity identity = new HttpRequesterIdentity();
+ identity.setHttpHeaders(headers);
+
+ QuerySource source = new QuerySource();
+ source.setTableName("veryImportantStuff");
+
+ BrokerRequest request = new BrokerRequest();
+ request.setQuerySource(source);
+
+ Assert.assertTrue(_accessControl.hasAccess(identity, request));
+ }
+
+ @Test
+ public void testNormalizeToken() {
+ Multimap<String, String> headers = ArrayListMultimap.create();
+ headers.put(HEADER_AUTHORIZATION, " " + TOKEN_USER + "== ");
+
+ HttpRequesterIdentity identity = new HttpRequesterIdentity();
+ identity.setHttpHeaders(headers);
+
+ QuerySource source = new QuerySource();
+ source.setTableName("lessImportantStuff");
+
+ BrokerRequest request = new BrokerRequest();
+ request.setQuerySource(source);
+
+ Assert.assertTrue(_accessControl.hasAccess(identity, request));
+ }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
index dfac60a..31e4969 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
@@ -29,11 +29,13 @@ import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
+import java.util.stream.Collectors;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
@@ -41,6 +43,8 @@ import javax.ws.rs.Path;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MultivaluedMap;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.model.InstanceConfig;
import org.apache.pinot.common.Utils;
import org.apache.pinot.common.exception.QueryException;
@@ -210,7 +214,13 @@ public class PinotQueryResource {
String url = getQueryURL(protocol, hostNameWithPrefix.substring(hostNameWithPrefix.indexOf("_") + 1),
String.valueOf(port), querySyntax);
ObjectNode requestJson = getRequestJson(query, traceEnabled, queryOptions, querySyntax);
- return sendRequestRaw(url, query, requestJson);
+
+ Map<String, String> headers = httpHeaders.getRequestHeaders().entrySet().stream()
+ .filter(entry -> !entry.getValue().isEmpty())
+ .map(entry -> Pair.of(entry.getKey(), entry.getValue().get(0)))
+ .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+
+ return sendRequestRaw(url, query, requestJson, headers);
}
private ObjectNode getRequestJson(String query, String traceEnabled, String queryOptions, String querySyntax) {
@@ -325,10 +335,10 @@ public class PinotQueryResource {
}
}
- public String sendRequestRaw(String url, String query, ObjectNode requestJson) {
+ public String sendRequestRaw(String url, String query, ObjectNode requestJson, Map<String, String> headers) {
try {
final long startTime = System.currentTimeMillis();
- final String pinotResultString = sendPostRaw(url, requestJson.toString(), null);
+ final String pinotResultString = sendPostRaw(url, requestJson.toString(), headers);
final long queryTime = System.currentTimeMillis() - startTime;
LOGGER.info("Query: " + query + " Time: " + queryTime);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org