You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ap...@apache.org on 2021/02/25 19:47:19 UTC
[incubator-pinot] 02/04: tokens everywhere
This is an automated email from the ASF dual-hosted git repository.
apucher pushed a commit to branch basic-auth-controller
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 0f47b6097ba4ece74038eaa6436beda157fbb524
Author: Alexander Pucher <al...@alexpucher.com>
AuthorDate: Wed Feb 10 18:45:36 2021 -0800
tokens everywhere
---
.../broker/BasicAuthAccessControlFactory.java | 95 +++--------------
.../segment/generation/SegmentGenerationUtils.java | 43 ++++++--
.../common/utils/FileUploadDownloadClient.java | 54 +++++++++-
.../common/utils/fetcher/BaseSegmentFetcher.java | 4 +-
.../common/utils/fetcher/HttpSegmentFetcher.java | 4 +-
.../utils/fetcher/SegmentFetcherFactory.java | 69 +++++++++----
.../api/access/BasicAuthAccessControlFactory.java | 112 ++++++---------------
.../apache/pinot/core/auth/BasicAuthPrincipal.java | 37 +++++++
.../org/apache/pinot/core/auth/BasicAuthUtils.java | 100 ++++++++++++++++++
.../SegmentGenerationAndPushTaskExecutor.java | 7 +-
.../ingestion/batch/common/SegmentPushUtils.java | 5 +-
.../batch/hadoop/HadoopSegmentCreationMapper.java | 14 +--
.../spark/SparkSegmentGenerationJobRunner.java | 23 +++--
.../standalone/SegmentGenerationJobRunner.java | 8 +-
.../starter/helix/HelixInstanceDataManager.java | 1 +
.../spi/ingestion/batch/BatchConfigProperties.java | 1 +
.../org/apache/pinot/tools/BootstrapTableTool.java | 4 +-
.../admin/command/AbstractBaseAdminCommand.java | 41 ++++++--
.../tools/admin/command/AddSchemaCommand.java | 25 +++--
.../pinot/tools/admin/command/AddTableCommand.java | 21 ++--
.../tools/admin/command/AddTenantCommand.java | 10 +-
.../tools/admin/command/BootstrapTableCommand.java | 7 +-
.../tools/admin/command/ChangeTableState.java | 10 +-
.../tools/admin/command/ImportDataCommand.java | 9 +-
.../admin/command/OperateClusterConfigCommand.java | 30 ++++--
.../tools/admin/command/PostQueryCommand.java | 16 ++-
.../tools/admin/command/QuickstartRunner.java | 4 +-
.../tools/admin/command/UploadSegmentCommand.java | 10 +-
.../apache/pinot/tools/utils/PinotConfigUtils.java | 3 +
29 files changed, 499 insertions(+), 268 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BasicAuthAccessControlFactory.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BasicAuthAccessControlFactory.java
index 3a670b3..0d00642 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BasicAuthAccessControlFactory.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BasicAuthAccessControlFactory.java
@@ -19,16 +19,18 @@
package org.apache.pinot.broker.broker;
import com.google.common.base.Preconditions;
-import javax.annotation.Nullable;
-import org.apache.commons.lang3.StringUtils;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
import org.apache.pinot.broker.api.AccessControl;
import org.apache.pinot.broker.api.HttpRequesterIdentity;
import org.apache.pinot.broker.api.RequesterIdentity;
import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.core.auth.BasicAuthPrincipal;
+import org.apache.pinot.core.auth.BasicAuthUtils;
import org.apache.pinot.spi.env.PinotConfiguration;
-import java.nio.charset.StandardCharsets;
-import java.util.*;
-import java.util.stream.Collectors;
/**
@@ -43,10 +45,7 @@ import java.util.stream.Collectors;
* </pre>
*/
public class BasicAuthAccessControlFactory extends AccessControlFactory {
- private static final String PRINCIPALS = "principals";
- private static final String PASSWORD = "password";
- private static final String TABLES = "tables";
- private static final String TABLES_ALL = "*";
+ private static final String PREFIX = "principals";
private static final String HEADER_AUTHORIZATION = "authorization";
@@ -57,26 +56,7 @@ public class BasicAuthAccessControlFactory extends AccessControlFactory {
}
public void init(PinotConfiguration configuration) {
- String principalNames = configuration.getProperty(PRINCIPALS);
- Preconditions.checkArgument(StringUtils.isNotBlank(principalNames), "must provide principals");
-
- List<BasicAuthPrincipal> principals = Arrays.stream(principalNames.split(",")).map(rawName -> {
- String name = rawName.trim();
- Preconditions.checkArgument(StringUtils.isNotBlank(name), "%s is not a valid name", name);
-
- String password = configuration.getProperty(String.format("%s.%s.%s", PRINCIPALS, name, PASSWORD));
- Preconditions.checkArgument(StringUtils.isNotBlank(password), "must provide a password for %s", name);
-
- Set<String> tables = new HashSet<>();
- String tableNames = configuration.getProperty(String.format("%s.%s.%s", PRINCIPALS, name, TABLES));
- if (StringUtils.isNotBlank(tableNames) && !TABLES_ALL.equals(tableNames)) {
- tables.addAll(Arrays.asList(tableNames.split(",")));
- }
-
- return new BasicAuthPrincipal(name, toToken(name, password), tables);
- }).collect(Collectors.toList());
-
- _accessControl = new BasicAuthAccessControl(principals);
+ _accessControl = new BasicAuthAccessControl(BasicAuthUtils.extractBasicAuthPrincipals(configuration, PREFIX));
}
public AccessControl create() {
@@ -100,8 +80,8 @@ public class BasicAuthAccessControlFactory extends AccessControlFactory {
Collection<String> tokens = identity.getHttpHeaders().get(HEADER_AUTHORIZATION);
Optional<BasicAuthPrincipal> principalOpt =
- tokens.stream().map(BasicAuthAccessControlFactory::normalizeToken).map(_principals::get)
- .filter(Objects::nonNull).findFirst();
+ tokens.stream().map(BasicAuthUtils::normalizeBase64Token).map(_principals::get).filter(Objects::nonNull)
+ .findFirst();
if (!principalOpt.isPresent()) {
// no matching token? reject
@@ -109,61 +89,12 @@ public class BasicAuthAccessControlFactory extends AccessControlFactory {
}
BasicAuthPrincipal principal = principalOpt.get();
- if (principal.getTables().isEmpty() || !brokerRequest.isSetQuerySource() || !brokerRequest.getQuerySource()
- .isSetTableName()) {
+ if (!brokerRequest.isSetQuerySource() || !brokerRequest.getQuerySource().isSetTableName()) {
// no table restrictions? accept
return true;
}
- return principal.getTables().contains(brokerRequest.getQuerySource().getTableName());
- }
- }
-
- /**
- * Container object for basic auth principal
- */
- private static class BasicAuthPrincipal {
- private final String _name;
- private final String _token;
- private final Set<String> _tables;
-
- public BasicAuthPrincipal(String name, String token, Set<String> tables) {
- _name = name;
- _token = token;
- _tables = tables;
- }
-
- public String getName() {
- return _name;
- }
-
- public Set<String> getTables() {
- return _tables;
- }
-
- public String getToken() {
- return _token;
- }
- }
-
- private static String toToken(String name, String password) {
- String identifier = String.format("%s:%s", name, password);
- return normalizeToken(
- String.format("Basic %s", Base64.getEncoder().encodeToString(identifier.getBytes(StandardCharsets.UTF_8))));
- }
-
- /**
- * Implementations of base64 encoding vary and may generate different numbers of padding characters "=". We normalize
- * these by removing any padding.
- *
- * @param token raw token
- * @return normalized token
- */
- @Nullable
- private static String normalizeToken(String token) {
- if (token == null) {
- return null;
+ return principal.hasTable(brokerRequest.getQuerySource().getTableName());
}
- return StringUtils.remove(token.trim(), '=');
}
}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/segment/generation/SegmentGenerationUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/segment/generation/SegmentGenerationUtils.java
index 3c0c06e..588d9fd 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/segment/generation/SegmentGenerationUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/segment/generation/SegmentGenerationUtils.java
@@ -26,8 +26,11 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
+import java.net.URL;
+import java.net.URLConnection;
import java.nio.charset.StandardCharsets;
import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.filesystem.PinotFS;
@@ -50,6 +53,10 @@ public class SegmentGenerationUtils {
}
public static Schema getSchema(String schemaURIString) {
+ return getSchema(schemaURIString, null);
+ }
+
+ public static Schema getSchema(String schemaURIString, String authToken) {
URI schemaURI;
try {
schemaURI = new URI(schemaURIString);
@@ -75,7 +82,7 @@ public class SegmentGenerationUtils {
} else {
// Try to directly read from URI.
try {
- schemaJson = IOUtils.toString(schemaURI, StandardCharsets.UTF_8);
+ schemaJson = fetchUrl(schemaURI.toURL(), authToken);
} catch (IOException e) {
throw new RuntimeException("Failed to read from Schema URI - '" + schemaURI + "'", e);
}
@@ -88,6 +95,10 @@ public class SegmentGenerationUtils {
}
public static TableConfig getTableConfig(String tableConfigURIStr) {
+ return getTableConfig(tableConfigURIStr, null);
+ }
+
+ public static TableConfig getTableConfig(String tableConfigURIStr, String authToken) {
URI tableConfigURI;
try {
tableConfigURI = new URI(tableConfigURIStr);
@@ -106,7 +117,7 @@ public class SegmentGenerationUtils {
}
} else {
try {
- tableConfigJson = IOUtils.toString(tableConfigURI, StandardCharsets.UTF_8);
+ tableConfigJson = fetchUrl(tableConfigURI.toURL(), authToken);
} catch (IOException e) {
throw new RuntimeException(
"Failed to read from table config file data stream on Pinot fs - '" + tableConfigURI + "'", e);
@@ -142,7 +153,8 @@ public class SegmentGenerationUtils {
public static URI getRelativeOutputPath(URI baseInputDir, URI inputFile, URI outputDir) {
URI relativePath = baseInputDir.relativize(inputFile);
Preconditions.checkState(relativePath.getPath().length() > 0 && !relativePath.equals(inputFile),
- "Unable to extract out the relative path for input file '" + inputFile + "', based on base input path: " + baseInputDir);
+ "Unable to extract out the relative path for input file '" + inputFile + "', based on base input path: "
+ + baseInputDir);
String outputDirStr = outputDir.toString();
outputDir = !outputDirStr.endsWith("/") ? URI.create(outputDirStr.concat("/")) : outputDir;
URI relativeOutputURI = outputDir.resolve(relativePath).resolve(".");
@@ -176,10 +188,11 @@ public class SegmentGenerationUtils {
throws URISyntaxException {
URI fileURI = URI.create(uriStr);
if (fileURI.getScheme() == null) {
- return new URI(fullUriForPathOnlyUriStr.getScheme(), fullUriForPathOnlyUriStr.getUserInfo(), fullUriForPathOnlyUriStr.getHost(),
- fullUriForPathOnlyUriStr.getPort(), fileURI.getPath(), fileURI.getQuery(), fileURI.getFragment());
+ return new URI(fullUriForPathOnlyUriStr.getScheme(), fullUriForPathOnlyUriStr.getUserInfo(),
+ fullUriForPathOnlyUriStr.getHost(), fullUriForPathOnlyUriStr.getPort(), fileURI.getPath(), fileURI.getQuery(),
+ fileURI.getFragment());
}
-
+
return fileURI;
}
@@ -198,4 +211,22 @@ public class SegmentGenerationUtils {
}
return uri;
}
+
+ /**
+ * Retrieve a URL via GET request, with an optional authorization token.
+ *
+ * @param url target url
+ * @param authToken optional auth token, or null
+ * @return fetched document
+ * @throws IOException on connection problems
+ */
+ private static String fetchUrl(URL url, String authToken)
+ throws IOException {
+ URLConnection connection = url.openConnection();
+
+ if (StringUtils.isNotBlank(authToken)) {
+ connection.setRequestProperty("Authorization", authToken);
+ }
+ return IOUtils.toString(connection.getInputStream(), StandardCharsets.UTF_8);
+ }
}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
index 4b12e2a..401b036 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
@@ -58,6 +58,7 @@ import org.apache.http.entity.mime.content.FileBody;
import org.apache.http.entity.mime.content.InputStreamBody;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
+import org.apache.http.message.BasicHeader;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.util.EntityUtils;
import org.apache.pinot.common.exception.HttpErrorStatusException;
@@ -340,7 +341,14 @@ public class FileUploadDownloadClient implements Closeable {
}
private static HttpUriRequest getDownloadFileRequest(URI uri, int socketTimeoutMs) {
+ return getDownloadFileRequest(uri, socketTimeoutMs, null);
+ }
+
+ private static HttpUriRequest getDownloadFileRequest(URI uri, int socketTimeoutMs, String authToken) {
RequestBuilder requestBuilder = RequestBuilder.get(uri).setVersion(HttpVersion.HTTP_1_1);
+ if (StringUtils.isNotBlank(authToken)) {
+ requestBuilder.addHeader("Authorization", authToken);
+ }
setTimeout(requestBuilder, socketTimeoutMs);
String userInfo = uri.getUserInfo();
if (userInfo != null) {
@@ -687,7 +695,23 @@ public class FileUploadDownloadClient implements Closeable {
*/
public int downloadFile(URI uri, int socketTimeoutMs, File dest)
throws IOException, HttpErrorStatusException {
- HttpUriRequest request = getDownloadFileRequest(uri, socketTimeoutMs);
+ return downloadFile(uri, socketTimeoutMs, dest, null);
+ }
+
+ /**
+ * Download a file using default settings, with an optional auth token
+ *
+ * @param uri URI
+ * @param socketTimeoutMs Socket timeout in milliseconds
+ * @param dest File destination
+ * @param authToken optional auth token, or null
+ * @return Response status code
+ * @throws IOException
+ * @throws HttpErrorStatusException
+ */
+ public int downloadFile(URI uri, int socketTimeoutMs, File dest, String authToken)
+ throws IOException, HttpErrorStatusException {
+ HttpUriRequest request = getDownloadFileRequest(uri, socketTimeoutMs, authToken);
try (CloseableHttpResponse response = _httpClient.execute(request)) {
StatusLine statusLine = response.getStatusLine();
int statusCode = statusLine.getStatusCode();
@@ -728,6 +752,21 @@ public class FileUploadDownloadClient implements Closeable {
return downloadFile(uri, DEFAULT_SOCKET_TIMEOUT_MS, dest);
}
+ /**
+ * Download a file, with an optional auth token.
+ *
+ * @param uri URI
+ * @param dest File destination
+ * @param authToken optional auth token, or null
+ * @return Response status code
+ * @throws IOException
+ * @throws HttpErrorStatusException
+ */
+ public int downloadFile(URI uri, File dest, String authToken)
+ throws IOException, HttpErrorStatusException {
+ return downloadFile(uri, DEFAULT_SOCKET_TIMEOUT_MS, dest, authToken);
+ }
+
@Override
public void close()
throws IOException {
@@ -742,4 +781,17 @@ public class FileUploadDownloadClient implements Closeable {
public static void installDefaultSSLContext(SSLContext sslContext) {
_defaultSSLContext = sslContext;
}
+
+ /**
+ * Generate an (optional) HTTP Authorization header given an auth token
+ *
+ * @param authToken auth token
+ * @return list of 0 or 1 "Authorization" headers
+ */
+ public static List<Header> makeAuthHeader(String authToken) {
+ if (org.apache.commons.lang3.StringUtils.isBlank(authToken)) {
+ return Collections.emptyList();
+ }
+ return Collections.singletonList(new BasicHeader("Authorization", authToken));
+ }
}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/BaseSegmentFetcher.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/BaseSegmentFetcher.java
index e4a00c9..1988edb 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/BaseSegmentFetcher.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/BaseSegmentFetcher.java
@@ -22,7 +22,6 @@ import java.io.File;
import java.net.URI;
import java.util.List;
import java.util.Random;
-
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.retry.RetryPolicies;
import org.slf4j.Logger;
@@ -36,6 +35,7 @@ public abstract class BaseSegmentFetcher implements SegmentFetcher {
public static final String RETRY_COUNT_CONFIG_KEY = "retry.count";
public static final String RETRY_WAIT_MS_CONFIG_KEY = "retry.wait.ms";
public static final String RETRY_DELAY_SCALE_FACTOR_CONFIG_KEY = "retry.delay.scale.factor";
+ public static final String AUTH_TOKEN = "auth.token";
public static final int DEFAULT_RETRY_COUNT = 3;
public static final int DEFAULT_RETRY_WAIT_MS = 100;
public static final int DEFAULT_RETRY_DELAY_SCALE_FACTOR = 5;
@@ -45,12 +45,14 @@ public abstract class BaseSegmentFetcher implements SegmentFetcher {
protected int _retryCount;
protected int _retryWaitMs;
protected int _retryDelayScaleFactor;
+ protected String _authToken;
@Override
public void init(PinotConfiguration config) {
_retryCount = config.getProperty(RETRY_COUNT_CONFIG_KEY, DEFAULT_RETRY_COUNT);
_retryWaitMs = config.getProperty(RETRY_WAIT_MS_CONFIG_KEY, DEFAULT_RETRY_WAIT_MS);
_retryDelayScaleFactor = config.getProperty(RETRY_DELAY_SCALE_FACTOR_CONFIG_KEY, DEFAULT_RETRY_DELAY_SCALE_FACTOR);
+ _authToken = config.getProperty(AUTH_TOKEN);
doInit(config);
_logger
.info("Initialized with retryCount: {}, retryWaitMs: {}, retryDelayScaleFactor: {}", _retryCount, _retryWaitMs,
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java
index fcaee6c..6d6d25e 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java
@@ -41,7 +41,7 @@ public class HttpSegmentFetcher extends BaseSegmentFetcher {
throws Exception {
RetryPolicies.exponentialBackoffRetryPolicy(_retryCount, _retryWaitMs, _retryDelayScaleFactor).attempt(() -> {
try {
- int statusCode = _httpClient.downloadFile(uri, dest);
+ int statusCode = _httpClient.downloadFile(uri, dest, _authToken);
_logger
.info("Downloaded segment from: {} to: {} of size: {}; Response status code: {}", uri, dest, dest.length(),
statusCode);
@@ -70,7 +70,7 @@ public class HttpSegmentFetcher extends BaseSegmentFetcher {
public void fetchSegmentToLocalWithoutRetry(URI uri, File dest)
throws Exception {
try {
- int statusCode = _httpClient.downloadFile(uri, dest);
+ int statusCode = _httpClient.downloadFile(uri, dest, _authToken);
_logger
.info("Downloaded segment from: {} to: {} of size: {}; Response status code: {}", uri, dest, dest.length(),
statusCode);
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java
index 05b14d8..83d98e4 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java
@@ -24,7 +24,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-
+import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.spi.crypt.PinotCrypter;
import org.apache.pinot.spi.crypt.PinotCrypterFactory;
@@ -34,22 +34,25 @@ import org.slf4j.LoggerFactory;
public class SegmentFetcherFactory {
- private SegmentFetcherFactory() {
- }
+ private final static SegmentFetcherFactory instance = new SegmentFetcherFactory();
static final String SEGMENT_FETCHER_CLASS_KEY_SUFFIX = ".class";
private static final String PROTOCOLS_KEY = "protocols";
+ private static final String AUTH_TOKEN_KEY = "auth.token";
private static final String ENCODED_SUFFIX = ".enc";
private static final Logger LOGGER = LoggerFactory.getLogger(SegmentFetcherFactory.class);
- private static final Map<String, SegmentFetcher> SEGMENT_FETCHER_MAP = new HashMap<>();
- private static final SegmentFetcher DEFAULT_HTTP_SEGMENT_FETCHER = new HttpSegmentFetcher();
- private static final SegmentFetcher DEFAULT_PINOT_FS_SEGMENT_FETCHER = new PinotFSSegmentFetcher();
-
- static {
- PinotConfiguration emptyConfig = new PinotConfiguration();
- DEFAULT_HTTP_SEGMENT_FETCHER.init(emptyConfig);
- DEFAULT_PINOT_FS_SEGMENT_FETCHER.init(emptyConfig);
+
+ private final Map<String, SegmentFetcher> _segmentFetcherMap = new HashMap<>();
+ private final SegmentFetcher _httpSegmentFetcher = new HttpSegmentFetcher();
+ private final SegmentFetcher _pinotFSSegmentFetcher = new PinotFSSegmentFetcher();
+
+ private SegmentFetcherFactory() {
+ // left blank
+ }
+
+ public static SegmentFetcherFactory getInstance() {
+ return instance;
}
/**
@@ -57,6 +60,14 @@ public class SegmentFetcherFactory {
*/
public static void init(PinotConfiguration config)
throws Exception {
+ getInstance().initInternal(config);
+ }
+
+ private void initInternal(PinotConfiguration config)
+ throws Exception {
+ _httpSegmentFetcher.init(config); // directly, without sub-namespace
+ _pinotFSSegmentFetcher.init(config); // directly, without sub-namespace
+
List<String> protocols = config.getProperty(PROTOCOLS_KEY, Arrays.asList());
for (String protocol : protocols) {
String segmentFetcherClassName = config.getProperty(protocol + SEGMENT_FETCHER_CLASS_KEY_SUFFIX);
@@ -77,8 +88,16 @@ public class SegmentFetcherFactory {
LOGGER.info("Creating segment fetcher for protocol: {} with class: {}", protocol, segmentFetcherClassName);
segmentFetcher = (SegmentFetcher) Class.forName(segmentFetcherClassName).newInstance();
}
- segmentFetcher.init(config.subset(protocol));
- SEGMENT_FETCHER_MAP.put(protocol, segmentFetcher);
+
+ String authToken = config.getProperty(AUTH_TOKEN_KEY);
+ Map<String, Object> subConfigMap = config.subset(protocol).toMap();
+ if (!subConfigMap.containsKey(AUTH_TOKEN_KEY) && StringUtils.isNotBlank(authToken)) {
+ subConfigMap.put(AUTH_TOKEN_KEY, authToken);
+ }
+
+ segmentFetcher.init(new PinotConfiguration(subConfigMap));
+
+ _segmentFetcherMap.put(protocol, segmentFetcher);
}
}
@@ -87,7 +106,11 @@ public class SegmentFetcherFactory {
* ({@link HttpSegmentFetcher} for "http" and "https", {@link PinotFSSegmentFetcher} for other protocols).
*/
public static SegmentFetcher getSegmentFetcher(String protocol) {
- SegmentFetcher segmentFetcher = SEGMENT_FETCHER_MAP.get(protocol);
+ return getInstance().getSegmentFetcherInternal(protocol);
+ }
+
+ private SegmentFetcher getSegmentFetcherInternal(String protocol) {
+ SegmentFetcher segmentFetcher = _segmentFetcherMap.get(protocol);
if (segmentFetcher != null) {
return segmentFetcher;
} else {
@@ -95,9 +118,9 @@ public class SegmentFetcherFactory {
switch (protocol) {
case CommonConstants.HTTP_PROTOCOL:
case CommonConstants.HTTPS_PROTOCOL:
- return DEFAULT_HTTP_SEGMENT_FETCHER;
+ return _httpSegmentFetcher;
default:
- return DEFAULT_PINOT_FS_SEGMENT_FETCHER;
+ return _pinotFSSegmentFetcher;
}
}
}
@@ -107,7 +130,7 @@ public class SegmentFetcherFactory {
*/
public static void fetchSegmentToLocal(URI uri, File dest)
throws Exception {
- getSegmentFetcher(uri.getScheme()).fetchSegmentToLocal(uri, dest);
+ getInstance().fetchSegmentToLocalInternal(uri, dest);
}
/**
@@ -115,7 +138,12 @@ public class SegmentFetcherFactory {
*/
public static void fetchSegmentToLocal(String uri, File dest)
throws Exception {
- fetchSegmentToLocal(new URI(uri), dest);
+ getInstance().fetchSegmentToLocalInternal(new URI(uri), dest);
+ }
+
+ private void fetchSegmentToLocalInternal(URI uri, File dest)
+ throws Exception {
+ getSegmentFetcher(uri.getScheme()).fetchSegmentToLocal(uri, dest);
}
/**
@@ -125,6 +153,11 @@ public class SegmentFetcherFactory {
*/
public static void fetchAndDecryptSegmentToLocal(String uri, File dest, String crypterName)
throws Exception {
+ getInstance().fetchAndDecryptSegmentToLocalInternal(uri, dest, crypterName);
+ }
+
+ private void fetchAndDecryptSegmentToLocalInternal(String uri, File dest, String crypterName)
+ throws Exception {
if (crypterName == null) {
fetchSegmentToLocal(uri, dest);
} else {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/access/BasicAuthAccessControlFactory.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/access/BasicAuthAccessControlFactory.java
index 0b2c00a..d8cf80b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/access/BasicAuthAccessControlFactory.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/access/BasicAuthAccessControlFactory.java
@@ -18,13 +18,16 @@
*/
package org.apache.pinot.controller.api.access;
-import com.google.common.base.Preconditions;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.pinot.spi.env.PinotConfiguration;
-
-import javax.ws.rs.core.HttpHeaders;
-import java.util.*;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
import java.util.stream.Collectors;
+import javax.ws.rs.core.HttpHeaders;
+import org.apache.pinot.core.auth.BasicAuthPrincipal;
+import org.apache.pinot.core.auth.BasicAuthUtils;
+import org.apache.pinot.spi.env.PinotConfiguration;
/**
@@ -41,43 +44,13 @@ import java.util.stream.Collectors;
*/
public class BasicAuthAccessControlFactory implements AccessControlFactory {
private static final String PREFIX = "controller.admin.access.control.principals";
- private static final String PASSWORD = "password";
- private static final String PERMISSIONS = "permissions";
- private static final String TABLES = "tables";
- private static final String ALL = "*";
private static final String HEADER_AUTHORIZATION = "Authorization";
private AccessControl _accessControl;
public void init(PinotConfiguration configuration) {
- String principalNames = configuration.getProperty(PREFIX);
- Preconditions.checkArgument(StringUtils.isNotBlank(principalNames), "must provide principals");
-
- List<BasicAuthPrincipal> principals = Arrays.stream(principalNames.split(",")).map(rawName -> {
- String name = rawName.trim();
- Preconditions.checkArgument(StringUtils.isNotBlank(name), "%s is not a valid name", name);
-
- String password = configuration.getProperty(String.format("%s.%s.%s", PREFIX, name, PASSWORD));
- Preconditions.checkArgument(StringUtils.isNotBlank(password), "must provide a password for %s", name);
-
- Set<String> tables = new HashSet<>();
- String tableNames = configuration.getProperty(String.format("%s.%s.%s", PREFIX, name, TABLES));
- if (StringUtils.isNotBlank(tableNames) && !ALL.equals(tableNames)) {
- tables = Arrays.stream(tableNames.split(",")).map(String::trim).collect(Collectors.toSet());
- }
-
- Set<AccessType> permissions = new HashSet<>();
- String permissionNames = configuration.getProperty(String.format("%s.%s.%s", PREFIX, name, PERMISSIONS));
- if (StringUtils.isNotBlank(permissionNames) && !ALL.equals(tableNames)) {
- permissions = Arrays.stream(permissionNames.split(",")).map(String::trim).map(String::toUpperCase)
- .map(AccessType::valueOf).collect(Collectors.toSet());
- }
-
- return new BasicAuthPrincipal(name, toToken(name, password), tables, permissions);
- }).collect(Collectors.toList());
-
- _accessControl = new BasicAuthAccessControl(principals);
+ _accessControl = new BasicAuthAccessControl(BasicAuthUtils.extractBasicAuthPrincipals(configuration, PREFIX));
}
@Override
@@ -97,67 +70,40 @@ public class BasicAuthAccessControlFactory implements AccessControlFactory {
@Override
public boolean hasDataAccess(HttpHeaders httpHeaders, String tableName) {
+ Optional<BasicAuthPrincipal> principal = getPrincipal(httpHeaders);
+ boolean response = getPrincipal(httpHeaders).filter(p -> p.hasTable(tableName)).isPresent();
return getPrincipal(httpHeaders).filter(p -> p.hasTable(tableName)).isPresent();
}
@Override
public boolean hasAccess(String tableName, AccessType accessType, HttpHeaders httpHeaders, String endpointUrl) {
- return getPrincipal(httpHeaders).filter(p -> p.hasTable(tableName) && p.hasPermission(accessType)).isPresent();
+ Optional<BasicAuthPrincipal> principal = getPrincipal(httpHeaders);
+ boolean response =
+ getPrincipal(httpHeaders).filter(p -> p.hasTable(tableName) && p.hasPermission(Objects.toString(accessType)))
+ .isPresent();
+ return getPrincipal(httpHeaders)
+ .filter(p -> p.hasTable(tableName) && p.hasPermission(Objects.toString(accessType))).isPresent();
}
@Override
public boolean hasAccess(AccessType accessType, HttpHeaders httpHeaders, String endpointUrl) {
+ Optional<BasicAuthPrincipal> principal = getPrincipal(httpHeaders);
+ boolean response = getPrincipal(httpHeaders).isPresent();
return getPrincipal(httpHeaders).isPresent();
}
private Optional<BasicAuthPrincipal> getPrincipal(HttpHeaders headers) {
- return headers.getRequestHeader(HEADER_AUTHORIZATION).stream().map(BasicAuthAccessControlFactory::normalizeToken)
- .map(_principals::get).filter(Objects::nonNull).findFirst();
- }
- }
-
- /**
- * Container object for basic auth principal
- */
- private static class BasicAuthPrincipal {
- private final String _name;
- private final String _token;
- private final Set<String> _tables;
- private final Set<AccessType> _permissions;
-
- public BasicAuthPrincipal(String name, String token, Set<String> tables, Set<AccessType> permissions) {
- this._name = name;
- this._token = token;
- this._tables = tables;
- this._permissions = permissions;
- }
-
- public String getName() {
- return _name;
- }
-
- public String getToken() {
- return _token;
- }
-
- public boolean hasTable(String tableName) {
- return _tables.isEmpty() || _tables.contains(tableName);
- }
-
- public boolean hasPermission(AccessType accessType) {
- return _permissions.isEmpty() || _permissions.contains(accessType);
- }
- }
+ if (headers == null) {
+ return Optional.empty();
+ }
- private static String toToken(String name, String password) {
- String identifier = String.format("%s:%s", name, password);
- return normalizeToken(String.format("Basic %s", Base64.getEncoder().encodeToString(identifier.getBytes())));
- }
+ List<String> authHeaders = headers.getRequestHeader(HEADER_AUTHORIZATION);
+ if (authHeaders == null) {
+ return Optional.empty();
+ }
- private static String normalizeToken(String token) {
- if (token == null) {
- return null;
+ return authHeaders.stream().map(BasicAuthUtils::normalizeBase64Token).map(_principals::get)
+ .filter(Objects::nonNull).findFirst();
}
- return token.trim().replace("=", "");
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/auth/BasicAuthPrincipal.java b/pinot-core/src/main/java/org/apache/pinot/core/auth/BasicAuthPrincipal.java
new file mode 100644
index 0000000..ae44401
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/auth/BasicAuthPrincipal.java
@@ -0,0 +1,37 @@
+package org.apache.pinot.core.auth;
+
+import java.util.Set;
+
+
+/**
+ * Container object for basic auth principal
+ */
+public class BasicAuthPrincipal {
+ private final String _name;
+ private final String _token;
+ private final Set<String> _tables;
+ private final Set<String> _permissions;
+
+ public BasicAuthPrincipal(String name, String token, Set<String> tables, Set<String> permissions) {
+ this._name = name;
+ this._token = token;
+ this._tables = tables;
+ this._permissions = permissions;
+ }
+
+ public String getName() {
+ return _name;
+ }
+
+ public String getToken() {
+ return _token;
+ }
+
+ public boolean hasTable(String tableName) {
+ return _tables.isEmpty() || _tables.contains(tableName);
+ }
+
+ public boolean hasPermission(String permission) {
+ return _permissions.isEmpty() || _permissions.contains(permission);
+ }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/auth/BasicAuthUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/auth/BasicAuthUtils.java
new file mode 100644
index 0000000..46cfa6b
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/auth/BasicAuthUtils.java
@@ -0,0 +1,100 @@
+package org.apache.pinot.core.auth;
+
+import com.google.common.base.Preconditions;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.spi.env.PinotConfiguration;
+
+
+/**
+ * Utility for configuring basic auth and parsing related http tokens
+ */
+public final class BasicAuthUtils {
+ private static final String PASSWORD = "password";
+ private static final String PERMISSIONS = "permissions";
+ private static final String TABLES = "tables";
+ private static final String ALL = "*";
+
+ private BasicAuthUtils() {
+ // left blank
+ }
+
+ /**
+ * Parse a pinot configuration namespace for access control settings, e.g. "controller.admin.access.control.principals".
+ *
+ * <pre>
+ * Example:
+ * my.prefix.access.control.principals=admin123,user456
+ * my.prefix.access.control.principals.admin123.password=verysecret
+ * my.prefix.access.control.principals.user456.password=kindasecret
+ * my.prefix.access.control.principals.user456.tables=stuff,lessImportantStuff
+ * my.prefix.access.control.principals.user456.permissions=read,update
+ * </pre>
+ *
+ * @param configuration pinot configuration
+ * @param prefix configuration namespace
+ * @return list of BasicAuthPrincipals
+ */
+ public static List<BasicAuthPrincipal> extractBasicAuthPrincipals(PinotConfiguration configuration, String prefix) {
+ String principalNames = configuration.getProperty(prefix);
+ Preconditions.checkArgument(StringUtils.isNotBlank(principalNames), "must provide principals");
+
+ return Arrays.stream(principalNames.split(",")).map(rawName -> {
+ String name = rawName.trim();
+ Preconditions.checkArgument(StringUtils.isNotBlank(name), "%s is not a valid name", name);
+
+ String password = configuration.getProperty(String.format("%s.%s.%s", prefix, name, PASSWORD));
+ Preconditions.checkArgument(StringUtils.isNotBlank(password), "must provide a password for %s", name);
+
+ Set<String> tables = extractSet(configuration, String.format("%s.%s.%s", prefix, name, TABLES));
+ Set<String> permissions = extractSet(configuration, String.format("%s.%s.%s", prefix, name, PERMISSIONS));
+
+ return new BasicAuthPrincipal(name, toBasicAuthToken(name, password), tables, permissions);
+ }).collect(Collectors.toList());
+ }
+
+ private static Set<String> extractSet(PinotConfiguration configuration, String key) {
+ String input = configuration.getProperty(key);
+ if (StringUtils.isNotBlank(input) && !ALL.equals(input)) {
+ return Arrays.stream(input.split(",")).map(String::trim).collect(Collectors.toSet());
+ }
+ return Collections.emptySet();
+ }
+
+ /**
+ * Convert a pair of name and password into a http header-compliant base64 encoded token
+ *
+ * @param name user name
+ * @param password password
+ * @return base64 encoded basic auth token
+ */
+ @Nullable
+ public static String toBasicAuthToken(String name, String password) {
+ if (StringUtils.isBlank(name)) {
+ return null;
+ }
+ String identifier = String.format("%s:%s", name, password);
+ return normalizeBase64Token(String.format("Basic %s", Base64.getEncoder().encodeToString(identifier.getBytes())));
+ }
+
+ /**
+ * Normalize a base64 encoded auth token by stripping redundant padding (spaces, '=')
+ *
+ * @param token base64 encoded auth token
+ * @return normalized auth token
+ */
+ @Nullable
+ public static String normalizeBase64Token(String token) {
+ if (token == null) {
+ return null;
+ }
+ return StringUtils.remove(token.trim(), '=');
+ }
+}
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/SegmentGenerationAndPushTaskExecutor.java b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/SegmentGenerationAndPushTaskExecutor.java
index e490de2..ff4f66f 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/SegmentGenerationAndPushTaskExecutor.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/SegmentGenerationAndPushTaskExecutor.java
@@ -284,13 +284,15 @@ public class SegmentGenerationAndPushTaskExecutor extends BaseTaskExecutor {
recordReaderSpec.setConfigClassName(taskConfigs.get(BatchConfigProperties.RECORD_READER_CONFIG_CLASS));
taskSpec.setRecordReaderSpec(recordReaderSpec);
+ String authToken = taskConfigs.get(BatchConfigProperties.AUTH_TOKEN); // TODO
+
String tableNameWithType = taskConfigs.get(BatchConfigProperties.TABLE_NAME);
Schema schema;
if (taskConfigs.containsKey(BatchConfigProperties.SCHEMA)) {
schema = JsonUtils
.stringToObject(JsonUtils.objectToString(taskConfigs.get(BatchConfigProperties.SCHEMA)), Schema.class);
} else if (taskConfigs.containsKey(BatchConfigProperties.SCHEMA_URI)) {
- schema = SegmentGenerationUtils.getSchema(taskConfigs.get(BatchConfigProperties.SCHEMA_URI));
+ schema = SegmentGenerationUtils.getSchema(taskConfigs.get(BatchConfigProperties.SCHEMA_URI), authToken);
} else {
schema = getSchema(tableNameWithType);
}
@@ -299,7 +301,8 @@ public class SegmentGenerationAndPushTaskExecutor extends BaseTaskExecutor {
if (taskConfigs.containsKey(BatchConfigProperties.TABLE_CONFIGS)) {
tableConfig = JsonUtils.stringToObject(taskConfigs.get(BatchConfigProperties.TABLE_CONFIGS), TableConfig.class);
} else if (taskConfigs.containsKey(BatchConfigProperties.TABLE_CONFIGS_URI)) {
- tableConfig = SegmentGenerationUtils.getTableConfig(taskConfigs.get(BatchConfigProperties.TABLE_CONFIGS_URI));
+ tableConfig =
+ SegmentGenerationUtils.getTableConfig(taskConfigs.get(BatchConfigProperties.TABLE_CONFIGS_URI), authToken);
} else {
tableConfig = getTableConfig(tableNameWithType);
}
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java
index 4c3b41b..d15cfcd 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java
@@ -26,6 +26,7 @@ import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -119,7 +120,9 @@ public class SegmentPushUtils implements Serializable {
try (InputStream inputStream = fileSystem.open(tarFileURI)) {
SimpleHttpResponse response = FILE_UPLOAD_DOWNLOAD_CLIENT
.uploadSegment(FileUploadDownloadClient.getUploadSegmentURI(controllerURI), segmentName, inputStream,
- tableName);
+ FileUploadDownloadClient.makeAuthHeader(spec.getAuthToken()), Collections.singletonList(
+ new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME, tableName)),
+ FileUploadDownloadClient.DEFAULT_SOCKET_TIMEOUT_MS);
LOGGER.info("Response for pushing table {} segment {} to location {} - {}: {}", tableName, segmentName,
controllerURI, response.getStatusCode(), response.getResponse());
return true;
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentCreationMapper.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentCreationMapper.java
index 8b71584..c9622ba 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentCreationMapper.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentCreationMapper.java
@@ -32,9 +32,9 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.pinot.common.segment.generation.SegmentGenerationUtils;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner;
-import org.apache.pinot.common.segment.generation.SegmentGenerationUtils;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
@@ -67,7 +67,8 @@ public class HadoopSegmentCreationMapper extends Mapper<LongWritable, Text, Long
private File _localTempDir;
@Override
- public void setup(Context context) throws IOException {
+ public void setup(Context context)
+ throws IOException {
_jobConf = context.getConfiguration();
Yaml yaml = new Yaml();
String segmentGenerationJobSpecStr = _jobConf.get(SEGMENT_GENERATION_JOB_SPEC);
@@ -96,7 +97,7 @@ public class HadoopSegmentCreationMapper extends Mapper<LongWritable, Text, Long
} else {
LOGGER.warn("Cannot find local Pinot plugins directory at [{}]", localPluginsTarFile.getAbsolutePath());
}
-
+
// Register file systems
List<PinotFSSpec> pinotFSSpecs = _spec.getPinotFSSpecs();
for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
@@ -146,8 +147,9 @@ public class HadoopSegmentCreationMapper extends Mapper<LongWritable, Text, Long
taskSpec.setInputFilePath(localInputDataFile.getAbsolutePath());
taskSpec.setOutputDirectoryPath(localOutputTempDir.getAbsolutePath());
taskSpec.setRecordReaderSpec(_spec.getRecordReaderSpec());
- taskSpec.setSchema(SegmentGenerationUtils.getSchema(_spec.getTableSpec().getSchemaURI()));
- taskSpec.setTableConfig(SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI()));
+ taskSpec.setSchema(SegmentGenerationUtils.getSchema(_spec.getTableSpec().getSchemaURI(), _spec.getAuthToken()));
+ taskSpec.setTableConfig(
+ SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI(), _spec.getAuthToken()));
taskSpec.setSequenceId(idx);
taskSpec.setSegmentNameGeneratorSpec(_spec.getSegmentNameGeneratorSpec());
taskSpec.setCustomProperty(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY, inputFileURI.toString());
@@ -186,7 +188,7 @@ public class HadoopSegmentCreationMapper extends Mapper<LongWritable, Text, Long
.resolve(segmentTarFileName);
LOGGER.info("Copying segment tar file from [{}] to [{}]", localSegmentTarFile, outputSegmentTarURI);
outputDirFS.copyFromLocalFile(localSegmentTarFile, outputSegmentTarURI);
-
+
FileUtils.deleteQuietly(localSegmentDir);
FileUtils.deleteQuietly(localSegmentTarFile);
FileUtils.deleteQuietly(localInputDataFile);
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
index a315d15..d2273e7 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
@@ -18,13 +18,6 @@
*/
package org.apache.pinot.plugin.ingestion.batch.spark;
-import static org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner.LOCAL_DIRECTORY_SEQUENCE_ID;
-import static org.apache.pinot.common.segment.generation.SegmentGenerationUtils.PINOT_PLUGINS_DIR;
-import static org.apache.pinot.common.segment.generation.SegmentGenerationUtils.PINOT_PLUGINS_TAR_GZ;
-import static org.apache.pinot.common.segment.generation.SegmentGenerationUtils.getFileName;
-import static org.apache.pinot.spi.plugin.PluginManager.PLUGINS_DIR_PROPERTY_NAME;
-import static org.apache.pinot.spi.plugin.PluginManager.PLUGINS_INCLUDE_PROPERTY_NAME;
-
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
@@ -40,11 +33,10 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
-
import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.segment.generation.SegmentGenerationUtils;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner;
-import org.apache.pinot.common.segment.generation.SegmentGenerationUtils;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
@@ -65,6 +57,13 @@ import org.apache.spark.api.java.function.VoidFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.pinot.common.segment.generation.SegmentGenerationUtils.PINOT_PLUGINS_DIR;
+import static org.apache.pinot.common.segment.generation.SegmentGenerationUtils.PINOT_PLUGINS_TAR_GZ;
+import static org.apache.pinot.common.segment.generation.SegmentGenerationUtils.getFileName;
+import static org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner.LOCAL_DIRECTORY_SEQUENCE_ID;
+import static org.apache.pinot.spi.plugin.PluginManager.PLUGINS_DIR_PROPERTY_NAME;
+import static org.apache.pinot.spi.plugin.PluginManager.PLUGINS_INCLUDE_PROPERTY_NAME;
+
public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Serializable {
@@ -302,8 +301,10 @@ public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Seri
taskSpec.setInputFilePath(localInputDataFile.getAbsolutePath());
taskSpec.setOutputDirectoryPath(localOutputTempDir.getAbsolutePath());
taskSpec.setRecordReaderSpec(_spec.getRecordReaderSpec());
- taskSpec.setSchema(SegmentGenerationUtils.getSchema(_spec.getTableSpec().getSchemaURI()));
- taskSpec.setTableConfig(SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI()));
+ taskSpec
+ .setSchema(SegmentGenerationUtils.getSchema(_spec.getTableSpec().getSchemaURI(), _spec.getAuthToken()));
+ taskSpec.setTableConfig(
+ SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI(), _spec.getAuthToken()));
taskSpec.setSequenceId(idx);
taskSpec.setSegmentNameGeneratorSpec(_spec.getSegmentNameGeneratorSpec());
taskSpec.setCustomProperty(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY, inputFileURI.toString());
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
index efe1679..52cf3de 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
@@ -27,14 +27,13 @@ import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
-
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.segment.generation.SegmentGenerationUtils;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner;
-import org.apache.pinot.common.segment.generation.SegmentGenerationUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
@@ -165,8 +164,9 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner {
FileUtils.forceMkdir(localOutputTempDir);
//Read TableConfig, Schema
- Schema schema = SegmentGenerationUtils.getSchema(_spec.getTableSpec().getSchemaURI());
- TableConfig tableConfig = SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI());
+ Schema schema = SegmentGenerationUtils.getSchema(_spec.getTableSpec().getSchemaURI(), _spec.getAuthToken());
+ TableConfig tableConfig =
+ SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI(), _spec.getAuthToken());
int numInputFiles = filteredFiles.size();
CountDownLatch segmentCreationTaskCountDownLatch = new CountDownLatch(numInputFiles);
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index deb3a6a..162ba71 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -70,6 +70,7 @@ public class HelixInstanceDataManager implements InstanceDataManager {
private HelixManager _helixManager;
private ServerMetrics _serverMetrics;
private ZkHelixPropertyStore<ZNRecord> _propertyStore;
+ private String _authToken;
@Override
public synchronized void init(PinotConfiguration config, HelixManager helixManager, ServerMetrics serverMetrics)
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java
index 5a28884..0e3f1c6 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java
@@ -49,6 +49,7 @@ public class BatchConfigProperties {
public static final String PUSH_CONTROLLER_URI = "push.controllerUri";
public static final String PUSH_SEGMENT_URI_PREFIX = "push.segmentUriPrefix";
public static final String PUSH_SEGMENT_URI_SUFFIX = "push.segmentUriSuffix";
+ public static final String AUTH_TOKEN = "authToken";
public static final String OUTPUT_SEGMENT_DIR_URI = "output.segment.dir.uri";
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/BootstrapTableTool.java b/pinot-tools/src/main/java/org/apache/pinot/tools/BootstrapTableTool.java
index 2e86199..35efe27 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/BootstrapTableTool.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/BootstrapTableTool.java
@@ -118,7 +118,7 @@ public class BootstrapTableTool {
return new AddTableCommand().setSchemaFile(schemaFile.getAbsolutePath())
.setTableConfigFile(tableConfigFile.getAbsolutePath()).setControllerProtocol(_controllerProtocol)
.setControllerHost(_controllerHost).setControllerPort(String.valueOf(_controllerPort)).setExecute(true)
- .execute();
+ .setAuthToken(_token).execute();
}
private boolean bootstrapOfflineTable(File setupTableTmpDir, String tableName, File schemaFile,
@@ -178,6 +178,8 @@ public class BootstrapTableTool {
tlsSpec.getTrustStorePath(), tlsSpec.getTrustStorePassword());
}
+ spec.setAuthToken(_token);
+
IngestionJobLauncher.runIngestionJob(spec);
}
} else {
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AbstractBaseAdminCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AbstractBaseAdminCommand.java
index 30d4c28..1debd42 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AbstractBaseAdminCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AbstractBaseAdminCommand.java
@@ -31,10 +31,13 @@ import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.*;
+import javax.annotation.Nullable;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.Header;
import org.apache.http.message.BasicHeader;
+import org.apache.pinot.common.utils.FileUploadDownloadClient;
+import org.apache.pinot.core.auth.BasicAuthUtils;
import org.apache.pinot.tools.AbstractBaseCommand;
import org.apache.pinot.tools.utils.PinotConfigUtils;
@@ -83,10 +86,9 @@ public class AbstractBaseAdminCommand extends AbstractBaseCommand {
final URL url = new URL(urlString);
final HttpURLConnection conn = (HttpURLConnection) url.openConnection();
- conn.setDoOutput(true);
+ headers.forEach(header -> conn.setRequestProperty(header.getName(), header.getValue()));
conn.setRequestMethod(requestMethod);
- headers.stream().flatMap(header -> Arrays.stream(header.getElements()))
- .forEach(elem -> conn.setRequestProperty(elem.getName(), elem.getValue()));
+ conn.setDoOutput(true);
if (payload != null) {
final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(conn.getOutputStream(),
StandardCharsets.UTF_8));
@@ -123,16 +125,35 @@ public class AbstractBaseAdminCommand extends AbstractBaseCommand {
return PinotConfigUtils.readConfigFromFile(configFileName);
}
- static List<Header> makeBasicAuth(String user, String password) {
- if (StringUtils.isBlank(user)) {
- return Collections.emptyList();
+ /**
+ * Generate an (optional) HTTP Authorization header given an auth token
+ * @see FileUploadDownloadClient#makeAuthHeader(String)
+ *
+ * @param authToken auth token
+ * @return list of 0 or 1 "Authorization" headers
+ */
+ static List<Header> makeAuthHeader(String authToken) {
+ return FileUploadDownloadClient.makeAuthHeader(authToken);
+ }
+
+ /**
+ * Generate auth token from pass-thru token or generate basic auth from user/password pair
+ *
+ * @param authToken optional pass-thru token
+ * @param user optional username
+ * @param password optional password
+ * @return auth token, or null if neither pass-thru token nor user info available
+ */
+ @Nullable
+ static String makeAuthToken(String authToken, String user, String password) {
+ if (StringUtils.isNotBlank(authToken)) {
+ return authToken;
}
- if (StringUtils.isBlank(password)) {
- password = "";
+ if (StringUtils.isNotBlank(user)) {
+ return BasicAuthUtils.toBasicAuthToken(user, password);
}
- String token = "Basic " + Base64.getEncoder().encodeToString((user + ":" + password).getBytes());
- return Collections.singletonList(new BasicHeader("Authorization", token));
+ return null;
}
}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddSchemaCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddSchemaCommand.java
index 2bb0a99..fdf7a06 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddSchemaCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddSchemaCommand.java
@@ -60,6 +60,9 @@ public class AddSchemaCommand extends AbstractBaseAdminCommand implements Comman
@Option(name = "-password", required = false, metaVar = "<String>", usage = "Password for basic auth.")
private String _password;
+ @Option(name = "-authToken", required = false, metaVar = "<String>", usage = "Http auth token.")
+ private String _authToken;
+
@Option(name = "-help", required = false, help = true, aliases = {"-h", "--h", "--help"}, usage = "Print this message.")
private boolean _help = false;
@@ -80,10 +83,9 @@ public class AddSchemaCommand extends AbstractBaseAdminCommand implements Comman
@Override
public String toString() {
- String retString =
- ("AddSchema -controllerProtocol " + _controllerProtocol + " -controllerHost " + _controllerHost
- + " -controllerPort " + _controllerPort + " -schemaFile " + _schemaFile + " -user " + _user + " -password "
- + "[hidden]");
+ String retString = ("AddSchema -controllerProtocol " + _controllerProtocol + " -controllerHost " + _controllerHost
+ + " -controllerPort " + _controllerPort + " -schemaFile " + _schemaFile + " -user " + _user + " -password "
+ + "[hidden]");
return ((_exec) ? (retString + " -exec") : retString);
}
@@ -114,11 +116,15 @@ public class AddSchemaCommand extends AbstractBaseAdminCommand implements Comman
}
public void setUser(String user) {
- this._user = user;
+ _user = user;
}
public void setPassword(String password) {
- this._password = password;
+ _password = password;
+ }
+
+ public void setAuthToken(String authToken) {
+ _authToken = authToken;
}
public AddSchemaCommand setExecute(boolean exec) {
@@ -147,9 +153,10 @@ public class AddSchemaCommand extends AbstractBaseAdminCommand implements Comman
Schema schema = Schema.fromFile(schemaFile);
try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) {
- fileUploadDownloadClient.addSchema(
- FileUploadDownloadClient.getUploadSchemaURI(_controllerProtocol, _controllerHost, Integer.parseInt(_controllerPort)),
- schema.getSchemaName(), schemaFile, makeBasicAuth(_user, _password), Collections.emptyList());
+ fileUploadDownloadClient.addSchema(FileUploadDownloadClient
+ .getUploadSchemaURI(_controllerProtocol, _controllerHost, Integer.parseInt(_controllerPort)),
+ schema.getSchemaName(), schemaFile, makeAuthHeader(makeAuthToken(_authToken, _user, _password)),
+ Collections.emptyList());
} catch (Exception e) {
LOGGER.error("Got Exception to upload Pinot Schema: " + schema.getSchemaName(), e);
return false;
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTableCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTableCommand.java
index 78123e5..e8403fa 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTableCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTableCommand.java
@@ -22,7 +22,6 @@ import com.fasterxml.jackson.databind.JsonNode;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
-
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.common.utils.NetUtil;
@@ -66,6 +65,9 @@ public class AddTableCommand extends AbstractBaseAdminCommand implements Command
@Option(name = "-password", required = false, metaVar = "<String>", usage = "Password for basic auth.")
private String _password;
+ @Option(name = "-authToken", required = false, metaVar = "<String>", usage = "Http auth token.")
+ private String _authToken;
+
@Option(name = "-help", required = false, help = true, aliases = {"-h", "--h", "--help"}, usage = "Print this message.")
private boolean _help = false;
@@ -91,7 +93,7 @@ public class AddTableCommand extends AbstractBaseAdminCommand implements Command
String retString =
("AddTable -tableConfigFile " + _tableConfigFile + " -schemaFile " + _schemaFile + " -controllerProtocol "
+ _controllerProtocol + " -controllerHost " + _controllerHost + " -controllerPort " + _controllerPort
- + " -user " + _user + " -password " + "[hidden]");
+ + " -user " + _user + " -password " + "[hidden]");
return ((_exec) ? (retString + " -exec") : retString);
}
@@ -134,6 +136,11 @@ public class AddTableCommand extends AbstractBaseAdminCommand implements Command
return this;
}
+ public AddTableCommand setAuthToken(String authToken) {
+ _authToken = authToken;
+ return this;
+ }
+
public AddTableCommand setExecute(boolean exec) {
_exec = exec;
return this;
@@ -151,9 +158,10 @@ public class AddTableCommand extends AbstractBaseAdminCommand implements Command
throw e;
}
try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) {
- fileUploadDownloadClient.addSchema(
- FileUploadDownloadClient.getUploadSchemaURI(_controllerProtocol, _controllerHost, Integer.parseInt(_controllerPort)),
- schema.getSchemaName(), schemaFile, makeBasicAuth(_user, _password), Collections.emptyList());
+ fileUploadDownloadClient.addSchema(FileUploadDownloadClient
+ .getUploadSchemaURI(_controllerProtocol, _controllerHost, Integer.parseInt(_controllerPort)),
+ schema.getSchemaName(), schemaFile, makeAuthHeader(makeAuthToken(_authToken, _user, _password)),
+ Collections.emptyList());
} catch (Exception e) {
LOGGER.error("Got Exception to upload Pinot Schema: " + schema.getSchemaName(), e);
throw e;
@@ -163,7 +171,8 @@ public class AddTableCommand extends AbstractBaseAdminCommand implements Command
public boolean sendTableCreationRequest(JsonNode node)
throws IOException {
String res = AbstractBaseAdminCommand
- .sendPostRequest(ControllerRequestURLBuilder.baseUrl(_controllerAddress).forTableCreate(), node.toString());
+ .sendRequest("POST", ControllerRequestURLBuilder.baseUrl(_controllerAddress).forTableCreate(), node.toString(),
+ makeAuthHeader(makeAuthToken(_authToken, _user, _password)));
LOGGER.info(res);
return res.contains("succesfully added");
}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTenantCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTenantCommand.java
index 3365314..efd4bf9 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTenantCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTenantCommand.java
@@ -67,6 +67,9 @@ public class AddTenantCommand extends AbstractBaseAdminCommand implements Comman
@Option(name = "-password", required = false, metaVar = "<String>", usage = "Password for basic auth.")
private String _password;
+ @Option(name = "-authToken", required = false, metaVar = "<String>", usage = "Http auth token.")
+ private String _authToken;
+
@Option(name = "-help", required = false, help = true, aliases = {"-h", "--h", "--help"}, usage = "Print this message.")
private boolean _help = false;
@@ -112,6 +115,11 @@ public class AddTenantCommand extends AbstractBaseAdminCommand implements Comman
return this;
}
+ public AddTenantCommand setAuthToken(String authToken) {
+ _authToken = authToken;
+ return this;
+ }
+
public AddTenantCommand setExecute(boolean exec) {
_exec = exec;
return this;
@@ -137,7 +145,7 @@ public class AddTenantCommand extends AbstractBaseAdminCommand implements Comman
Tenant tenant = new Tenant(_role, _name, _instanceCount, _offlineInstanceCount, _realtimeInstanceCount);
String res = AbstractBaseAdminCommand
.sendRequest("POST", ControllerRequestURLBuilder.baseUrl(_controllerAddress).forTenantCreate(),
- tenant.toJsonString(), makeBasicAuth(_user, _password));
+ tenant.toJsonString(), makeAuthHeader(makeAuthToken(_authToken, _user, _password)));
LOGGER.info(res);
System.out.print(res);
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/BootstrapTableCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/BootstrapTableCommand.java
index 6f7d017..12806c5 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/BootstrapTableCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/BootstrapTableCommand.java
@@ -18,8 +18,10 @@
*/
package org.apache.pinot.tools.admin.command;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.NetUtil;
+import org.apache.pinot.core.auth.BasicAuthUtils;
import org.apache.pinot.spi.plugin.PluginManager;
import org.apache.pinot.tools.Command;
import org.apache.pinot.tools.BootstrapTableTool;
@@ -82,6 +84,9 @@ public class BootstrapTableCommand extends AbstractBaseAdminCommand implements C
@Option(name = "-password", required = false, metaVar = "<String>", usage = "Password for basic auth.")
private String _password;
+ @Option(name = "-authToken", required = false, metaVar = "<String>", usage = "Http auth token.")
+ private String _authToken;
+
@Option(name = "-help", required = false, help = true, aliases = {"-h", "--h", "--help"}, usage = "Print this message.")
private boolean _help = false;
@@ -122,7 +127,7 @@ public class BootstrapTableCommand extends AbstractBaseAdminCommand implements C
if (_controllerHost == null) {
_controllerHost = NetUtil.getHostAddress();
}
- String token = ""; // TODO
+ String token = makeAuthToken(_authToken, _user, _password);
return new BootstrapTableTool(_controllerProtocol, _controllerHost, Integer.parseInt(_controllerPort), _dir, token).execute();
}
}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ChangeTableState.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ChangeTableState.java
index d240e77..83ad5f8 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ChangeTableState.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ChangeTableState.java
@@ -23,6 +23,7 @@ import java.net.URL;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.HttpURL;
import org.apache.commons.httpclient.methods.GetMethod;
+import org.apache.commons.lang3.StringUtils;
import org.apache.http.client.methods.HttpGet;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.NetUtil;
@@ -56,6 +57,9 @@ public class ChangeTableState extends AbstractBaseAdminCommand implements Comman
@Option(name = "-password", required = false, metaVar = "<String>", usage = "Password for basic auth.")
private String _password;
+ @Option(name = "-authToken", required = false, metaVar = "<String>", usage = "Http auth token.")
+ private String _authToken;
+
@Option(name = "-help", required = false, help = true, aliases = {"-h", "--h", "--help"}, usage = "Print this message.")
private boolean _help = false;
@@ -75,8 +79,12 @@ public class ChangeTableState extends AbstractBaseAdminCommand implements Comman
URI uri = new URI(_controllerProtocol, null, _controllerHost, Integer.parseInt(_controllerPort),
URI_TABLES_PATH + _tableName, "state=" + stateValue, null);
+ String token = makeAuthToken(_authToken, _user, _password);
+
GetMethod httpGet = new GetMethod(uri.toString());
- httpGet.setRequestHeader("Authorization", null); // TODO
+ if (StringUtils.isNotBlank(token)) {
+ httpGet.setRequestHeader("Authorization", token);
+ }
int status = httpClient.executeMethod(httpGet);
if (status != 200) {
throw new RuntimeException("Failed to change table state, error: " + httpGet.getResponseBodyAsString());
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ImportDataCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ImportDataCommand.java
index 49884e4..c432146 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ImportDataCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ImportDataCommand.java
@@ -81,6 +81,9 @@ public class ImportDataCommand extends AbstractBaseAdminCommand implements Comma
@Option(name = "-password", required = false, metaVar = "<String>", usage = "Password for basic auth.")
private String _password;
+ @Option(name = "-authToken", required = false, metaVar = "<String>", usage = "Http auth token.")
+ private String _authToken;
+
@Option(name = "-tempDir", metaVar = "<string>", usage = "Temporary directory used to hold data during segment creation.")
private String _tempDir = new File(FileUtils.getTempDirectory(), getClass().getSimpleName()).getAbsolutePath();
@@ -243,11 +246,7 @@ public class ImportDataCommand extends AbstractBaseAdminCommand implements Comma
spec.setCleanUpOutputDir(true);
spec.setOverwriteOutput(true);
spec.setJobType("SegmentCreationAndTarPush");
-
- if (!StringUtils.isBlank(_user)) {
- String token = ""; // TODO
- spec.setAuthToken(token);
- }
+ spec.setAuthToken(makeAuthToken(_authToken, _user, _password));
// set ExecutionFrameworkSpec
ExecutionFrameworkSpec executionFrameworkSpec = new ExecutionFrameworkSpec();
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/OperateClusterConfigCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/OperateClusterConfigCommand.java
index 97b04c6..27d3520 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/OperateClusterConfigCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/OperateClusterConfigCommand.java
@@ -19,12 +19,11 @@
package org.apache.pinot.tools.admin.command;
import com.fasterxml.jackson.databind.JsonNode;
-import java.net.URI;
-import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Iterator;
-import org.apache.commons.io.IOUtils;
+import java.util.List;
import org.apache.commons.lang.StringUtils;
+import org.apache.http.Header;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.NetUtil;
import org.apache.pinot.spi.utils.JsonUtils;
@@ -52,6 +51,9 @@ public class OperateClusterConfigCommand extends AbstractBaseAdminCommand implem
@Option(name = "-password", required = false, metaVar = "<String>", usage = "Password for basic auth.")
private String _password;
+ @Option(name = "-authToken", required = false, metaVar = "<String>", usage = "Http auth token.")
+ private String _authToken;
+
@Option(name = "-config", metaVar = "<string>", usage = "Cluster config to operate.")
private String _config;
@@ -73,8 +75,9 @@ public class OperateClusterConfigCommand extends AbstractBaseAdminCommand implem
@Override
public String toString() {
- String toString = "Operate ClusterConfig -controllerProtocol " + _controllerProtocol + " -controllerHost "
- + _controllerHost + " -controllerPort " + _controllerPort + " -operation " + _operation;
+ String toString =
+ "Operate ClusterConfig -controllerProtocol " + _controllerProtocol + " -controllerHost " + _controllerHost
+ + " -controllerPort " + _controllerPort + " -operation " + _operation;
if (_config != null) {
toString += " -config " + _config;
}
@@ -116,6 +119,11 @@ public class OperateClusterConfigCommand extends AbstractBaseAdminCommand implem
return this;
}
+ public OperateClusterConfigCommand setAuthToken(String authToken) {
+ _authToken = authToken;
+ return this;
+ }
+
public OperateClusterConfigCommand setConfig(String config) {
_config = config;
return this;
@@ -135,7 +143,9 @@ public class OperateClusterConfigCommand extends AbstractBaseAdminCommand implem
if (StringUtils.isEmpty(_config) && !_operation.equalsIgnoreCase("GET")) {
throw new UnsupportedOperationException("Empty config: " + _config);
}
- String clusterConfigUrl = _controllerProtocol + "://" + _controllerHost + ":" + _controllerPort + "/cluster/configs";
+ String clusterConfigUrl =
+ _controllerProtocol + "://" + _controllerHost + ":" + _controllerPort + "/cluster/configs";
+ List<Header> headers = makeAuthHeader(makeAuthToken(_authToken, _user, _password));
switch (_operation.toUpperCase()) {
case "ADD":
case "UPDATE":
@@ -145,10 +155,9 @@ public class OperateClusterConfigCommand extends AbstractBaseAdminCommand implem
"Bad config: " + _config + ". Please follow the pattern of [Config Key]=[Config Value]");
}
String request = JsonUtils.objectToString(Collections.singletonMap(splits[0], splits[1]));
- return sendRequest("POST", clusterConfigUrl, request, makeBasicAuth(_user, _password));
+ return sendRequest("POST", clusterConfigUrl, request, headers);
case "GET":
- // TODO
- String response = IOUtils.toString(new URI(clusterConfigUrl), StandardCharsets.UTF_8);
+ String response = sendRequest("GET", clusterConfigUrl, null, headers);
JsonNode jsonNode = JsonUtils.stringToJsonNode(response);
Iterator<String> fieldNamesIterator = jsonNode.fieldNames();
String results = "";
@@ -159,8 +168,7 @@ public class OperateClusterConfigCommand extends AbstractBaseAdminCommand implem
}
return results;
case "DELETE":
- return sendRequest("DELETE", String.format("%s/%s", clusterConfigUrl, _config), null,
- makeBasicAuth(_user, _password));
+ return sendRequest("DELETE", String.format("%s/%s", clusterConfigUrl, _config), null, headers);
default:
throw new UnsupportedOperationException("Unsupported operation: " + _operation);
}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/PostQueryCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/PostQueryCommand.java
index 4afb686..72c16e4 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/PostQueryCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/PostQueryCommand.java
@@ -21,8 +21,8 @@ package org.apache.pinot.tools.admin.command;
import java.util.Collections;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.CommonConstants.Broker.Request;
-import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.common.utils.NetUtil;
+import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.tools.Command;
import org.kohsuke.args4j.Option;
import org.slf4j.Logger;
@@ -53,6 +53,9 @@ public class PostQueryCommand extends AbstractBaseAdminCommand implements Comman
@Option(name = "-password", required = false, metaVar = "<String>", usage = "Password for basic auth.")
private String _password;
+ @Option(name = "-authToken", required = false, metaVar = "<String>", usage = "Http auth token.")
+ private String _authToken;
+
@Option(name = "-help", required = false, help = true, aliases = {"-h", "--h", "--help"}, usage = "Print this message.")
private boolean _help = false;
@@ -68,8 +71,8 @@ public class PostQueryCommand extends AbstractBaseAdminCommand implements Comman
@Override
public String toString() {
- return ("PostQuery -brokerProtocol " + _brokerProtocol + " -brokerHost " + _brokerHost + " -brokerPort " +
- _brokerPort + " -queryType " + _queryType + " -query " + _query);
+ return ("PostQuery -brokerProtocol " + _brokerProtocol + " -brokerHost " + _brokerHost + " -brokerPort "
+ + _brokerPort + " -queryType " + _queryType + " -query " + _query);
}
@Override
@@ -107,6 +110,11 @@ public class PostQueryCommand extends AbstractBaseAdminCommand implements Comman
return this;
}
+ public PostQueryCommand setauthToken(String authToken) {
+ _authToken = authToken;
+ return this;
+ }
+
public PostQueryCommand setQueryType(String queryType) {
_queryType = queryType;
return this;
@@ -133,7 +141,7 @@ public class PostQueryCommand extends AbstractBaseAdminCommand implements Comman
request = JsonUtils.objectToString(Collections.singletonMap(Request.PQL, _query));
}
- return sendRequest("POST", urlString, request, makeBasicAuth(_user, _password));
+ return sendRequest("POST", urlString, request, makeAuthHeader(makeAuthToken(_authToken, _user, _password)));
}
@Override
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java
index 3630602..db53c31 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java
@@ -33,6 +33,7 @@ import java.util.Map;
import java.util.Random;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.core.auth.BasicAuthUtils;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.tenant.TenantRole;
import org.apache.pinot.spi.env.PinotConfiguration;
@@ -202,9 +203,10 @@ public class QuickstartRunner {
public void bootstrapTable()
throws Exception {
+ String token = BasicAuthUtils.toBasicAuthToken("admin", "verysecret");
for (QuickstartTableRequest request : _tableRequests) {
if (!new BootstrapTableTool("http", InetAddress.getLocalHost().getHostName(), _controllerPorts.get(0),
- request.getBootstrapTableDir(), null).execute()) {
+ request.getBootstrapTableDir(), token).execute()) {
throw new RuntimeException("Failed to bootstrap table with request - " + request);
}
}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/UploadSegmentCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/UploadSegmentCommand.java
index d393e57..679a2a6 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/UploadSegmentCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/UploadSegmentCommand.java
@@ -58,6 +58,9 @@ public class UploadSegmentCommand extends AbstractBaseAdminCommand implements Co
@Option(name = "-password", required = false, metaVar = "<String>", usage = "Password for basic auth.")
private String _password;
+ @Option(name = "-authToken", required = false, metaVar = "<String>", usage = "Http auth token.")
+ private String _authToken;
+
@Option(name = "-segmentDir", required = true, metaVar = "<string>", usage = "Path to segment directory.")
private String _segmentDir = null;
@@ -119,6 +122,11 @@ public class UploadSegmentCommand extends AbstractBaseAdminCommand implements Co
return this;
}
+ public UploadSegmentCommand setAuthToken(String authToken) {
+ _authToken = authToken;
+ return this;
+ }
+
public UploadSegmentCommand setSegmentDir(String segmentDir) {
_segmentDir = segmentDir;
return this;
@@ -159,7 +167,7 @@ public class UploadSegmentCommand extends AbstractBaseAdminCommand implements Co
LOGGER.info("Uploading segment tar file: {}", segmentTarFile);
fileUploadDownloadClient
.uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), segmentTarFile,
- makeBasicAuth(_user, _password), Collections.singletonList(new BasicNameValuePair(
+ makeAuthHeader(makeAuthToken(_authToken, _user, _password)), Collections.singletonList(new BasicNameValuePair(
FileUploadDownloadClient.QueryParameters.TABLE_NAME, _tableName)),
FileUploadDownloadClient.DEFAULT_SOCKET_TIMEOUT_MS);
}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/utils/PinotConfigUtils.java b/pinot-tools/src/main/java/org/apache/pinot/tools/utils/PinotConfigUtils.java
index 0354fd9..7a17845 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/utils/PinotConfigUtils.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/utils/PinotConfigUtils.java
@@ -76,6 +76,7 @@ public class PinotConfigUtils {
properties.put("controller.admin.access.control.principals.user.password", "secret");
properties.put("controller.admin.access.control.principals.user.tables", "baseballStats");
properties.put("controller.admin.access.control.principals.user.permissions", "read");
+ properties.put("pinot.controller.segment.fetcher.auth.token", "Basic YWRtaW46dmVyeXNlY3JldA==");
return properties;
}
@@ -176,6 +177,7 @@ public class PinotConfigUtils {
properties.put(CommonConstants.Server.CONFIG_OF_ADMIN_API_PORT, serverAdminPort);
properties.put(CommonConstants.Server.CONFIG_OF_INSTANCE_DATA_DIR, serverDataDir);
properties.put(CommonConstants.Server.CONFIG_OF_INSTANCE_SEGMENT_TAR_DIR, serverSegmentDir);
+ properties.put("pinot.server.segment.fetcher.auth.token", "Basic YWRtaW46dmVyeXNlY3JldA==");
return properties;
}
@@ -188,6 +190,7 @@ public class PinotConfigUtils {
Map<String, Object> properties = new HashMap<>();
properties.put(CommonConstants.Helix.KEY_OF_MINION_HOST, minionHost);
properties.put(CommonConstants.Helix.KEY_OF_MINION_PORT, minionPort != 0 ? minionPort : getAvailablePort());
+ properties.put("segment.fetcher.auth.token", "Basic YWRtaW46dmVyeXNlY3JldA==");
return properties;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org