You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ap...@apache.org on 2021/02/25 19:47:19 UTC

[incubator-pinot] 02/04: tokens everywhere

This is an automated email from the ASF dual-hosted git repository.

apucher pushed a commit to branch basic-auth-controller
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 0f47b6097ba4ece74038eaa6436beda157fbb524
Author: Alexander Pucher <al...@alexpucher.com>
AuthorDate: Wed Feb 10 18:45:36 2021 -0800

    tokens everywhere
---
 .../broker/BasicAuthAccessControlFactory.java      |  95 +++--------------
 .../segment/generation/SegmentGenerationUtils.java |  43 ++++++--
 .../common/utils/FileUploadDownloadClient.java     |  54 +++++++++-
 .../common/utils/fetcher/BaseSegmentFetcher.java   |   4 +-
 .../common/utils/fetcher/HttpSegmentFetcher.java   |   4 +-
 .../utils/fetcher/SegmentFetcherFactory.java       |  69 +++++++++----
 .../api/access/BasicAuthAccessControlFactory.java  | 112 ++++++---------------
 .../apache/pinot/core/auth/BasicAuthPrincipal.java |  37 +++++++
 .../org/apache/pinot/core/auth/BasicAuthUtils.java | 100 ++++++++++++++++++
 .../SegmentGenerationAndPushTaskExecutor.java      |   7 +-
 .../ingestion/batch/common/SegmentPushUtils.java   |   5 +-
 .../batch/hadoop/HadoopSegmentCreationMapper.java  |  14 +--
 .../spark/SparkSegmentGenerationJobRunner.java     |  23 +++--
 .../standalone/SegmentGenerationJobRunner.java     |   8 +-
 .../starter/helix/HelixInstanceDataManager.java    |   1 +
 .../spi/ingestion/batch/BatchConfigProperties.java |   1 +
 .../org/apache/pinot/tools/BootstrapTableTool.java |   4 +-
 .../admin/command/AbstractBaseAdminCommand.java    |  41 ++++++--
 .../tools/admin/command/AddSchemaCommand.java      |  25 +++--
 .../pinot/tools/admin/command/AddTableCommand.java |  21 ++--
 .../tools/admin/command/AddTenantCommand.java      |  10 +-
 .../tools/admin/command/BootstrapTableCommand.java |   7 +-
 .../tools/admin/command/ChangeTableState.java      |  10 +-
 .../tools/admin/command/ImportDataCommand.java     |   9 +-
 .../admin/command/OperateClusterConfigCommand.java |  30 ++++--
 .../tools/admin/command/PostQueryCommand.java      |  16 ++-
 .../tools/admin/command/QuickstartRunner.java      |   4 +-
 .../tools/admin/command/UploadSegmentCommand.java  |  10 +-
 .../apache/pinot/tools/utils/PinotConfigUtils.java |   3 +
 29 files changed, 499 insertions(+), 268 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BasicAuthAccessControlFactory.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BasicAuthAccessControlFactory.java
index 3a670b3..0d00642 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BasicAuthAccessControlFactory.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BasicAuthAccessControlFactory.java
@@ -19,16 +19,18 @@
 package org.apache.pinot.broker.broker;
 
 import com.google.common.base.Preconditions;
-import javax.annotation.Nullable;
-import org.apache.commons.lang3.StringUtils;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
 import org.apache.pinot.broker.api.AccessControl;
 import org.apache.pinot.broker.api.HttpRequesterIdentity;
 import org.apache.pinot.broker.api.RequesterIdentity;
 import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.core.auth.BasicAuthPrincipal;
+import org.apache.pinot.core.auth.BasicAuthUtils;
 import org.apache.pinot.spi.env.PinotConfiguration;
-import java.nio.charset.StandardCharsets;
-import java.util.*;
-import java.util.stream.Collectors;
 
 
 /**
@@ -43,10 +45,7 @@ import java.util.stream.Collectors;
  * </pre>
  */
 public class BasicAuthAccessControlFactory extends AccessControlFactory {
-  private static final String PRINCIPALS = "principals";
-  private static final String PASSWORD = "password";
-  private static final String TABLES = "tables";
-  private static final String TABLES_ALL = "*";
+  private static final String PREFIX = "principals";
 
   private static final String HEADER_AUTHORIZATION = "authorization";
 
@@ -57,26 +56,7 @@ public class BasicAuthAccessControlFactory extends AccessControlFactory {
   }
 
   public void init(PinotConfiguration configuration) {
-    String principalNames = configuration.getProperty(PRINCIPALS);
-    Preconditions.checkArgument(StringUtils.isNotBlank(principalNames), "must provide principals");
-
-    List<BasicAuthPrincipal> principals = Arrays.stream(principalNames.split(",")).map(rawName -> {
-      String name = rawName.trim();
-      Preconditions.checkArgument(StringUtils.isNotBlank(name), "%s is not a valid name", name);
-
-      String password = configuration.getProperty(String.format("%s.%s.%s", PRINCIPALS, name, PASSWORD));
-      Preconditions.checkArgument(StringUtils.isNotBlank(password), "must provide a password for %s", name);
-
-      Set<String> tables = new HashSet<>();
-      String tableNames = configuration.getProperty(String.format("%s.%s.%s", PRINCIPALS, name, TABLES));
-      if (StringUtils.isNotBlank(tableNames) && !TABLES_ALL.equals(tableNames)) {
-        tables.addAll(Arrays.asList(tableNames.split(",")));
-      }
-
-      return new BasicAuthPrincipal(name, toToken(name, password), tables);
-    }).collect(Collectors.toList());
-
-    _accessControl = new BasicAuthAccessControl(principals);
+    _accessControl = new BasicAuthAccessControl(BasicAuthUtils.extractBasicAuthPrincipals(configuration, PREFIX));
   }
 
   public AccessControl create() {
@@ -100,8 +80,8 @@ public class BasicAuthAccessControlFactory extends AccessControlFactory {
 
       Collection<String> tokens = identity.getHttpHeaders().get(HEADER_AUTHORIZATION);
       Optional<BasicAuthPrincipal> principalOpt =
-          tokens.stream().map(BasicAuthAccessControlFactory::normalizeToken).map(_principals::get)
-              .filter(Objects::nonNull).findFirst();
+          tokens.stream().map(BasicAuthUtils::normalizeBase64Token).map(_principals::get).filter(Objects::nonNull)
+              .findFirst();
 
       if (!principalOpt.isPresent()) {
         // no matching token? reject
@@ -109,61 +89,12 @@ public class BasicAuthAccessControlFactory extends AccessControlFactory {
       }
 
       BasicAuthPrincipal principal = principalOpt.get();
-      if (principal.getTables().isEmpty() || !brokerRequest.isSetQuerySource() || !brokerRequest.getQuerySource()
-          .isSetTableName()) {
+      if (!brokerRequest.isSetQuerySource() || !brokerRequest.getQuerySource().isSetTableName()) {
         // no table restrictions? accept
         return true;
       }
 
-      return principal.getTables().contains(brokerRequest.getQuerySource().getTableName());
-    }
-  }
-
-  /**
-   * Container object for basic auth principal
-   */
-  private static class BasicAuthPrincipal {
-    private final String _name;
-    private final String _token;
-    private final Set<String> _tables;
-
-    public BasicAuthPrincipal(String name, String token, Set<String> tables) {
-      _name = name;
-      _token = token;
-      _tables = tables;
-    }
-
-    public String getName() {
-      return _name;
-    }
-
-    public Set<String> getTables() {
-      return _tables;
-    }
-
-    public String getToken() {
-      return _token;
-    }
-  }
-
-  private static String toToken(String name, String password) {
-    String identifier = String.format("%s:%s", name, password);
-    return normalizeToken(
-        String.format("Basic %s", Base64.getEncoder().encodeToString(identifier.getBytes(StandardCharsets.UTF_8))));
-  }
-
-  /**
-   * Implementations of base64 encoding vary and may generate different numbers of padding characters "=". We normalize
-   * these by removing any padding.
-   *
-   * @param token raw token
-   * @return normalized token
-   */
-  @Nullable
-  private static String normalizeToken(String token) {
-    if (token == null) {
-      return null;
+      return principal.hasTable(brokerRequest.getQuerySource().getTableName());
     }
-    return StringUtils.remove(token.trim(), '=');
   }
 }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/segment/generation/SegmentGenerationUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/segment/generation/SegmentGenerationUtils.java
index 3c0c06e..588d9fd 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/segment/generation/SegmentGenerationUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/segment/generation/SegmentGenerationUtils.java
@@ -26,8 +26,11 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.net.URL;
+import java.net.URLConnection;
 import java.nio.charset.StandardCharsets;
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.filesystem.PinotFS;
@@ -50,6 +53,10 @@ public class SegmentGenerationUtils {
   }
 
   public static Schema getSchema(String schemaURIString) {
+    return getSchema(schemaURIString, null);
+  }
+
+  public static Schema getSchema(String schemaURIString, String authToken) {
     URI schemaURI;
     try {
       schemaURI = new URI(schemaURIString);
@@ -75,7 +82,7 @@ public class SegmentGenerationUtils {
     } else {
       // Try to directly read from URI.
       try {
-        schemaJson = IOUtils.toString(schemaURI, StandardCharsets.UTF_8);
+        schemaJson = fetchUrl(schemaURI.toURL(), authToken);
       } catch (IOException e) {
         throw new RuntimeException("Failed to read from Schema URI - '" + schemaURI + "'", e);
       }
@@ -88,6 +95,10 @@ public class SegmentGenerationUtils {
   }
 
   public static TableConfig getTableConfig(String tableConfigURIStr) {
+    return getTableConfig(tableConfigURIStr, null);
+  }
+
+  public static TableConfig getTableConfig(String tableConfigURIStr, String authToken) {
     URI tableConfigURI;
     try {
       tableConfigURI = new URI(tableConfigURIStr);
@@ -106,7 +117,7 @@ public class SegmentGenerationUtils {
       }
     } else {
       try {
-        tableConfigJson = IOUtils.toString(tableConfigURI, StandardCharsets.UTF_8);
+        tableConfigJson = fetchUrl(tableConfigURI.toURL(), authToken);
       } catch (IOException e) {
         throw new RuntimeException(
             "Failed to read from table config file data stream on Pinot fs - '" + tableConfigURI + "'", e);
@@ -142,7 +153,8 @@ public class SegmentGenerationUtils {
   public static URI getRelativeOutputPath(URI baseInputDir, URI inputFile, URI outputDir) {
     URI relativePath = baseInputDir.relativize(inputFile);
     Preconditions.checkState(relativePath.getPath().length() > 0 && !relativePath.equals(inputFile),
-        "Unable to extract out the relative path for input file '" + inputFile + "', based on base input path: " + baseInputDir);
+        "Unable to extract out the relative path for input file '" + inputFile + "', based on base input path: "
+            + baseInputDir);
     String outputDirStr = outputDir.toString();
     outputDir = !outputDirStr.endsWith("/") ? URI.create(outputDirStr.concat("/")) : outputDir;
     URI relativeOutputURI = outputDir.resolve(relativePath).resolve(".");
@@ -176,10 +188,11 @@ public class SegmentGenerationUtils {
       throws URISyntaxException {
     URI fileURI = URI.create(uriStr);
     if (fileURI.getScheme() == null) {
-      return new URI(fullUriForPathOnlyUriStr.getScheme(), fullUriForPathOnlyUriStr.getUserInfo(), fullUriForPathOnlyUriStr.getHost(),
-          fullUriForPathOnlyUriStr.getPort(), fileURI.getPath(), fileURI.getQuery(), fileURI.getFragment());
+      return new URI(fullUriForPathOnlyUriStr.getScheme(), fullUriForPathOnlyUriStr.getUserInfo(),
+          fullUriForPathOnlyUriStr.getHost(), fullUriForPathOnlyUriStr.getPort(), fileURI.getPath(), fileURI.getQuery(),
+          fileURI.getFragment());
     }
-    
+
     return fileURI;
   }
 
@@ -198,4 +211,22 @@ public class SegmentGenerationUtils {
     }
     return uri;
   }
+
+  /**
+   * Retrieve a URL via GET request, with an optional authorization token.
+   *
+   * @param url target url
+   * @param authToken optional auth token, or null
+   * @return fetched document
+   * @throws IOException on connection problems
+   */
+  private static String fetchUrl(URL url, String authToken)
+      throws IOException {
+    URLConnection connection = url.openConnection();
+
+    if (StringUtils.isNotBlank(authToken)) {
+      connection.setRequestProperty("Authorization", authToken);
+    }
+    return IOUtils.toString(connection.getInputStream(), StandardCharsets.UTF_8);
+  }
 }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
index 4b12e2a..401b036 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
@@ -58,6 +58,7 @@ import org.apache.http.entity.mime.content.FileBody;
 import org.apache.http.entity.mime.content.InputStreamBody;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClients;
+import org.apache.http.message.BasicHeader;
 import org.apache.http.message.BasicNameValuePair;
 import org.apache.http.util.EntityUtils;
 import org.apache.pinot.common.exception.HttpErrorStatusException;
@@ -340,7 +341,14 @@ public class FileUploadDownloadClient implements Closeable {
   }
 
   private static HttpUriRequest getDownloadFileRequest(URI uri, int socketTimeoutMs) {
+    return getDownloadFileRequest(uri, socketTimeoutMs, null);
+  }
+
+  private static HttpUriRequest getDownloadFileRequest(URI uri, int socketTimeoutMs, String authToken) {
     RequestBuilder requestBuilder = RequestBuilder.get(uri).setVersion(HttpVersion.HTTP_1_1);
+    if (StringUtils.isNotBlank(authToken)) {
+      requestBuilder.addHeader("Authorization", authToken);
+    }
     setTimeout(requestBuilder, socketTimeoutMs);
     String userInfo = uri.getUserInfo();
     if (userInfo != null) {
@@ -687,7 +695,23 @@ public class FileUploadDownloadClient implements Closeable {
    */
   public int downloadFile(URI uri, int socketTimeoutMs, File dest)
       throws IOException, HttpErrorStatusException {
-    HttpUriRequest request = getDownloadFileRequest(uri, socketTimeoutMs);
+    return downloadFile(uri, socketTimeoutMs, dest, null);
+  }
+
+  /**
+   * Download a file using default settings, with an optional auth token
+   *
+   * @param uri URI
+   * @param socketTimeoutMs Socket timeout in milliseconds
+   * @param dest File destination
+   * @param authToken optional auth token, or null
+   * @return Response status code
+   * @throws IOException
+   * @throws HttpErrorStatusException
+   */
+  public int downloadFile(URI uri, int socketTimeoutMs, File dest, String authToken)
+      throws IOException, HttpErrorStatusException {
+    HttpUriRequest request = getDownloadFileRequest(uri, socketTimeoutMs, authToken);
     try (CloseableHttpResponse response = _httpClient.execute(request)) {
       StatusLine statusLine = response.getStatusLine();
       int statusCode = statusLine.getStatusCode();
@@ -728,6 +752,21 @@ public class FileUploadDownloadClient implements Closeable {
     return downloadFile(uri, DEFAULT_SOCKET_TIMEOUT_MS, dest);
   }
 
+  /**
+   * Download a file, with an optional auth token.
+   *
+   * @param uri URI
+   * @param dest File destination
+   * @param authToken optional auth token, or null
+   * @return Response status code
+   * @throws IOException
+   * @throws HttpErrorStatusException
+   */
+  public int downloadFile(URI uri, File dest, String authToken)
+      throws IOException, HttpErrorStatusException {
+    return downloadFile(uri, DEFAULT_SOCKET_TIMEOUT_MS, dest, authToken);
+  }
+
   @Override
   public void close()
       throws IOException {
@@ -742,4 +781,17 @@ public class FileUploadDownloadClient implements Closeable {
   public static void installDefaultSSLContext(SSLContext sslContext) {
     _defaultSSLContext = sslContext;
   }
+
+  /**
+   * Generate an (optional) HTTP Authorization header given an auth token
+   *
+   * @param authToken auth token
+   * @return list of 0 or 1 "Authorization" headers
+   */
+  public static List<Header> makeAuthHeader(String authToken) {
+    if (org.apache.commons.lang3.StringUtils.isBlank(authToken)) {
+      return Collections.emptyList();
+    }
+    return Collections.singletonList(new BasicHeader("Authorization", authToken));
+  }
 }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/BaseSegmentFetcher.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/BaseSegmentFetcher.java
index e4a00c9..1988edb 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/BaseSegmentFetcher.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/BaseSegmentFetcher.java
@@ -22,7 +22,6 @@ import java.io.File;
 import java.net.URI;
 import java.util.List;
 import java.util.Random;
-
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.utils.retry.RetryPolicies;
 import org.slf4j.Logger;
@@ -36,6 +35,7 @@ public abstract class BaseSegmentFetcher implements SegmentFetcher {
   public static final String RETRY_COUNT_CONFIG_KEY = "retry.count";
   public static final String RETRY_WAIT_MS_CONFIG_KEY = "retry.wait.ms";
   public static final String RETRY_DELAY_SCALE_FACTOR_CONFIG_KEY = "retry.delay.scale.factor";
+  public static final String AUTH_TOKEN = "auth.token";
   public static final int DEFAULT_RETRY_COUNT = 3;
   public static final int DEFAULT_RETRY_WAIT_MS = 100;
   public static final int DEFAULT_RETRY_DELAY_SCALE_FACTOR = 5;
@@ -45,12 +45,14 @@ public abstract class BaseSegmentFetcher implements SegmentFetcher {
   protected int _retryCount;
   protected int _retryWaitMs;
   protected int _retryDelayScaleFactor;
+  protected String _authToken;
 
   @Override
   public void init(PinotConfiguration config) {
     _retryCount = config.getProperty(RETRY_COUNT_CONFIG_KEY, DEFAULT_RETRY_COUNT);
     _retryWaitMs = config.getProperty(RETRY_WAIT_MS_CONFIG_KEY, DEFAULT_RETRY_WAIT_MS);
     _retryDelayScaleFactor = config.getProperty(RETRY_DELAY_SCALE_FACTOR_CONFIG_KEY, DEFAULT_RETRY_DELAY_SCALE_FACTOR);
+    _authToken = config.getProperty(AUTH_TOKEN);
     doInit(config);
     _logger
         .info("Initialized with retryCount: {}, retryWaitMs: {}, retryDelayScaleFactor: {}", _retryCount, _retryWaitMs,
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java
index fcaee6c..6d6d25e 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java
@@ -41,7 +41,7 @@ public class HttpSegmentFetcher extends BaseSegmentFetcher {
       throws Exception {
     RetryPolicies.exponentialBackoffRetryPolicy(_retryCount, _retryWaitMs, _retryDelayScaleFactor).attempt(() -> {
       try {
-        int statusCode = _httpClient.downloadFile(uri, dest);
+        int statusCode = _httpClient.downloadFile(uri, dest, _authToken);
         _logger
             .info("Downloaded segment from: {} to: {} of size: {}; Response status code: {}", uri, dest, dest.length(),
                 statusCode);
@@ -70,7 +70,7 @@ public class HttpSegmentFetcher extends BaseSegmentFetcher {
   public void fetchSegmentToLocalWithoutRetry(URI uri, File dest)
       throws Exception {
     try {
-      int statusCode = _httpClient.downloadFile(uri, dest);
+      int statusCode = _httpClient.downloadFile(uri, dest, _authToken);
       _logger
           .info("Downloaded segment from: {} to: {} of size: {}; Response status code: {}", uri, dest, dest.length(),
               statusCode);
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java
index 05b14d8..83d98e4 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java
@@ -24,7 +24,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.spi.crypt.PinotCrypter;
 import org.apache.pinot.spi.crypt.PinotCrypterFactory;
@@ -34,22 +34,25 @@ import org.slf4j.LoggerFactory;
 
 
 public class SegmentFetcherFactory {
-  private SegmentFetcherFactory() {
-  }
+  private final static SegmentFetcherFactory instance = new SegmentFetcherFactory();
 
   static final String SEGMENT_FETCHER_CLASS_KEY_SUFFIX = ".class";
   private static final String PROTOCOLS_KEY = "protocols";
+  private static final String AUTH_TOKEN_KEY = "auth.token";
   private static final String ENCODED_SUFFIX = ".enc";
 
   private static final Logger LOGGER = LoggerFactory.getLogger(SegmentFetcherFactory.class);
-  private static final Map<String, SegmentFetcher> SEGMENT_FETCHER_MAP = new HashMap<>();
-  private static final SegmentFetcher DEFAULT_HTTP_SEGMENT_FETCHER = new HttpSegmentFetcher();
-  private static final SegmentFetcher DEFAULT_PINOT_FS_SEGMENT_FETCHER = new PinotFSSegmentFetcher();
-
-  static {
-    PinotConfiguration emptyConfig = new PinotConfiguration();
-    DEFAULT_HTTP_SEGMENT_FETCHER.init(emptyConfig);
-    DEFAULT_PINOT_FS_SEGMENT_FETCHER.init(emptyConfig);
+
+  private final Map<String, SegmentFetcher> _segmentFetcherMap = new HashMap<>();
+  private final SegmentFetcher _httpSegmentFetcher = new HttpSegmentFetcher();
+  private final SegmentFetcher _pinotFSSegmentFetcher = new PinotFSSegmentFetcher();
+
+  private SegmentFetcherFactory() {
+    // left blank
+  }
+
+  public static SegmentFetcherFactory getInstance() {
+    return instance;
   }
 
   /**
@@ -57,6 +60,14 @@ public class SegmentFetcherFactory {
    */
   public static void init(PinotConfiguration config)
       throws Exception {
+    getInstance().initInternal(config);
+  }
+
+  private void initInternal(PinotConfiguration config)
+      throws Exception {
+    _httpSegmentFetcher.init(config); // directly, without sub-namespace
+    _pinotFSSegmentFetcher.init(config); // directly, without sub-namespace
+
     List<String> protocols = config.getProperty(PROTOCOLS_KEY, Arrays.asList());
     for (String protocol : protocols) {
       String segmentFetcherClassName = config.getProperty(protocol + SEGMENT_FETCHER_CLASS_KEY_SUFFIX);
@@ -77,8 +88,16 @@ public class SegmentFetcherFactory {
         LOGGER.info("Creating segment fetcher for protocol: {} with class: {}", protocol, segmentFetcherClassName);
         segmentFetcher = (SegmentFetcher) Class.forName(segmentFetcherClassName).newInstance();
       }
-      segmentFetcher.init(config.subset(protocol));
-      SEGMENT_FETCHER_MAP.put(protocol, segmentFetcher);
+
+      String authToken = config.getProperty(AUTH_TOKEN_KEY);
+      Map<String, Object> subConfigMap = config.subset(protocol).toMap();
+      if (!subConfigMap.containsKey(AUTH_TOKEN_KEY) && StringUtils.isNotBlank(authToken)) {
+        subConfigMap.put(AUTH_TOKEN_KEY, authToken);
+      }
+
+      segmentFetcher.init(new PinotConfiguration(subConfigMap));
+
+      _segmentFetcherMap.put(protocol, segmentFetcher);
     }
   }
 
@@ -87,7 +106,11 @@ public class SegmentFetcherFactory {
    * ({@link HttpSegmentFetcher} for "http" and "https", {@link PinotFSSegmentFetcher} for other protocols).
    */
   public static SegmentFetcher getSegmentFetcher(String protocol) {
-    SegmentFetcher segmentFetcher = SEGMENT_FETCHER_MAP.get(protocol);
+    return getInstance().getSegmentFetcherInternal(protocol);
+  }
+
+  private SegmentFetcher getSegmentFetcherInternal(String protocol) {
+    SegmentFetcher segmentFetcher = _segmentFetcherMap.get(protocol);
     if (segmentFetcher != null) {
       return segmentFetcher;
     } else {
@@ -95,9 +118,9 @@ public class SegmentFetcherFactory {
       switch (protocol) {
         case CommonConstants.HTTP_PROTOCOL:
         case CommonConstants.HTTPS_PROTOCOL:
-          return DEFAULT_HTTP_SEGMENT_FETCHER;
+          return _httpSegmentFetcher;
         default:
-          return DEFAULT_PINOT_FS_SEGMENT_FETCHER;
+          return _pinotFSSegmentFetcher;
       }
     }
   }
@@ -107,7 +130,7 @@ public class SegmentFetcherFactory {
    */
   public static void fetchSegmentToLocal(URI uri, File dest)
       throws Exception {
-    getSegmentFetcher(uri.getScheme()).fetchSegmentToLocal(uri, dest);
+    getInstance().fetchSegmentToLocalInternal(uri, dest);
   }
 
   /**
@@ -115,7 +138,12 @@ public class SegmentFetcherFactory {
    */
   public static void fetchSegmentToLocal(String uri, File dest)
       throws Exception {
-    fetchSegmentToLocal(new URI(uri), dest);
+    getInstance().fetchSegmentToLocalInternal(new URI(uri), dest);
+  }
+
+  private void fetchSegmentToLocalInternal(URI uri, File dest)
+      throws Exception {
+    getSegmentFetcher(uri.getScheme()).fetchSegmentToLocal(uri, dest);
   }
 
   /**
@@ -125,6 +153,11 @@ public class SegmentFetcherFactory {
    */
   public static void fetchAndDecryptSegmentToLocal(String uri, File dest, String crypterName)
       throws Exception {
+    getInstance().fetchAndDecryptSegmentToLocalInternal(uri, dest, crypterName);
+  }
+
+  private void fetchAndDecryptSegmentToLocalInternal(String uri, File dest, String crypterName)
+      throws Exception {
     if (crypterName == null) {
       fetchSegmentToLocal(uri, dest);
     } else {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/access/BasicAuthAccessControlFactory.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/access/BasicAuthAccessControlFactory.java
index 0b2c00a..d8cf80b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/access/BasicAuthAccessControlFactory.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/access/BasicAuthAccessControlFactory.java
@@ -18,13 +18,16 @@
  */
 package org.apache.pinot.controller.api.access;
 
-import com.google.common.base.Preconditions;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.pinot.spi.env.PinotConfiguration;
-
-import javax.ws.rs.core.HttpHeaders;
-import java.util.*;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
 import java.util.stream.Collectors;
+import javax.ws.rs.core.HttpHeaders;
+import org.apache.pinot.core.auth.BasicAuthPrincipal;
+import org.apache.pinot.core.auth.BasicAuthUtils;
+import org.apache.pinot.spi.env.PinotConfiguration;
 
 
 /**
@@ -41,43 +44,13 @@ import java.util.stream.Collectors;
  */
 public class BasicAuthAccessControlFactory implements AccessControlFactory {
   private static final String PREFIX = "controller.admin.access.control.principals";
-  private static final String PASSWORD = "password";
-  private static final String PERMISSIONS = "permissions";
-  private static final String TABLES = "tables";
-  private static final String ALL = "*";
 
   private static final String HEADER_AUTHORIZATION = "Authorization";
 
   private AccessControl _accessControl;
 
   public void init(PinotConfiguration configuration) {
-    String principalNames = configuration.getProperty(PREFIX);
-    Preconditions.checkArgument(StringUtils.isNotBlank(principalNames), "must provide principals");
-
-    List<BasicAuthPrincipal> principals = Arrays.stream(principalNames.split(",")).map(rawName -> {
-      String name = rawName.trim();
-      Preconditions.checkArgument(StringUtils.isNotBlank(name), "%s is not a valid name", name);
-
-      String password = configuration.getProperty(String.format("%s.%s.%s", PREFIX, name, PASSWORD));
-      Preconditions.checkArgument(StringUtils.isNotBlank(password), "must provide a password for %s", name);
-
-      Set<String> tables = new HashSet<>();
-      String tableNames = configuration.getProperty(String.format("%s.%s.%s", PREFIX, name, TABLES));
-      if (StringUtils.isNotBlank(tableNames) && !ALL.equals(tableNames)) {
-        tables = Arrays.stream(tableNames.split(",")).map(String::trim).collect(Collectors.toSet());
-      }
-
-      Set<AccessType> permissions = new HashSet<>();
-      String permissionNames = configuration.getProperty(String.format("%s.%s.%s", PREFIX, name, PERMISSIONS));
-      if (StringUtils.isNotBlank(permissionNames) && !ALL.equals(tableNames)) {
-        permissions = Arrays.stream(permissionNames.split(",")).map(String::trim).map(String::toUpperCase)
-                .map(AccessType::valueOf).collect(Collectors.toSet());
-      }
-
-      return new BasicAuthPrincipal(name, toToken(name, password), tables, permissions);
-    }).collect(Collectors.toList());
-
-    _accessControl = new BasicAuthAccessControl(principals);
+    _accessControl = new BasicAuthAccessControl(BasicAuthUtils.extractBasicAuthPrincipals(configuration, PREFIX));
   }
 
   @Override
@@ -97,67 +70,40 @@ public class BasicAuthAccessControlFactory implements AccessControlFactory {
 
     @Override
     public boolean hasDataAccess(HttpHeaders httpHeaders, String tableName) {
+      Optional<BasicAuthPrincipal> principal = getPrincipal(httpHeaders);
+      boolean response = getPrincipal(httpHeaders).filter(p -> p.hasTable(tableName)).isPresent();
       return getPrincipal(httpHeaders).filter(p -> p.hasTable(tableName)).isPresent();
     }
 
     @Override
     public boolean hasAccess(String tableName, AccessType accessType, HttpHeaders httpHeaders, String endpointUrl) {
-      return getPrincipal(httpHeaders).filter(p -> p.hasTable(tableName) && p.hasPermission(accessType)).isPresent();
+      Optional<BasicAuthPrincipal> principal = getPrincipal(httpHeaders);
+      boolean response =
+          getPrincipal(httpHeaders).filter(p -> p.hasTable(tableName) && p.hasPermission(Objects.toString(accessType)))
+              .isPresent();
+      return getPrincipal(httpHeaders)
+          .filter(p -> p.hasTable(tableName) && p.hasPermission(Objects.toString(accessType))).isPresent();
     }
 
     @Override
     public boolean hasAccess(AccessType accessType, HttpHeaders httpHeaders, String endpointUrl) {
+      Optional<BasicAuthPrincipal> principal = getPrincipal(httpHeaders);
+      boolean response = getPrincipal(httpHeaders).isPresent();
       return getPrincipal(httpHeaders).isPresent();
     }
 
     private Optional<BasicAuthPrincipal> getPrincipal(HttpHeaders headers) {
-      return headers.getRequestHeader(HEADER_AUTHORIZATION).stream().map(BasicAuthAccessControlFactory::normalizeToken)
-              .map(_principals::get).filter(Objects::nonNull).findFirst();
-    }
-  }
-
-  /**
-   * Container object for basic auth principal
-   */
-  private static class BasicAuthPrincipal {
-    private final String _name;
-    private final String _token;
-    private final Set<String> _tables;
-    private final Set<AccessType> _permissions;
-
-    public BasicAuthPrincipal(String name, String token, Set<String> tables, Set<AccessType> permissions) {
-      this._name = name;
-      this._token = token;
-      this._tables = tables;
-      this._permissions = permissions;
-    }
-
-    public String getName() {
-      return _name;
-    }
-
-    public String getToken() {
-      return _token;
-    }
-
-    public boolean hasTable(String tableName) {
-      return _tables.isEmpty() || _tables.contains(tableName);
-    }
-
-    public boolean hasPermission(AccessType accessType) {
-      return _permissions.isEmpty() || _permissions.contains(accessType);
-    }
-  }
+      if (headers == null) {
+        return Optional.empty();
+      }
 
-  private static String toToken(String name, String password) {
-    String identifier = String.format("%s:%s", name, password);
-    return normalizeToken(String.format("Basic %s", Base64.getEncoder().encodeToString(identifier.getBytes())));
-  }
+      List<String> authHeaders = headers.getRequestHeader(HEADER_AUTHORIZATION);
+      if (authHeaders == null) {
+        return Optional.empty();
+      }
 
-  private static String normalizeToken(String token) {
-    if (token == null) {
-      return null;
+      return authHeaders.stream().map(BasicAuthUtils::normalizeBase64Token).map(_principals::get)
+          .filter(Objects::nonNull).findFirst();
     }
-    return token.trim().replace("=", "");
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/auth/BasicAuthPrincipal.java b/pinot-core/src/main/java/org/apache/pinot/core/auth/BasicAuthPrincipal.java
new file mode 100644
index 0000000..ae44401
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/auth/BasicAuthPrincipal.java
@@ -0,0 +1,37 @@
+package org.apache.pinot.core.auth;
+
+import java.util.Set;
+
+
+/**
+ * Holder for a basic auth principal; an empty table or permission set matches every table or permission
+ */
+public class BasicAuthPrincipal {
+  private final String _name;
+  private final String _token;
+  private final Set<String> _tables;
+  private final Set<String> _permissions;
+
+  public BasicAuthPrincipal(String name, String token, Set<String> tables, Set<String> permissions) {
+    _name = name;
+    _token = token;
+    _tables = tables;
+    _permissions = permissions;
+  }
+
+  public String getName() {
+    return _name;
+  }
+
+  public String getToken() {
+    return _token;
+  }
+
+  public boolean hasTable(String tableName) {
+    return _tables.isEmpty() || _tables.contains(tableName); // empty set: no table restriction
+  }
+
+  public boolean hasPermission(String permission) {
+    return _permissions.isEmpty() || _permissions.contains(permission); // empty set: no permission restriction
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/auth/BasicAuthUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/auth/BasicAuthUtils.java
new file mode 100644
index 0000000..46cfa6b
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/auth/BasicAuthUtils.java
@@ -0,0 +1,100 @@
+package org.apache.pinot.core.auth;
+
+import com.google.common.base.Preconditions;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.spi.env.PinotConfiguration;
+
+
+/**
+ * Utility for configuring basic auth and parsing related http tokens
+ */
+public final class BasicAuthUtils {
+  private static final String PASSWORD = "password";
+  private static final String PERMISSIONS = "permissions";
+  private static final String TABLES = "tables";
+  private static final String ALL = "*";
+
+  private BasicAuthUtils() {
+    // left blank
+  }
+
+  /**
+   * Parse a pinot configuration namespace for access control settings, e.g. "controller.admin.access.control.principals".
+   * Example for the prefix "my.prefix.access.control.principals":
+   * <pre>
+   *     my.prefix.access.control.principals=admin123,user456
+   *     my.prefix.access.control.principals.admin123.password=verysecret
+   *     my.prefix.access.control.principals.user456.password=kindasecret
+   *     my.prefix.access.control.principals.user456.tables=stuff,lessImportantStuff
+   *     my.prefix.access.control.principals.user456.permissions=read,update
+   * </pre>
+   *
+   * @param configuration pinot configuration
+   * @param prefix configuration namespace; its value must be a non-blank, comma-separated list of principal names
+   * @return list of BasicAuthPrincipals, each of which must have a non-blank password configured
+   */
+  public static List<BasicAuthPrincipal> extractBasicAuthPrincipals(PinotConfiguration configuration, String prefix) {
+    String principalNames = configuration.getProperty(prefix);
+    Preconditions.checkArgument(StringUtils.isNotBlank(principalNames), "must provide principals");
+
+    return Arrays.stream(principalNames.split(",")).map(rawName -> {
+      String name = rawName.trim();
+      Preconditions.checkArgument(StringUtils.isNotBlank(name), "%s is not a valid name", name);
+
+      String password = configuration.getProperty(String.format("%s.%s.%s", prefix, name, PASSWORD));
+      Preconditions.checkArgument(StringUtils.isNotBlank(password), "must provide a password for %s", name);
+
+      Set<String> tables = extractSet(configuration, String.format("%s.%s.%s", prefix, name, TABLES));
+      Set<String> permissions = extractSet(configuration, String.format("%s.%s.%s", prefix, name, PERMISSIONS));
+
+      return new BasicAuthPrincipal(name, toBasicAuthToken(name, password), tables, permissions);
+    }).collect(Collectors.toList());
+  }
+
+  private static Set<String> extractSet(PinotConfiguration configuration, String key) {
+    String input = configuration.getProperty(key);
+    if (StringUtils.isBlank(input) || ALL.equals(input)) {
+      return Collections.emptySet(); // unset or "*": no explicit entries
+    }
+    return Arrays.stream(input.split(",")).map(String::trim).collect(Collectors.toSet());
+  }
+
+  /**
+   * Convert a pair of name and password into a http header-compliant base64 encoded token ("name:password" as UTF-8)
+   *
+   * @param name user name
+   * @param password password; a null password is encoded as the empty string rather than the text "null"
+   * @return normalized "Basic ..." auth token, or null if the name is blank
+   */
+  @Nullable
+  public static String toBasicAuthToken(String name, String password) {
+    if (StringUtils.isBlank(name)) {
+      return null;
+    }
+    byte[] credentials = (name + ":" + (password == null ? "" : password)).getBytes(StandardCharsets.UTF_8);
+    return normalizeBase64Token("Basic " + Base64.getEncoder().encodeToString(credentials));
+  }
+
+  /**
+   * Normalize a base64 encoded auth token by stripping redundant padding (spaces, '=')
+   *
+   * @param token base64 encoded auth token
+   * @return normalized auth token, or null if the token is null
+   */
+  @Nullable
+  public static String normalizeBase64Token(String token) {
+    if (token == null) {
+      return null;
+    }
+    return StringUtils.remove(token.trim(), '=');
+  }
+}
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/SegmentGenerationAndPushTaskExecutor.java b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/SegmentGenerationAndPushTaskExecutor.java
index e490de2..ff4f66f 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/SegmentGenerationAndPushTaskExecutor.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/SegmentGenerationAndPushTaskExecutor.java
@@ -284,13 +284,15 @@ public class SegmentGenerationAndPushTaskExecutor extends BaseTaskExecutor {
     recordReaderSpec.setConfigClassName(taskConfigs.get(BatchConfigProperties.RECORD_READER_CONFIG_CLASS));
     taskSpec.setRecordReaderSpec(recordReaderSpec);
 
+    String authToken = taskConfigs.get(BatchConfigProperties.AUTH_TOKEN); // TODO
+
     String tableNameWithType = taskConfigs.get(BatchConfigProperties.TABLE_NAME);
     Schema schema;
     if (taskConfigs.containsKey(BatchConfigProperties.SCHEMA)) {
       schema = JsonUtils
           .stringToObject(JsonUtils.objectToString(taskConfigs.get(BatchConfigProperties.SCHEMA)), Schema.class);
     } else if (taskConfigs.containsKey(BatchConfigProperties.SCHEMA_URI)) {
-      schema = SegmentGenerationUtils.getSchema(taskConfigs.get(BatchConfigProperties.SCHEMA_URI));
+      schema = SegmentGenerationUtils.getSchema(taskConfigs.get(BatchConfigProperties.SCHEMA_URI), authToken);
     } else {
       schema = getSchema(tableNameWithType);
     }
@@ -299,7 +301,8 @@ public class SegmentGenerationAndPushTaskExecutor extends BaseTaskExecutor {
     if (taskConfigs.containsKey(BatchConfigProperties.TABLE_CONFIGS)) {
       tableConfig = JsonUtils.stringToObject(taskConfigs.get(BatchConfigProperties.TABLE_CONFIGS), TableConfig.class);
     } else if (taskConfigs.containsKey(BatchConfigProperties.TABLE_CONFIGS_URI)) {
-      tableConfig = SegmentGenerationUtils.getTableConfig(taskConfigs.get(BatchConfigProperties.TABLE_CONFIGS_URI));
+      tableConfig =
+          SegmentGenerationUtils.getTableConfig(taskConfigs.get(BatchConfigProperties.TABLE_CONFIGS_URI), authToken);
     } else {
       tableConfig = getTableConfig(tableNameWithType);
     }
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java
index 4c3b41b..d15cfcd 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java
@@ -26,6 +26,7 @@ import java.io.Serializable;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -119,7 +120,9 @@ public class SegmentPushUtils implements Serializable {
           try (InputStream inputStream = fileSystem.open(tarFileURI)) {
             SimpleHttpResponse response = FILE_UPLOAD_DOWNLOAD_CLIENT
                 .uploadSegment(FileUploadDownloadClient.getUploadSegmentURI(controllerURI), segmentName, inputStream,
-                    tableName);
+                    FileUploadDownloadClient.makeAuthHeader(spec.getAuthToken()), Collections.singletonList(
+                        new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME, tableName)),
+                    FileUploadDownloadClient.DEFAULT_SOCKET_TIMEOUT_MS);
             LOGGER.info("Response for pushing table {} segment {} to location {} - {}: {}", tableName, segmentName,
                 controllerURI, response.getStatusCode(), response.getResponse());
             return true;
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentCreationMapper.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentCreationMapper.java
index 8b71584..c9622ba 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentCreationMapper.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentCreationMapper.java
@@ -32,9 +32,9 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.pinot.common.segment.generation.SegmentGenerationUtils;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner;
-import org.apache.pinot.common.segment.generation.SegmentGenerationUtils;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.filesystem.PinotFS;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
@@ -67,7 +67,8 @@ public class HadoopSegmentCreationMapper extends Mapper<LongWritable, Text, Long
   private File _localTempDir;
 
   @Override
-  public void setup(Context context) throws IOException {
+  public void setup(Context context)
+      throws IOException {
     _jobConf = context.getConfiguration();
     Yaml yaml = new Yaml();
     String segmentGenerationJobSpecStr = _jobConf.get(SEGMENT_GENERATION_JOB_SPEC);
@@ -96,7 +97,7 @@ public class HadoopSegmentCreationMapper extends Mapper<LongWritable, Text, Long
     } else {
       LOGGER.warn("Cannot find local Pinot plugins directory at [{}]", localPluginsTarFile.getAbsolutePath());
     }
-    
+
     // Register file systems
     List<PinotFSSpec> pinotFSSpecs = _spec.getPinotFSSpecs();
     for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
@@ -146,8 +147,9 @@ public class HadoopSegmentCreationMapper extends Mapper<LongWritable, Text, Long
       taskSpec.setInputFilePath(localInputDataFile.getAbsolutePath());
       taskSpec.setOutputDirectoryPath(localOutputTempDir.getAbsolutePath());
       taskSpec.setRecordReaderSpec(_spec.getRecordReaderSpec());
-      taskSpec.setSchema(SegmentGenerationUtils.getSchema(_spec.getTableSpec().getSchemaURI()));
-      taskSpec.setTableConfig(SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI()));
+      taskSpec.setSchema(SegmentGenerationUtils.getSchema(_spec.getTableSpec().getSchemaURI(), _spec.getAuthToken()));
+      taskSpec.setTableConfig(
+          SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI(), _spec.getAuthToken()));
       taskSpec.setSequenceId(idx);
       taskSpec.setSegmentNameGeneratorSpec(_spec.getSegmentNameGeneratorSpec());
       taskSpec.setCustomProperty(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY, inputFileURI.toString());
@@ -186,7 +188,7 @@ public class HadoopSegmentCreationMapper extends Mapper<LongWritable, Text, Long
           .resolve(segmentTarFileName);
       LOGGER.info("Copying segment tar file from [{}] to [{}]", localSegmentTarFile, outputSegmentTarURI);
       outputDirFS.copyFromLocalFile(localSegmentTarFile, outputSegmentTarURI);
-      
+
       FileUtils.deleteQuietly(localSegmentDir);
       FileUtils.deleteQuietly(localSegmentTarFile);
       FileUtils.deleteQuietly(localInputDataFile);
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
index a315d15..d2273e7 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
@@ -18,13 +18,6 @@
  */
 package org.apache.pinot.plugin.ingestion.batch.spark;
 
-import static org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner.LOCAL_DIRECTORY_SEQUENCE_ID;
-import static org.apache.pinot.common.segment.generation.SegmentGenerationUtils.PINOT_PLUGINS_DIR;
-import static org.apache.pinot.common.segment.generation.SegmentGenerationUtils.PINOT_PLUGINS_TAR_GZ;
-import static org.apache.pinot.common.segment.generation.SegmentGenerationUtils.getFileName;
-import static org.apache.pinot.spi.plugin.PluginManager.PLUGINS_DIR_PROPERTY_NAME;
-import static org.apache.pinot.spi.plugin.PluginManager.PLUGINS_INCLUDE_PROPERTY_NAME;
-
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
@@ -40,11 +33,10 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-
 import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.segment.generation.SegmentGenerationUtils;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner;
-import org.apache.pinot.common.segment.generation.SegmentGenerationUtils;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.filesystem.PinotFS;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
@@ -65,6 +57,13 @@ import org.apache.spark.api.java.function.VoidFunction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.pinot.common.segment.generation.SegmentGenerationUtils.PINOT_PLUGINS_DIR;
+import static org.apache.pinot.common.segment.generation.SegmentGenerationUtils.PINOT_PLUGINS_TAR_GZ;
+import static org.apache.pinot.common.segment.generation.SegmentGenerationUtils.getFileName;
+import static org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner.LOCAL_DIRECTORY_SEQUENCE_ID;
+import static org.apache.pinot.spi.plugin.PluginManager.PLUGINS_DIR_PROPERTY_NAME;
+import static org.apache.pinot.spi.plugin.PluginManager.PLUGINS_INCLUDE_PROPERTY_NAME;
+
 
 public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Serializable {
 
@@ -302,8 +301,10 @@ public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Seri
           taskSpec.setInputFilePath(localInputDataFile.getAbsolutePath());
           taskSpec.setOutputDirectoryPath(localOutputTempDir.getAbsolutePath());
           taskSpec.setRecordReaderSpec(_spec.getRecordReaderSpec());
-          taskSpec.setSchema(SegmentGenerationUtils.getSchema(_spec.getTableSpec().getSchemaURI()));
-          taskSpec.setTableConfig(SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI()));
+          taskSpec
+              .setSchema(SegmentGenerationUtils.getSchema(_spec.getTableSpec().getSchemaURI(), _spec.getAuthToken()));
+          taskSpec.setTableConfig(
+              SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI(), _spec.getAuthToken()));
           taskSpec.setSequenceId(idx);
           taskSpec.setSegmentNameGeneratorSpec(_spec.getSegmentNameGeneratorSpec());
           taskSpec.setCustomProperty(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY, inputFileURI.toString());
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
index efe1679..52cf3de 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
@@ -27,14 +27,13 @@ import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
-
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.segment.generation.SegmentGenerationUtils;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner;
-import org.apache.pinot.common.segment.generation.SegmentGenerationUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.env.PinotConfiguration;
@@ -165,8 +164,9 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner {
       FileUtils.forceMkdir(localOutputTempDir);
 
       //Read TableConfig, Schema
-      Schema schema = SegmentGenerationUtils.getSchema(_spec.getTableSpec().getSchemaURI());
-      TableConfig tableConfig = SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI());
+      Schema schema = SegmentGenerationUtils.getSchema(_spec.getTableSpec().getSchemaURI(), _spec.getAuthToken());
+      TableConfig tableConfig =
+          SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI(), _spec.getAuthToken());
 
       int numInputFiles = filteredFiles.size();
       CountDownLatch segmentCreationTaskCountDownLatch = new CountDownLatch(numInputFiles);
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index deb3a6a..162ba71 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -70,6 +70,7 @@ public class HelixInstanceDataManager implements InstanceDataManager {
   private HelixManager _helixManager;
   private ServerMetrics _serverMetrics;
   private ZkHelixPropertyStore<ZNRecord> _propertyStore;
+  private String _authToken;
 
   @Override
   public synchronized void init(PinotConfiguration config, HelixManager helixManager, ServerMetrics serverMetrics)
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java
index 5a28884..0e3f1c6 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java
@@ -49,6 +49,7 @@ public class BatchConfigProperties {
   public static final String PUSH_CONTROLLER_URI = "push.controllerUri";
   public static final String PUSH_SEGMENT_URI_PREFIX = "push.segmentUriPrefix";
   public static final String PUSH_SEGMENT_URI_SUFFIX = "push.segmentUriSuffix";
+  public static final String AUTH_TOKEN = "authToken";
 
   public static final String OUTPUT_SEGMENT_DIR_URI = "output.segment.dir.uri";
 
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/BootstrapTableTool.java b/pinot-tools/src/main/java/org/apache/pinot/tools/BootstrapTableTool.java
index 2e86199..35efe27 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/BootstrapTableTool.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/BootstrapTableTool.java
@@ -118,7 +118,7 @@ public class BootstrapTableTool {
     return new AddTableCommand().setSchemaFile(schemaFile.getAbsolutePath())
         .setTableConfigFile(tableConfigFile.getAbsolutePath()).setControllerProtocol(_controllerProtocol)
         .setControllerHost(_controllerHost).setControllerPort(String.valueOf(_controllerPort)).setExecute(true)
-        .execute();
+        .setAuthToken(_token).execute();
   }
 
   private boolean bootstrapOfflineTable(File setupTableTmpDir, String tableName, File schemaFile,
@@ -178,6 +178,8 @@ public class BootstrapTableTool {
                 tlsSpec.getTrustStorePath(), tlsSpec.getTrustStorePassword());
           }
 
+          spec.setAuthToken(_token);
+
           IngestionJobLauncher.runIngestionJob(spec);
         }
       } else {
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AbstractBaseAdminCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AbstractBaseAdminCommand.java
index 30d4c28..1debd42 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AbstractBaseAdminCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AbstractBaseAdminCommand.java
@@ -31,10 +31,13 @@ import java.net.URL;
 import java.nio.charset.StandardCharsets;
 import java.util.*;
 
+import javax.annotation.Nullable;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.http.Header;
 import org.apache.http.message.BasicHeader;
+import org.apache.pinot.common.utils.FileUploadDownloadClient;
+import org.apache.pinot.core.auth.BasicAuthUtils;
 import org.apache.pinot.tools.AbstractBaseCommand;
 import org.apache.pinot.tools.utils.PinotConfigUtils;
 
@@ -83,10 +86,9 @@ public class AbstractBaseAdminCommand extends AbstractBaseCommand {
     final URL url = new URL(urlString);
     final HttpURLConnection conn = (HttpURLConnection) url.openConnection();
 
-    conn.setDoOutput(true);
+    headers.forEach(header -> conn.setRequestProperty(header.getName(), header.getValue()));
     conn.setRequestMethod(requestMethod);
-    headers.stream().flatMap(header -> Arrays.stream(header.getElements()))
-            .forEach(elem -> conn.setRequestProperty(elem.getName(), elem.getValue()));
+    conn.setDoOutput(true);
     if (payload != null) {
       final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(conn.getOutputStream(),
           StandardCharsets.UTF_8));
@@ -123,16 +125,35 @@ public class AbstractBaseAdminCommand extends AbstractBaseCommand {
     return PinotConfigUtils.readConfigFromFile(configFileName);
   }
 
-  static List<Header> makeBasicAuth(String user, String password) {
-    if (StringUtils.isBlank(user)) {
-      return Collections.emptyList();
+  /**
+   * Generate an (optional) HTTP Authorization header given an auth token.
+   * Delegates to {@link FileUploadDownloadClient#makeAuthHeader(String)}.
+   *
+   * @param authToken auth token
+   * @return list of 0 or 1 "Authorization" headers
+   */
+  static List<Header> makeAuthHeader(String authToken) {
+    return FileUploadDownloadClient.makeAuthHeader(authToken);
+  }
+
+  /**
+   * Return the pass-thru token if it is non-blank, otherwise generate a basic auth token from the user/password pair
+   *
+   * @param authToken optional pass-thru token, takes precedence over user and password
+   * @param user optional username
+   * @param password optional password
+   * @return auth token, or null if neither pass-thru token nor user info available
+   */
+  @Nullable
+  static String makeAuthToken(String authToken, String user, String password) {
+    if (StringUtils.isNotBlank(authToken)) {
+      return authToken;
     }
 
-    if (StringUtils.isBlank(password)) {
-      password = "";
+    if (StringUtils.isNotBlank(user)) {
+      return BasicAuthUtils.toBasicAuthToken(user, password);
     }
 
-    String token = "Basic " + Base64.getEncoder().encodeToString((user + ":" + password).getBytes());
-    return Collections.singletonList(new BasicHeader("Authorization", token));
+    return null;
   }
 }
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddSchemaCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddSchemaCommand.java
index 2bb0a99..fdf7a06 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddSchemaCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddSchemaCommand.java
@@ -60,6 +60,9 @@ public class AddSchemaCommand extends AbstractBaseAdminCommand implements Comman
   @Option(name = "-password", required = false, metaVar = "<String>", usage = "Password for basic auth.")
   private String _password;
 
+  @Option(name = "-authToken", required = false, metaVar = "<String>", usage = "Http auth token.")
+  private String _authToken;
+
   @Option(name = "-help", required = false, help = true, aliases = {"-h", "--h", "--help"}, usage = "Print this message.")
   private boolean _help = false;
 
@@ -80,10 +83,9 @@ public class AddSchemaCommand extends AbstractBaseAdminCommand implements Comman
 
   @Override
   public String toString() {
-    String retString =
-        ("AddSchema -controllerProtocol " + _controllerProtocol + " -controllerHost " + _controllerHost
-            + " -controllerPort " + _controllerPort + " -schemaFile " + _schemaFile + " -user " + _user + " -password "
-                + "[hidden]");
+    String retString = ("AddSchema -controllerProtocol " + _controllerProtocol + " -controllerHost " + _controllerHost
+        + " -controllerPort " + _controllerPort + " -schemaFile " + _schemaFile + " -user " + _user + " -password "
+        + "[hidden]");
 
     return ((_exec) ? (retString + " -exec") : retString);
   }
@@ -114,11 +116,15 @@ public class AddSchemaCommand extends AbstractBaseAdminCommand implements Comman
   }
 
   public void setUser(String user) {
-    this._user = user;
+    _user = user;
   }
 
   public void setPassword(String password) {
-    this._password = password;
+    _password = password;
+  }
+
+  public void setAuthToken(String authToken) {
+    _authToken = authToken;
   }
 
   public AddSchemaCommand setExecute(boolean exec) {
@@ -147,9 +153,10 @@ public class AddSchemaCommand extends AbstractBaseAdminCommand implements Comman
 
     Schema schema = Schema.fromFile(schemaFile);
     try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) {
-      fileUploadDownloadClient.addSchema(
-          FileUploadDownloadClient.getUploadSchemaURI(_controllerProtocol, _controllerHost, Integer.parseInt(_controllerPort)),
-          schema.getSchemaName(), schemaFile, makeBasicAuth(_user, _password), Collections.emptyList());
+      fileUploadDownloadClient.addSchema(FileUploadDownloadClient
+              .getUploadSchemaURI(_controllerProtocol, _controllerHost, Integer.parseInt(_controllerPort)),
+          schema.getSchemaName(), schemaFile, makeAuthHeader(makeAuthToken(_authToken, _user, _password)),
+          Collections.emptyList());
     } catch (Exception e) {
       LOGGER.error("Got Exception to upload Pinot Schema: " + schema.getSchemaName(), e);
       return false;
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTableCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTableCommand.java
index 78123e5..e8403fa 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTableCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTableCommand.java
@@ -22,7 +22,6 @@ import com.fasterxml.jackson.databind.JsonNode;
 import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
-
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.FileUploadDownloadClient;
 import org.apache.pinot.common.utils.NetUtil;
@@ -66,6 +65,9 @@ public class AddTableCommand extends AbstractBaseAdminCommand implements Command
   @Option(name = "-password", required = false, metaVar = "<String>", usage = "Password for basic auth.")
   private String _password;
 
+  @Option(name = "-authToken", required = false, metaVar = "<String>", usage = "Http auth token.")
+  private String _authToken;
+
   @Option(name = "-help", required = false, help = true, aliases = {"-h", "--h", "--help"}, usage = "Print this message.")
   private boolean _help = false;
 
@@ -91,7 +93,7 @@ public class AddTableCommand extends AbstractBaseAdminCommand implements Command
     String retString =
         ("AddTable -tableConfigFile " + _tableConfigFile + " -schemaFile " + _schemaFile + " -controllerProtocol "
             + _controllerProtocol + " -controllerHost " + _controllerHost + " -controllerPort " + _controllerPort
-                + " -user " + _user + " -password " + "[hidden]");
+            + " -user " + _user + " -password " + "[hidden]");
     return ((_exec) ? (retString + " -exec") : retString);
   }
 
@@ -134,6 +136,11 @@ public class AddTableCommand extends AbstractBaseAdminCommand implements Command
     return this;
   }
 
+  public AddTableCommand setAuthToken(String authToken) {
+    _authToken = authToken;
+    return this;
+  }
+
   public AddTableCommand setExecute(boolean exec) {
     _exec = exec;
     return this;
@@ -151,9 +158,10 @@ public class AddTableCommand extends AbstractBaseAdminCommand implements Command
       throw e;
     }
     try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) {
-      fileUploadDownloadClient.addSchema(
-          FileUploadDownloadClient.getUploadSchemaURI(_controllerProtocol, _controllerHost, Integer.parseInt(_controllerPort)),
-          schema.getSchemaName(), schemaFile, makeBasicAuth(_user, _password), Collections.emptyList());
+      fileUploadDownloadClient.addSchema(FileUploadDownloadClient
+              .getUploadSchemaURI(_controllerProtocol, _controllerHost, Integer.parseInt(_controllerPort)),
+          schema.getSchemaName(), schemaFile, makeAuthHeader(makeAuthToken(_authToken, _user, _password)),
+          Collections.emptyList());
     } catch (Exception e) {
       LOGGER.error("Got Exception to upload Pinot Schema: " + schema.getSchemaName(), e);
       throw e;
@@ -163,7 +171,8 @@ public class AddTableCommand extends AbstractBaseAdminCommand implements Command
   public boolean sendTableCreationRequest(JsonNode node)
       throws IOException {
     String res = AbstractBaseAdminCommand
-        .sendPostRequest(ControllerRequestURLBuilder.baseUrl(_controllerAddress).forTableCreate(), node.toString());
+        .sendRequest("POST", ControllerRequestURLBuilder.baseUrl(_controllerAddress).forTableCreate(), node.toString(),
+            makeAuthHeader(makeAuthToken(_authToken, _user, _password)));
     LOGGER.info(res);
     return res.contains("succesfully added");
   }
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTenantCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTenantCommand.java
index 3365314..efd4bf9 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTenantCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTenantCommand.java
@@ -67,6 +67,9 @@ public class AddTenantCommand extends AbstractBaseAdminCommand implements Comman
   @Option(name = "-password", required = false, metaVar = "<String>", usage = "Password for basic auth.")
   private String _password;
 
+  @Option(name = "-authToken", required = false, metaVar = "<String>", usage = "Http auth token.")
+  private String _authToken;
+
   @Option(name = "-help", required = false, help = true, aliases = {"-h", "--h", "--help"}, usage = "Print this message.")
   private boolean _help = false;
 
@@ -112,6 +115,11 @@ public class AddTenantCommand extends AbstractBaseAdminCommand implements Comman
     return this;
   }
 
+  public AddTenantCommand setAuthToken(String authToken) {
+    _authToken = authToken;
+    return this;
+  }
+
   public AddTenantCommand setExecute(boolean exec) {
     _exec = exec;
     return this;
@@ -137,7 +145,7 @@ public class AddTenantCommand extends AbstractBaseAdminCommand implements Comman
     Tenant tenant = new Tenant(_role, _name, _instanceCount, _offlineInstanceCount, _realtimeInstanceCount);
     String res = AbstractBaseAdminCommand
         .sendRequest("POST", ControllerRequestURLBuilder.baseUrl(_controllerAddress).forTenantCreate(),
-            tenant.toJsonString(), makeBasicAuth(_user, _password));
+            tenant.toJsonString(), makeAuthHeader(makeAuthToken(_authToken, _user, _password)));
 
     LOGGER.info(res);
     System.out.print(res);
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/BootstrapTableCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/BootstrapTableCommand.java
index 6f7d017..12806c5 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/BootstrapTableCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/BootstrapTableCommand.java
@@ -18,8 +18,10 @@
  */
 package org.apache.pinot.tools.admin.command;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.NetUtil;
+import org.apache.pinot.core.auth.BasicAuthUtils;
 import org.apache.pinot.spi.plugin.PluginManager;
 import org.apache.pinot.tools.Command;
 import org.apache.pinot.tools.BootstrapTableTool;
@@ -82,6 +84,9 @@ public class BootstrapTableCommand extends AbstractBaseAdminCommand implements C
   @Option(name = "-password", required = false, metaVar = "<String>", usage = "Password for basic auth.")
   private String _password;
 
+  @Option(name = "-authToken", required = false, metaVar = "<String>", usage = "Http auth token.")
+  private String _authToken;
+
   @Option(name = "-help", required = false, help = true, aliases = {"-h", "--h", "--help"}, usage = "Print this message.")
   private boolean _help = false;
 
@@ -122,7 +127,7 @@ public class BootstrapTableCommand extends AbstractBaseAdminCommand implements C
     if (_controllerHost == null) {
       _controllerHost = NetUtil.getHostAddress();
     }
-    String token = ""; // TODO
+    String token = makeAuthToken(_authToken, _user, _password);
     return new BootstrapTableTool(_controllerProtocol, _controllerHost, Integer.parseInt(_controllerPort), _dir, token).execute();
   }
 }
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ChangeTableState.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ChangeTableState.java
index d240e77..83ad5f8 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ChangeTableState.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ChangeTableState.java
@@ -23,6 +23,7 @@ import java.net.URL;
 import org.apache.commons.httpclient.HttpClient;
 import org.apache.commons.httpclient.HttpURL;
 import org.apache.commons.httpclient.methods.GetMethod;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.NetUtil;
@@ -56,6 +57,9 @@ public class ChangeTableState extends AbstractBaseAdminCommand implements Comman
   @Option(name = "-password", required = false, metaVar = "<String>", usage = "Password for basic auth.")
   private String _password;
 
+  @Option(name = "-authToken", required = false, metaVar = "<String>", usage = "Http auth token.")
+  private String _authToken;
+
   @Option(name = "-help", required = false, help = true, aliases = {"-h", "--h", "--help"}, usage = "Print this message.")
   private boolean _help = false;
 
@@ -75,8 +79,12 @@ public class ChangeTableState extends AbstractBaseAdminCommand implements Comman
     URI uri = new URI(_controllerProtocol, null, _controllerHost, Integer.parseInt(_controllerPort),
         URI_TABLES_PATH + _tableName, "state=" + stateValue, null);
 
+    String token = makeAuthToken(_authToken, _user, _password);
+
     GetMethod httpGet = new GetMethod(uri.toString());
-    httpGet.setRequestHeader("Authorization", null); // TODO
+    if (StringUtils.isNotBlank(token)) {
+      httpGet.setRequestHeader("Authorization", token);
+    }
     int status = httpClient.executeMethod(httpGet);
     if (status != 200) {
       throw new RuntimeException("Failed to change table state, error: " + httpGet.getResponseBodyAsString());
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ImportDataCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ImportDataCommand.java
index 49884e4..c432146 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ImportDataCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ImportDataCommand.java
@@ -81,6 +81,9 @@ public class ImportDataCommand extends AbstractBaseAdminCommand implements Comma
   @Option(name = "-password", required = false, metaVar = "<String>", usage = "Password for basic auth.")
   private String _password;
 
+  @Option(name = "-authToken", required = false, metaVar = "<String>", usage = "Http auth token.")
+  private String _authToken;
+
   @Option(name = "-tempDir", metaVar = "<string>", usage = "Temporary directory used to hold data during segment creation.")
   private String _tempDir = new File(FileUtils.getTempDirectory(), getClass().getSimpleName()).getAbsolutePath();
 
@@ -243,11 +246,7 @@ public class ImportDataCommand extends AbstractBaseAdminCommand implements Comma
     spec.setCleanUpOutputDir(true);
     spec.setOverwriteOutput(true);
     spec.setJobType("SegmentCreationAndTarPush");
-
-    if (!StringUtils.isBlank(_user)) {
-      String token = ""; // TODO
-      spec.setAuthToken(token);
-    }
+    spec.setAuthToken(makeAuthToken(_authToken, _user, _password));
 
     // set ExecutionFrameworkSpec
     ExecutionFrameworkSpec executionFrameworkSpec = new ExecutionFrameworkSpec();
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/OperateClusterConfigCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/OperateClusterConfigCommand.java
index 97b04c6..27d3520 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/OperateClusterConfigCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/OperateClusterConfigCommand.java
@@ -19,12 +19,11 @@
 package org.apache.pinot.tools.admin.command;
 
 import com.fasterxml.jackson.databind.JsonNode;
-import java.net.URI;
-import java.nio.charset.StandardCharsets;
 import java.util.Collections;
 import java.util.Iterator;
-import org.apache.commons.io.IOUtils;
+import java.util.List;
 import org.apache.commons.lang.StringUtils;
+import org.apache.http.Header;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.NetUtil;
 import org.apache.pinot.spi.utils.JsonUtils;
@@ -52,6 +51,9 @@ public class OperateClusterConfigCommand extends AbstractBaseAdminCommand implem
   @Option(name = "-password", required = false, metaVar = "<String>", usage = "Password for basic auth.")
   private String _password;
 
+  @Option(name = "-authToken", required = false, metaVar = "<String>", usage = "Http auth token.")
+  private String _authToken;
+
   @Option(name = "-config", metaVar = "<string>", usage = "Cluster config to operate.")
   private String _config;
 
@@ -73,8 +75,9 @@ public class OperateClusterConfigCommand extends AbstractBaseAdminCommand implem
 
   @Override
   public String toString() {
-    String toString = "Operate ClusterConfig -controllerProtocol " + _controllerProtocol + " -controllerHost "
-        + _controllerHost + " -controllerPort " + _controllerPort + " -operation " + _operation;
+    String toString =
+        "Operate ClusterConfig -controllerProtocol " + _controllerProtocol + " -controllerHost " + _controllerHost
+            + " -controllerPort " + _controllerPort + " -operation " + _operation;
     if (_config != null) {
       toString += " -config " + _config;
     }
@@ -116,6 +119,11 @@ public class OperateClusterConfigCommand extends AbstractBaseAdminCommand implem
     return this;
   }
 
+  public OperateClusterConfigCommand setAuthToken(String authToken) {
+    _authToken = authToken;
+    return this;
+  }
+
   public OperateClusterConfigCommand setConfig(String config) {
     _config = config;
     return this;
@@ -135,7 +143,9 @@ public class OperateClusterConfigCommand extends AbstractBaseAdminCommand implem
     if (StringUtils.isEmpty(_config) && !_operation.equalsIgnoreCase("GET")) {
       throw new UnsupportedOperationException("Empty config: " + _config);
     }
-    String clusterConfigUrl = _controllerProtocol + "://" + _controllerHost + ":" + _controllerPort + "/cluster/configs";
+    String clusterConfigUrl =
+        _controllerProtocol + "://" + _controllerHost + ":" + _controllerPort + "/cluster/configs";
+    List<Header> headers = makeAuthHeader(makeAuthToken(_authToken, _user, _password));
     switch (_operation.toUpperCase()) {
       case "ADD":
       case "UPDATE":
@@ -145,10 +155,9 @@ public class OperateClusterConfigCommand extends AbstractBaseAdminCommand implem
               "Bad config: " + _config + ". Please follow the pattern of [Config Key]=[Config Value]");
         }
         String request = JsonUtils.objectToString(Collections.singletonMap(splits[0], splits[1]));
-        return sendRequest("POST", clusterConfigUrl, request, makeBasicAuth(_user, _password));
+        return sendRequest("POST", clusterConfigUrl, request, headers);
       case "GET":
-        // TODO
-        String response = IOUtils.toString(new URI(clusterConfigUrl), StandardCharsets.UTF_8);
+        String response = sendRequest("GET", clusterConfigUrl, null, headers);
         JsonNode jsonNode = JsonUtils.stringToJsonNode(response);
         Iterator<String> fieldNamesIterator = jsonNode.fieldNames();
         String results = "";
@@ -159,8 +168,7 @@ public class OperateClusterConfigCommand extends AbstractBaseAdminCommand implem
         }
         return results;
       case "DELETE":
-        return sendRequest("DELETE", String.format("%s/%s", clusterConfigUrl, _config), null,
-            makeBasicAuth(_user, _password));
+        return sendRequest("DELETE", String.format("%s/%s", clusterConfigUrl, _config), null, headers);
       default:
         throw new UnsupportedOperationException("Unsupported operation: " + _operation);
     }
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/PostQueryCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/PostQueryCommand.java
index 4afb686..72c16e4 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/PostQueryCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/PostQueryCommand.java
@@ -21,8 +21,8 @@ package org.apache.pinot.tools.admin.command;
 import java.util.Collections;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.CommonConstants.Broker.Request;
-import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.common.utils.NetUtil;
+import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.tools.Command;
 import org.kohsuke.args4j.Option;
 import org.slf4j.Logger;
@@ -53,6 +53,9 @@ public class PostQueryCommand extends AbstractBaseAdminCommand implements Comman
   @Option(name = "-password", required = false, metaVar = "<String>", usage = "Password for basic auth.")
   private String _password;
 
+  @Option(name = "-authToken", required = false, metaVar = "<String>", usage = "Http auth token.")
+  private String _authToken;
+
   @Option(name = "-help", required = false, help = true, aliases = {"-h", "--h", "--help"}, usage = "Print this message.")
   private boolean _help = false;
 
@@ -68,8 +71,8 @@ public class PostQueryCommand extends AbstractBaseAdminCommand implements Comman
 
   @Override
   public String toString() {
-    return ("PostQuery -brokerProtocol " + _brokerProtocol + " -brokerHost " + _brokerHost + " -brokerPort " +
-        _brokerPort + " -queryType " + _queryType + " -query " + _query);
+    return ("PostQuery -brokerProtocol " + _brokerProtocol + " -brokerHost " + _brokerHost + " -brokerPort "
+        + _brokerPort + " -queryType " + _queryType + " -query " + _query);
   }
 
   @Override
@@ -107,6 +110,11 @@ public class PostQueryCommand extends AbstractBaseAdminCommand implements Comman
     return this;
   }
 
+  public PostQueryCommand setauthToken(String authToken) {
+    _authToken = authToken;
+    return this;
+  }
+
   public PostQueryCommand setQueryType(String queryType) {
     _queryType = queryType;
     return this;
@@ -133,7 +141,7 @@ public class PostQueryCommand extends AbstractBaseAdminCommand implements Comman
       request = JsonUtils.objectToString(Collections.singletonMap(Request.PQL, _query));
     }
 
-    return sendRequest("POST", urlString, request, makeBasicAuth(_user, _password));
+    return sendRequest("POST", urlString, request, makeAuthHeader(makeAuthToken(_authToken, _user, _password)));
   }
 
   @Override
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java
index 3630602..db53c31 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java
@@ -33,6 +33,7 @@ import java.util.Map;
 import java.util.Random;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.core.auth.BasicAuthUtils;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.tenant.TenantRole;
 import org.apache.pinot.spi.env.PinotConfiguration;
@@ -202,9 +203,10 @@ public class QuickstartRunner {
 
   public void bootstrapTable()
       throws Exception {
+    String token = BasicAuthUtils.toBasicAuthToken("admin", "verysecret");
     for (QuickstartTableRequest request : _tableRequests) {
       if (!new BootstrapTableTool("http", InetAddress.getLocalHost().getHostName(), _controllerPorts.get(0),
-          request.getBootstrapTableDir(), null).execute()) {
+          request.getBootstrapTableDir(), token).execute()) {
         throw new RuntimeException("Failed to bootstrap table with request - " + request);
       }
     }
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/UploadSegmentCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/UploadSegmentCommand.java
index d393e57..679a2a6 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/UploadSegmentCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/UploadSegmentCommand.java
@@ -58,6 +58,9 @@ public class UploadSegmentCommand extends AbstractBaseAdminCommand implements Co
   @Option(name = "-password", required = false, metaVar = "<String>", usage = "Password for basic auth.")
   private String _password;
 
+  @Option(name = "-authToken", required = false, metaVar = "<String>", usage = "Http auth token.")
+  private String _authToken;
+
   @Option(name = "-segmentDir", required = true, metaVar = "<string>", usage = "Path to segment directory.")
   private String _segmentDir = null;
 
@@ -119,6 +122,11 @@ public class UploadSegmentCommand extends AbstractBaseAdminCommand implements Co
     return this;
   }
 
+  public UploadSegmentCommand setAuthToken(String authToken) {
+    _authToken = authToken;
+    return this;
+  }
+
   public UploadSegmentCommand setSegmentDir(String segmentDir) {
     _segmentDir = segmentDir;
     return this;
@@ -159,7 +167,7 @@ public class UploadSegmentCommand extends AbstractBaseAdminCommand implements Co
         LOGGER.info("Uploading segment tar file: {}", segmentTarFile);
         fileUploadDownloadClient
             .uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), segmentTarFile,
-                makeBasicAuth(_user, _password), Collections.singletonList(new BasicNameValuePair(
+                makeAuthHeader(makeAuthToken(_authToken, _user, _password)), Collections.singletonList(new BasicNameValuePair(
                     FileUploadDownloadClient.QueryParameters.TABLE_NAME, _tableName)),
                 FileUploadDownloadClient.DEFAULT_SOCKET_TIMEOUT_MS);
       }
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/utils/PinotConfigUtils.java b/pinot-tools/src/main/java/org/apache/pinot/tools/utils/PinotConfigUtils.java
index 0354fd9..7a17845 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/utils/PinotConfigUtils.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/utils/PinotConfigUtils.java
@@ -76,6 +76,7 @@ public class PinotConfigUtils {
     properties.put("controller.admin.access.control.principals.user.password", "secret");
     properties.put("controller.admin.access.control.principals.user.tables", "baseballStats");
     properties.put("controller.admin.access.control.principals.user.permissions", "read");
+    properties.put("pinot.controller.segment.fetcher.auth.token", "Basic YWRtaW46dmVyeXNlY3JldA==");
 
     return properties;
   }
@@ -176,6 +177,7 @@ public class PinotConfigUtils {
     properties.put(CommonConstants.Server.CONFIG_OF_ADMIN_API_PORT, serverAdminPort);
     properties.put(CommonConstants.Server.CONFIG_OF_INSTANCE_DATA_DIR, serverDataDir);
     properties.put(CommonConstants.Server.CONFIG_OF_INSTANCE_SEGMENT_TAR_DIR, serverSegmentDir);
+    properties.put("pinot.server.segment.fetcher.auth.token", "Basic YWRtaW46dmVyeXNlY3JldA==");
 
     return properties;
   }
@@ -188,6 +190,7 @@ public class PinotConfigUtils {
     Map<String, Object> properties = new HashMap<>();
     properties.put(CommonConstants.Helix.KEY_OF_MINION_HOST, minionHost);
     properties.put(CommonConstants.Helix.KEY_OF_MINION_PORT, minionPort != 0 ? minionPort : getAvailablePort());
+    properties.put("segment.fetcher.auth.token", "Basic YWRtaW46dmVyeXNlY3JldA==");
 
     return properties;
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org