You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ap...@apache.org on 2021/02/25 19:47:17 UTC

[incubator-pinot] branch basic-auth-controller created (now 58d047a)

This is an automated email from the ASF dual-hosted git repository.

apucher pushed a change to branch basic-auth-controller
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


      at 58d047a  refactor FileUploadDownloadClient

This branch includes the following new commits:

     new 29c3cc0  progress
     new 0f47b60  tokens everywhere
     new 67e04c8  transitioning more calls for auth support
     new 58d047a  refactor FileUploadDownloadClient

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 02/04: tokens everywhere

Posted by ap...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

apucher pushed a commit to branch basic-auth-controller
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 0f47b6097ba4ece74038eaa6436beda157fbb524
Author: Alexander Pucher <al...@alexpucher.com>
AuthorDate: Wed Feb 10 18:45:36 2021 -0800

    tokens everywhere
---
 .../broker/BasicAuthAccessControlFactory.java      |  95 +++--------------
 .../segment/generation/SegmentGenerationUtils.java |  43 ++++++--
 .../common/utils/FileUploadDownloadClient.java     |  54 +++++++++-
 .../common/utils/fetcher/BaseSegmentFetcher.java   |   4 +-
 .../common/utils/fetcher/HttpSegmentFetcher.java   |   4 +-
 .../utils/fetcher/SegmentFetcherFactory.java       |  69 +++++++++----
 .../api/access/BasicAuthAccessControlFactory.java  | 112 ++++++---------------
 .../apache/pinot/core/auth/BasicAuthPrincipal.java |  37 +++++++
 .../org/apache/pinot/core/auth/BasicAuthUtils.java | 100 ++++++++++++++++++
 .../SegmentGenerationAndPushTaskExecutor.java      |   7 +-
 .../ingestion/batch/common/SegmentPushUtils.java   |   5 +-
 .../batch/hadoop/HadoopSegmentCreationMapper.java  |  14 +--
 .../spark/SparkSegmentGenerationJobRunner.java     |  23 +++--
 .../standalone/SegmentGenerationJobRunner.java     |   8 +-
 .../starter/helix/HelixInstanceDataManager.java    |   1 +
 .../spi/ingestion/batch/BatchConfigProperties.java |   1 +
 .../org/apache/pinot/tools/BootstrapTableTool.java |   4 +-
 .../admin/command/AbstractBaseAdminCommand.java    |  41 ++++++--
 .../tools/admin/command/AddSchemaCommand.java      |  25 +++--
 .../pinot/tools/admin/command/AddTableCommand.java |  21 ++--
 .../tools/admin/command/AddTenantCommand.java      |  10 +-
 .../tools/admin/command/BootstrapTableCommand.java |   7 +-
 .../tools/admin/command/ChangeTableState.java      |  10 +-
 .../tools/admin/command/ImportDataCommand.java     |   9 +-
 .../admin/command/OperateClusterConfigCommand.java |  30 ++++--
 .../tools/admin/command/PostQueryCommand.java      |  16 ++-
 .../tools/admin/command/QuickstartRunner.java      |   4 +-
 .../tools/admin/command/UploadSegmentCommand.java  |  10 +-
 .../apache/pinot/tools/utils/PinotConfigUtils.java |   3 +
 29 files changed, 499 insertions(+), 268 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BasicAuthAccessControlFactory.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BasicAuthAccessControlFactory.java
index 3a670b3..0d00642 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BasicAuthAccessControlFactory.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BasicAuthAccessControlFactory.java
@@ -19,16 +19,18 @@
 package org.apache.pinot.broker.broker;
 
 import com.google.common.base.Preconditions;
-import javax.annotation.Nullable;
-import org.apache.commons.lang3.StringUtils;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
 import org.apache.pinot.broker.api.AccessControl;
 import org.apache.pinot.broker.api.HttpRequesterIdentity;
 import org.apache.pinot.broker.api.RequesterIdentity;
 import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.core.auth.BasicAuthPrincipal;
+import org.apache.pinot.core.auth.BasicAuthUtils;
 import org.apache.pinot.spi.env.PinotConfiguration;
-import java.nio.charset.StandardCharsets;
-import java.util.*;
-import java.util.stream.Collectors;
 
 
 /**
@@ -43,10 +45,7 @@ import java.util.stream.Collectors;
  * </pre>
  */
 public class BasicAuthAccessControlFactory extends AccessControlFactory {
-  private static final String PRINCIPALS = "principals";
-  private static final String PASSWORD = "password";
-  private static final String TABLES = "tables";
-  private static final String TABLES_ALL = "*";
+  private static final String PREFIX = "principals";
 
   private static final String HEADER_AUTHORIZATION = "authorization";
 
@@ -57,26 +56,7 @@ public class BasicAuthAccessControlFactory extends AccessControlFactory {
   }
 
   public void init(PinotConfiguration configuration) {
-    String principalNames = configuration.getProperty(PRINCIPALS);
-    Preconditions.checkArgument(StringUtils.isNotBlank(principalNames), "must provide principals");
-
-    List<BasicAuthPrincipal> principals = Arrays.stream(principalNames.split(",")).map(rawName -> {
-      String name = rawName.trim();
-      Preconditions.checkArgument(StringUtils.isNotBlank(name), "%s is not a valid name", name);
-
-      String password = configuration.getProperty(String.format("%s.%s.%s", PRINCIPALS, name, PASSWORD));
-      Preconditions.checkArgument(StringUtils.isNotBlank(password), "must provide a password for %s", name);
-
-      Set<String> tables = new HashSet<>();
-      String tableNames = configuration.getProperty(String.format("%s.%s.%s", PRINCIPALS, name, TABLES));
-      if (StringUtils.isNotBlank(tableNames) && !TABLES_ALL.equals(tableNames)) {
-        tables.addAll(Arrays.asList(tableNames.split(",")));
-      }
-
-      return new BasicAuthPrincipal(name, toToken(name, password), tables);
-    }).collect(Collectors.toList());
-
-    _accessControl = new BasicAuthAccessControl(principals);
+    _accessControl = new BasicAuthAccessControl(BasicAuthUtils.extractBasicAuthPrincipals(configuration, PREFIX));
   }
 
   public AccessControl create() {
@@ -100,8 +80,8 @@ public class BasicAuthAccessControlFactory extends AccessControlFactory {
 
       Collection<String> tokens = identity.getHttpHeaders().get(HEADER_AUTHORIZATION);
       Optional<BasicAuthPrincipal> principalOpt =
-          tokens.stream().map(BasicAuthAccessControlFactory::normalizeToken).map(_principals::get)
-              .filter(Objects::nonNull).findFirst();
+          tokens.stream().map(BasicAuthUtils::normalizeBase64Token).map(_principals::get).filter(Objects::nonNull)
+              .findFirst();
 
       if (!principalOpt.isPresent()) {
         // no matching token? reject
@@ -109,61 +89,12 @@ public class BasicAuthAccessControlFactory extends AccessControlFactory {
       }
 
       BasicAuthPrincipal principal = principalOpt.get();
-      if (principal.getTables().isEmpty() || !brokerRequest.isSetQuerySource() || !brokerRequest.getQuerySource()
-          .isSetTableName()) {
+      if (!brokerRequest.isSetQuerySource() || !brokerRequest.getQuerySource().isSetTableName()) {
         // no table restrictions? accept
         return true;
       }
 
-      return principal.getTables().contains(brokerRequest.getQuerySource().getTableName());
-    }
-  }
-
-  /**
-   * Container object for basic auth principal
-   */
-  private static class BasicAuthPrincipal {
-    private final String _name;
-    private final String _token;
-    private final Set<String> _tables;
-
-    public BasicAuthPrincipal(String name, String token, Set<String> tables) {
-      _name = name;
-      _token = token;
-      _tables = tables;
-    }
-
-    public String getName() {
-      return _name;
-    }
-
-    public Set<String> getTables() {
-      return _tables;
-    }
-
-    public String getToken() {
-      return _token;
-    }
-  }
-
-  private static String toToken(String name, String password) {
-    String identifier = String.format("%s:%s", name, password);
-    return normalizeToken(
-        String.format("Basic %s", Base64.getEncoder().encodeToString(identifier.getBytes(StandardCharsets.UTF_8))));
-  }
-
-  /**
-   * Implementations of base64 encoding vary and may generate different numbers of padding characters "=". We normalize
-   * these by removing any padding.
-   *
-   * @param token raw token
-   * @return normalized token
-   */
-  @Nullable
-  private static String normalizeToken(String token) {
-    if (token == null) {
-      return null;
+      return principal.hasTable(brokerRequest.getQuerySource().getTableName());
     }
-    return StringUtils.remove(token.trim(), '=');
   }
 }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/segment/generation/SegmentGenerationUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/segment/generation/SegmentGenerationUtils.java
index 3c0c06e..588d9fd 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/segment/generation/SegmentGenerationUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/segment/generation/SegmentGenerationUtils.java
@@ -26,8 +26,11 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.net.URL;
+import java.net.URLConnection;
 import java.nio.charset.StandardCharsets;
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.filesystem.PinotFS;
@@ -50,6 +53,10 @@ public class SegmentGenerationUtils {
   }
 
   public static Schema getSchema(String schemaURIString) {
+    return getSchema(schemaURIString, null);
+  }
+
+  public static Schema getSchema(String schemaURIString, String authToken) {
     URI schemaURI;
     try {
       schemaURI = new URI(schemaURIString);
@@ -75,7 +82,7 @@ public class SegmentGenerationUtils {
     } else {
       // Try to directly read from URI.
       try {
-        schemaJson = IOUtils.toString(schemaURI, StandardCharsets.UTF_8);
+        schemaJson = fetchUrl(schemaURI.toURL(), authToken);
       } catch (IOException e) {
         throw new RuntimeException("Failed to read from Schema URI - '" + schemaURI + "'", e);
       }
@@ -88,6 +95,10 @@ public class SegmentGenerationUtils {
   }
 
   public static TableConfig getTableConfig(String tableConfigURIStr) {
+    return getTableConfig(tableConfigURIStr, null);
+  }
+
+  public static TableConfig getTableConfig(String tableConfigURIStr, String authToken) {
     URI tableConfigURI;
     try {
       tableConfigURI = new URI(tableConfigURIStr);
@@ -106,7 +117,7 @@ public class SegmentGenerationUtils {
       }
     } else {
       try {
-        tableConfigJson = IOUtils.toString(tableConfigURI, StandardCharsets.UTF_8);
+        tableConfigJson = fetchUrl(tableConfigURI.toURL(), authToken);
       } catch (IOException e) {
         throw new RuntimeException(
             "Failed to read from table config file data stream on Pinot fs - '" + tableConfigURI + "'", e);
@@ -142,7 +153,8 @@ public class SegmentGenerationUtils {
   public static URI getRelativeOutputPath(URI baseInputDir, URI inputFile, URI outputDir) {
     URI relativePath = baseInputDir.relativize(inputFile);
     Preconditions.checkState(relativePath.getPath().length() > 0 && !relativePath.equals(inputFile),
-        "Unable to extract out the relative path for input file '" + inputFile + "', based on base input path: " + baseInputDir);
+        "Unable to extract out the relative path for input file '" + inputFile + "', based on base input path: "
+            + baseInputDir);
     String outputDirStr = outputDir.toString();
     outputDir = !outputDirStr.endsWith("/") ? URI.create(outputDirStr.concat("/")) : outputDir;
     URI relativeOutputURI = outputDir.resolve(relativePath).resolve(".");
@@ -176,10 +188,11 @@ public class SegmentGenerationUtils {
       throws URISyntaxException {
     URI fileURI = URI.create(uriStr);
     if (fileURI.getScheme() == null) {
-      return new URI(fullUriForPathOnlyUriStr.getScheme(), fullUriForPathOnlyUriStr.getUserInfo(), fullUriForPathOnlyUriStr.getHost(),
-          fullUriForPathOnlyUriStr.getPort(), fileURI.getPath(), fileURI.getQuery(), fileURI.getFragment());
+      return new URI(fullUriForPathOnlyUriStr.getScheme(), fullUriForPathOnlyUriStr.getUserInfo(),
+          fullUriForPathOnlyUriStr.getHost(), fullUriForPathOnlyUriStr.getPort(), fileURI.getPath(), fileURI.getQuery(),
+          fileURI.getFragment());
     }
-    
+
     return fileURI;
   }
 
@@ -198,4 +211,22 @@ public class SegmentGenerationUtils {
     }
     return uri;
   }
+
+  /**
+   * Retrieve a URL via GET request, with an optional authorization token.
+   *
+   * @param url target url
+   * @param authToken optional auth token, or null
+   * @return fetched document
+   * @throws IOException on connection problems
+   */
+  private static String fetchUrl(URL url, String authToken)
+      throws IOException {
+    URLConnection connection = url.openConnection();
+
+    if (StringUtils.isNotBlank(authToken)) {
+      connection.setRequestProperty("Authorization", authToken);
+    }
+    return IOUtils.toString(connection.getInputStream(), StandardCharsets.UTF_8);
+  }
 }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
index 4b12e2a..401b036 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
@@ -58,6 +58,7 @@ import org.apache.http.entity.mime.content.FileBody;
 import org.apache.http.entity.mime.content.InputStreamBody;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClients;
+import org.apache.http.message.BasicHeader;
 import org.apache.http.message.BasicNameValuePair;
 import org.apache.http.util.EntityUtils;
 import org.apache.pinot.common.exception.HttpErrorStatusException;
@@ -340,7 +341,14 @@ public class FileUploadDownloadClient implements Closeable {
   }
 
   private static HttpUriRequest getDownloadFileRequest(URI uri, int socketTimeoutMs) {
+    return getDownloadFileRequest(uri, socketTimeoutMs, null);
+  }
+
+  private static HttpUriRequest getDownloadFileRequest(URI uri, int socketTimeoutMs, String authToken) {
     RequestBuilder requestBuilder = RequestBuilder.get(uri).setVersion(HttpVersion.HTTP_1_1);
+    if (StringUtils.isNotBlank(authToken)) {
+      requestBuilder.addHeader("Authorization", authToken);
+    }
     setTimeout(requestBuilder, socketTimeoutMs);
     String userInfo = uri.getUserInfo();
     if (userInfo != null) {
@@ -687,7 +695,23 @@ public class FileUploadDownloadClient implements Closeable {
    */
   public int downloadFile(URI uri, int socketTimeoutMs, File dest)
       throws IOException, HttpErrorStatusException {
-    HttpUriRequest request = getDownloadFileRequest(uri, socketTimeoutMs);
+    return downloadFile(uri, socketTimeoutMs, dest, null);
+  }
+
+  /**
+   * Download a file using default settings, with an optional auth token
+   *
+   * @param uri URI
+   * @param socketTimeoutMs Socket timeout in milliseconds
+   * @param dest File destination
+   * @param authToken optional auth token, or null
+   * @return Response status code
+   * @throws IOException
+   * @throws HttpErrorStatusException
+   */
+  public int downloadFile(URI uri, int socketTimeoutMs, File dest, String authToken)
+      throws IOException, HttpErrorStatusException {
+    HttpUriRequest request = getDownloadFileRequest(uri, socketTimeoutMs, authToken);
     try (CloseableHttpResponse response = _httpClient.execute(request)) {
       StatusLine statusLine = response.getStatusLine();
       int statusCode = statusLine.getStatusCode();
@@ -728,6 +752,21 @@ public class FileUploadDownloadClient implements Closeable {
     return downloadFile(uri, DEFAULT_SOCKET_TIMEOUT_MS, dest);
   }
 
+  /**
+   * Download a file, with an optional auth token.
+   *
+   * @param uri URI
+   * @param dest File destination
+   * @param authToken optional auth token, or null
+   * @return Response status code
+   * @throws IOException
+   * @throws HttpErrorStatusException
+   */
+  public int downloadFile(URI uri, File dest, String authToken)
+      throws IOException, HttpErrorStatusException {
+    return downloadFile(uri, DEFAULT_SOCKET_TIMEOUT_MS, dest, authToken);
+  }
+
   @Override
   public void close()
       throws IOException {
@@ -742,4 +781,17 @@ public class FileUploadDownloadClient implements Closeable {
   public static void installDefaultSSLContext(SSLContext sslContext) {
     _defaultSSLContext = sslContext;
   }
+
+  /**
+   * Generate an (optional) HTTP Authorization header given an auth token
+   *
+   * @param authToken auth token
+   * @return list of 0 or 1 "Authorization" headers
+   */
+  public static List<Header> makeAuthHeader(String authToken) {
+    if (org.apache.commons.lang3.StringUtils.isBlank(authToken)) {
+      return Collections.emptyList();
+    }
+    return Collections.singletonList(new BasicHeader("Authorization", authToken));
+  }
 }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/BaseSegmentFetcher.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/BaseSegmentFetcher.java
index e4a00c9..1988edb 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/BaseSegmentFetcher.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/BaseSegmentFetcher.java
@@ -22,7 +22,6 @@ import java.io.File;
 import java.net.URI;
 import java.util.List;
 import java.util.Random;
-
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.utils.retry.RetryPolicies;
 import org.slf4j.Logger;
@@ -36,6 +35,7 @@ public abstract class BaseSegmentFetcher implements SegmentFetcher {
   public static final String RETRY_COUNT_CONFIG_KEY = "retry.count";
   public static final String RETRY_WAIT_MS_CONFIG_KEY = "retry.wait.ms";
   public static final String RETRY_DELAY_SCALE_FACTOR_CONFIG_KEY = "retry.delay.scale.factor";
+  public static final String AUTH_TOKEN = "auth.token";
   public static final int DEFAULT_RETRY_COUNT = 3;
   public static final int DEFAULT_RETRY_WAIT_MS = 100;
   public static final int DEFAULT_RETRY_DELAY_SCALE_FACTOR = 5;
@@ -45,12 +45,14 @@ public abstract class BaseSegmentFetcher implements SegmentFetcher {
   protected int _retryCount;
   protected int _retryWaitMs;
   protected int _retryDelayScaleFactor;
+  protected String _authToken;
 
   @Override
   public void init(PinotConfiguration config) {
     _retryCount = config.getProperty(RETRY_COUNT_CONFIG_KEY, DEFAULT_RETRY_COUNT);
     _retryWaitMs = config.getProperty(RETRY_WAIT_MS_CONFIG_KEY, DEFAULT_RETRY_WAIT_MS);
     _retryDelayScaleFactor = config.getProperty(RETRY_DELAY_SCALE_FACTOR_CONFIG_KEY, DEFAULT_RETRY_DELAY_SCALE_FACTOR);
+    _authToken = config.getProperty(AUTH_TOKEN);
     doInit(config);
     _logger
         .info("Initialized with retryCount: {}, retryWaitMs: {}, retryDelayScaleFactor: {}", _retryCount, _retryWaitMs,
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java
index fcaee6c..6d6d25e 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java
@@ -41,7 +41,7 @@ public class HttpSegmentFetcher extends BaseSegmentFetcher {
       throws Exception {
     RetryPolicies.exponentialBackoffRetryPolicy(_retryCount, _retryWaitMs, _retryDelayScaleFactor).attempt(() -> {
       try {
-        int statusCode = _httpClient.downloadFile(uri, dest);
+        int statusCode = _httpClient.downloadFile(uri, dest, _authToken);
         _logger
             .info("Downloaded segment from: {} to: {} of size: {}; Response status code: {}", uri, dest, dest.length(),
                 statusCode);
@@ -70,7 +70,7 @@ public class HttpSegmentFetcher extends BaseSegmentFetcher {
   public void fetchSegmentToLocalWithoutRetry(URI uri, File dest)
       throws Exception {
     try {
-      int statusCode = _httpClient.downloadFile(uri, dest);
+      int statusCode = _httpClient.downloadFile(uri, dest, _authToken);
       _logger
           .info("Downloaded segment from: {} to: {} of size: {}; Response status code: {}", uri, dest, dest.length(),
               statusCode);
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java
index 05b14d8..83d98e4 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java
@@ -24,7 +24,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.spi.crypt.PinotCrypter;
 import org.apache.pinot.spi.crypt.PinotCrypterFactory;
@@ -34,22 +34,25 @@ import org.slf4j.LoggerFactory;
 
 
 public class SegmentFetcherFactory {
-  private SegmentFetcherFactory() {
-  }
+  private final static SegmentFetcherFactory instance = new SegmentFetcherFactory();
 
   static final String SEGMENT_FETCHER_CLASS_KEY_SUFFIX = ".class";
   private static final String PROTOCOLS_KEY = "protocols";
+  private static final String AUTH_TOKEN_KEY = "auth.token";
   private static final String ENCODED_SUFFIX = ".enc";
 
   private static final Logger LOGGER = LoggerFactory.getLogger(SegmentFetcherFactory.class);
-  private static final Map<String, SegmentFetcher> SEGMENT_FETCHER_MAP = new HashMap<>();
-  private static final SegmentFetcher DEFAULT_HTTP_SEGMENT_FETCHER = new HttpSegmentFetcher();
-  private static final SegmentFetcher DEFAULT_PINOT_FS_SEGMENT_FETCHER = new PinotFSSegmentFetcher();
-
-  static {
-    PinotConfiguration emptyConfig = new PinotConfiguration();
-    DEFAULT_HTTP_SEGMENT_FETCHER.init(emptyConfig);
-    DEFAULT_PINOT_FS_SEGMENT_FETCHER.init(emptyConfig);
+
+  private final Map<String, SegmentFetcher> _segmentFetcherMap = new HashMap<>();
+  private final SegmentFetcher _httpSegmentFetcher = new HttpSegmentFetcher();
+  private final SegmentFetcher _pinotFSSegmentFetcher = new PinotFSSegmentFetcher();
+
+  private SegmentFetcherFactory() {
+    // left blank
+  }
+
+  public static SegmentFetcherFactory getInstance() {
+    return instance;
   }
 
   /**
@@ -57,6 +60,14 @@ public class SegmentFetcherFactory {
    */
   public static void init(PinotConfiguration config)
       throws Exception {
+    getInstance().initInternal(config);
+  }
+
+  private void initInternal(PinotConfiguration config)
+      throws Exception {
+    _httpSegmentFetcher.init(config); // directly, without sub-namespace
+    _pinotFSSegmentFetcher.init(config); // directly, without sub-namespace
+
     List<String> protocols = config.getProperty(PROTOCOLS_KEY, Arrays.asList());
     for (String protocol : protocols) {
       String segmentFetcherClassName = config.getProperty(protocol + SEGMENT_FETCHER_CLASS_KEY_SUFFIX);
@@ -77,8 +88,16 @@ public class SegmentFetcherFactory {
         LOGGER.info("Creating segment fetcher for protocol: {} with class: {}", protocol, segmentFetcherClassName);
         segmentFetcher = (SegmentFetcher) Class.forName(segmentFetcherClassName).newInstance();
       }
-      segmentFetcher.init(config.subset(protocol));
-      SEGMENT_FETCHER_MAP.put(protocol, segmentFetcher);
+
+      String authToken = config.getProperty(AUTH_TOKEN_KEY);
+      Map<String, Object> subConfigMap = config.subset(protocol).toMap();
+      if (!subConfigMap.containsKey(AUTH_TOKEN_KEY) && StringUtils.isNotBlank(authToken)) {
+        subConfigMap.put(AUTH_TOKEN_KEY, authToken);
+      }
+
+      segmentFetcher.init(new PinotConfiguration(subConfigMap));
+
+      _segmentFetcherMap.put(protocol, segmentFetcher);
     }
   }
 
@@ -87,7 +106,11 @@ public class SegmentFetcherFactory {
    * ({@link HttpSegmentFetcher} for "http" and "https", {@link PinotFSSegmentFetcher} for other protocols).
    */
   public static SegmentFetcher getSegmentFetcher(String protocol) {
-    SegmentFetcher segmentFetcher = SEGMENT_FETCHER_MAP.get(protocol);
+    return getInstance().getSegmentFetcherInternal(protocol);
+  }
+
+  private SegmentFetcher getSegmentFetcherInternal(String protocol) {
+    SegmentFetcher segmentFetcher = _segmentFetcherMap.get(protocol);
     if (segmentFetcher != null) {
       return segmentFetcher;
     } else {
@@ -95,9 +118,9 @@ public class SegmentFetcherFactory {
       switch (protocol) {
         case CommonConstants.HTTP_PROTOCOL:
         case CommonConstants.HTTPS_PROTOCOL:
-          return DEFAULT_HTTP_SEGMENT_FETCHER;
+          return _httpSegmentFetcher;
         default:
-          return DEFAULT_PINOT_FS_SEGMENT_FETCHER;
+          return _pinotFSSegmentFetcher;
       }
     }
   }
@@ -107,7 +130,7 @@ public class SegmentFetcherFactory {
    */
   public static void fetchSegmentToLocal(URI uri, File dest)
       throws Exception {
-    getSegmentFetcher(uri.getScheme()).fetchSegmentToLocal(uri, dest);
+    getInstance().fetchSegmentToLocalInternal(uri, dest);
   }
 
   /**
@@ -115,7 +138,12 @@ public class SegmentFetcherFactory {
    */
   public static void fetchSegmentToLocal(String uri, File dest)
       throws Exception {
-    fetchSegmentToLocal(new URI(uri), dest);
+    getInstance().fetchSegmentToLocalInternal(new URI(uri), dest);
+  }
+
+  private void fetchSegmentToLocalInternal(URI uri, File dest)
+      throws Exception {
+    getSegmentFetcher(uri.getScheme()).fetchSegmentToLocal(uri, dest);
   }
 
   /**
@@ -125,6 +153,11 @@ public class SegmentFetcherFactory {
    */
   public static void fetchAndDecryptSegmentToLocal(String uri, File dest, String crypterName)
       throws Exception {
+    getInstance().fetchAndDecryptSegmentToLocalInternal(uri, dest, crypterName);
+  }
+
+  private void fetchAndDecryptSegmentToLocalInternal(String uri, File dest, String crypterName)
+      throws Exception {
     if (crypterName == null) {
       fetchSegmentToLocal(uri, dest);
     } else {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/access/BasicAuthAccessControlFactory.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/access/BasicAuthAccessControlFactory.java
index 0b2c00a..d8cf80b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/access/BasicAuthAccessControlFactory.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/access/BasicAuthAccessControlFactory.java
@@ -18,13 +18,16 @@
  */
 package org.apache.pinot.controller.api.access;
 
-import com.google.common.base.Preconditions;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.pinot.spi.env.PinotConfiguration;
-
-import javax.ws.rs.core.HttpHeaders;
-import java.util.*;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
 import java.util.stream.Collectors;
+import javax.ws.rs.core.HttpHeaders;
+import org.apache.pinot.core.auth.BasicAuthPrincipal;
+import org.apache.pinot.core.auth.BasicAuthUtils;
+import org.apache.pinot.spi.env.PinotConfiguration;
 
 
 /**
@@ -41,43 +44,13 @@ import java.util.stream.Collectors;
  */
 public class BasicAuthAccessControlFactory implements AccessControlFactory {
   private static final String PREFIX = "controller.admin.access.control.principals";
-  private static final String PASSWORD = "password";
-  private static final String PERMISSIONS = "permissions";
-  private static final String TABLES = "tables";
-  private static final String ALL = "*";
 
   private static final String HEADER_AUTHORIZATION = "Authorization";
 
   private AccessControl _accessControl;
 
   public void init(PinotConfiguration configuration) {
-    String principalNames = configuration.getProperty(PREFIX);
-    Preconditions.checkArgument(StringUtils.isNotBlank(principalNames), "must provide principals");
-
-    List<BasicAuthPrincipal> principals = Arrays.stream(principalNames.split(",")).map(rawName -> {
-      String name = rawName.trim();
-      Preconditions.checkArgument(StringUtils.isNotBlank(name), "%s is not a valid name", name);
-
-      String password = configuration.getProperty(String.format("%s.%s.%s", PREFIX, name, PASSWORD));
-      Preconditions.checkArgument(StringUtils.isNotBlank(password), "must provide a password for %s", name);
-
-      Set<String> tables = new HashSet<>();
-      String tableNames = configuration.getProperty(String.format("%s.%s.%s", PREFIX, name, TABLES));
-      if (StringUtils.isNotBlank(tableNames) && !ALL.equals(tableNames)) {
-        tables = Arrays.stream(tableNames.split(",")).map(String::trim).collect(Collectors.toSet());
-      }
-
-      Set<AccessType> permissions = new HashSet<>();
-      String permissionNames = configuration.getProperty(String.format("%s.%s.%s", PREFIX, name, PERMISSIONS));
-      if (StringUtils.isNotBlank(permissionNames) && !ALL.equals(tableNames)) {
-        permissions = Arrays.stream(permissionNames.split(",")).map(String::trim).map(String::toUpperCase)
-                .map(AccessType::valueOf).collect(Collectors.toSet());
-      }
-
-      return new BasicAuthPrincipal(name, toToken(name, password), tables, permissions);
-    }).collect(Collectors.toList());
-
-    _accessControl = new BasicAuthAccessControl(principals);
+    _accessControl = new BasicAuthAccessControl(BasicAuthUtils.extractBasicAuthPrincipals(configuration, PREFIX));
   }
 
   @Override
@@ -97,67 +70,40 @@ public class BasicAuthAccessControlFactory implements AccessControlFactory {
 
     @Override
     public boolean hasDataAccess(HttpHeaders httpHeaders, String tableName) {
+      Optional<BasicAuthPrincipal> principal = getPrincipal(httpHeaders);
+      boolean response = getPrincipal(httpHeaders).filter(p -> p.hasTable(tableName)).isPresent();
       return getPrincipal(httpHeaders).filter(p -> p.hasTable(tableName)).isPresent();
     }
 
     @Override
     public boolean hasAccess(String tableName, AccessType accessType, HttpHeaders httpHeaders, String endpointUrl) {
-      return getPrincipal(httpHeaders).filter(p -> p.hasTable(tableName) && p.hasPermission(accessType)).isPresent();
+      Optional<BasicAuthPrincipal> principal = getPrincipal(httpHeaders);
+      boolean response =
+          getPrincipal(httpHeaders).filter(p -> p.hasTable(tableName) && p.hasPermission(Objects.toString(accessType)))
+              .isPresent();
+      return getPrincipal(httpHeaders)
+          .filter(p -> p.hasTable(tableName) && p.hasPermission(Objects.toString(accessType))).isPresent();
     }
 
     @Override
     public boolean hasAccess(AccessType accessType, HttpHeaders httpHeaders, String endpointUrl) {
+      Optional<BasicAuthPrincipal> principal = getPrincipal(httpHeaders);
+      boolean response = getPrincipal(httpHeaders).isPresent();
       return getPrincipal(httpHeaders).isPresent();
     }
 
     private Optional<BasicAuthPrincipal> getPrincipal(HttpHeaders headers) {
-      return headers.getRequestHeader(HEADER_AUTHORIZATION).stream().map(BasicAuthAccessControlFactory::normalizeToken)
-              .map(_principals::get).filter(Objects::nonNull).findFirst();
-    }
-  }
-
-  /**
-   * Container object for basic auth principal
-   */
-  private static class BasicAuthPrincipal {
-    private final String _name;
-    private final String _token;
-    private final Set<String> _tables;
-    private final Set<AccessType> _permissions;
-
-    public BasicAuthPrincipal(String name, String token, Set<String> tables, Set<AccessType> permissions) {
-      this._name = name;
-      this._token = token;
-      this._tables = tables;
-      this._permissions = permissions;
-    }
-
-    public String getName() {
-      return _name;
-    }
-
-    public String getToken() {
-      return _token;
-    }
-
-    public boolean hasTable(String tableName) {
-      return _tables.isEmpty() || _tables.contains(tableName);
-    }
-
-    public boolean hasPermission(AccessType accessType) {
-      return _permissions.isEmpty() || _permissions.contains(accessType);
-    }
-  }
+      if (headers == null) {
+        return Optional.empty();
+      }
 
-  private static String toToken(String name, String password) {
-    String identifier = String.format("%s:%s", name, password);
-    return normalizeToken(String.format("Basic %s", Base64.getEncoder().encodeToString(identifier.getBytes())));
-  }
+      List<String> authHeaders = headers.getRequestHeader(HEADER_AUTHORIZATION);
+      if (authHeaders == null) {
+        return Optional.empty();
+      }
 
-  private static String normalizeToken(String token) {
-    if (token == null) {
-      return null;
+      return authHeaders.stream().map(BasicAuthUtils::normalizeBase64Token).map(_principals::get)
+          .filter(Objects::nonNull).findFirst();
     }
-    return token.trim().replace("=", "");
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/auth/BasicAuthPrincipal.java b/pinot-core/src/main/java/org/apache/pinot/core/auth/BasicAuthPrincipal.java
new file mode 100644
index 0000000..ae44401
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/auth/BasicAuthPrincipal.java
@@ -0,0 +1,37 @@
+package org.apache.pinot.core.auth;
+
+import java.util.Set;
+
+
+/**
+ * Container object for basic auth principal
+ */
+public class BasicAuthPrincipal {
+  private final String _name;
+  private final String _token;
+  private final Set<String> _tables;
+  private final Set<String> _permissions;
+
+  public BasicAuthPrincipal(String name, String token, Set<String> tables, Set<String> permissions) {
+    this._name = name;
+    this._token = token;
+    this._tables = tables;
+    this._permissions = permissions;
+  }
+
+  public String getName() {
+    return _name;
+  }
+
+  public String getToken() {
+    return _token;
+  }
+
+  public boolean hasTable(String tableName) {
+    return _tables.isEmpty() || _tables.contains(tableName);
+  }
+
+  public boolean hasPermission(String permission) {
+    return _permissions.isEmpty() || _permissions.contains(permission);
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/auth/BasicAuthUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/auth/BasicAuthUtils.java
new file mode 100644
index 0000000..46cfa6b
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/auth/BasicAuthUtils.java
@@ -0,0 +1,100 @@
+package org.apache.pinot.core.auth;
+
+import com.google.common.base.Preconditions;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.spi.env.PinotConfiguration;
+
+
+/**
+ * Utility for configuring basic auth and parsing related http tokens
+ */
+public final class BasicAuthUtils {
+  private static final String PASSWORD = "password";
+  private static final String PERMISSIONS = "permissions";
+  private static final String TABLES = "tables";
+  private static final String ALL = "*";
+
+  private BasicAuthUtils() {
+    // left blank
+  }
+
+  /**
+   * Parse a pinot configuration namespace for access control settings, e.g. "controller.admin.access.control.principals".
+   *
+   * <pre>
+   *     Example:
+   *     my.prefix.access.control.principals=admin123,user456
+   *     my.prefix.access.control.principals.admin123.password=verysecret
+   *     my.prefix.access.control.principals.user456.password=kindasecret
+   *     my.prefix.access.control.principals.user456.tables=stuff,lessImportantStuff
+   *     my.prefix.access.control.principals.user456.permissions=read,update
+   * </pre>
+   *
+   * @param configuration pinot configuration
+   * @param prefix configuration namespace
+   * @return list of BasicAuthPrincipals
+   */
+  public static List<BasicAuthPrincipal> extractBasicAuthPrincipals(PinotConfiguration configuration, String prefix) {
+    String principalNames = configuration.getProperty(prefix);
+    Preconditions.checkArgument(StringUtils.isNotBlank(principalNames), "must provide principals");
+
+    return Arrays.stream(principalNames.split(",")).map(rawName -> {
+      String name = rawName.trim();
+      Preconditions.checkArgument(StringUtils.isNotBlank(name), "%s is not a valid name", name);
+
+      String password = configuration.getProperty(String.format("%s.%s.%s", prefix, name, PASSWORD));
+      Preconditions.checkArgument(StringUtils.isNotBlank(password), "must provide a password for %s", name);
+
+      Set<String> tables = extractSet(configuration, String.format("%s.%s.%s", prefix, name, TABLES));
+      Set<String> permissions = extractSet(configuration, String.format("%s.%s.%s", prefix, name, PERMISSIONS));
+
+      return new BasicAuthPrincipal(name, toBasicAuthToken(name, password), tables, permissions);
+    }).collect(Collectors.toList());
+  }
+
+  private static Set<String> extractSet(PinotConfiguration configuration, String key) {
+    String input = configuration.getProperty(key);
+    if (StringUtils.isNotBlank(input) && !ALL.equals(input)) {
+      return Arrays.stream(input.split(",")).map(String::trim).collect(Collectors.toSet());
+    }
+    return Collections.emptySet();
+  }
+
+  /**
+   * Convert a pair of name and password into a http header-compliant base64 encoded token
+   *
+   * @param name user name
+   * @param password password
+   * @return base64 encoded basic auth token
+   */
+  @Nullable
+  public static String toBasicAuthToken(String name, String password) {
+    if (StringUtils.isBlank(name)) {
+      return null;
+    }
+    String identifier = String.format("%s:%s", name, password);
+    return normalizeBase64Token(String.format("Basic %s", Base64.getEncoder().encodeToString(identifier.getBytes())));
+  }
+
+  /**
+   * Normalize a base64 encoded auth token by stripping redundant padding (spaces, '=')
+   *
+   * @param token base64 encoded auth token
+   * @return normalized auth token
+   */
+  @Nullable
+  public static String normalizeBase64Token(String token) {
+    if (token == null) {
+      return null;
+    }
+    return StringUtils.remove(token.trim(), '=');
+  }
+}
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/SegmentGenerationAndPushTaskExecutor.java b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/SegmentGenerationAndPushTaskExecutor.java
index e490de2..ff4f66f 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/SegmentGenerationAndPushTaskExecutor.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/SegmentGenerationAndPushTaskExecutor.java
@@ -284,13 +284,15 @@ public class SegmentGenerationAndPushTaskExecutor extends BaseTaskExecutor {
     recordReaderSpec.setConfigClassName(taskConfigs.get(BatchConfigProperties.RECORD_READER_CONFIG_CLASS));
     taskSpec.setRecordReaderSpec(recordReaderSpec);
 
+    String authToken = taskConfigs.get(BatchConfigProperties.AUTH_TOKEN); // TODO
+
     String tableNameWithType = taskConfigs.get(BatchConfigProperties.TABLE_NAME);
     Schema schema;
     if (taskConfigs.containsKey(BatchConfigProperties.SCHEMA)) {
       schema = JsonUtils
           .stringToObject(JsonUtils.objectToString(taskConfigs.get(BatchConfigProperties.SCHEMA)), Schema.class);
     } else if (taskConfigs.containsKey(BatchConfigProperties.SCHEMA_URI)) {
-      schema = SegmentGenerationUtils.getSchema(taskConfigs.get(BatchConfigProperties.SCHEMA_URI));
+      schema = SegmentGenerationUtils.getSchema(taskConfigs.get(BatchConfigProperties.SCHEMA_URI), authToken);
     } else {
       schema = getSchema(tableNameWithType);
     }
@@ -299,7 +301,8 @@ public class SegmentGenerationAndPushTaskExecutor extends BaseTaskExecutor {
     if (taskConfigs.containsKey(BatchConfigProperties.TABLE_CONFIGS)) {
       tableConfig = JsonUtils.stringToObject(taskConfigs.get(BatchConfigProperties.TABLE_CONFIGS), TableConfig.class);
     } else if (taskConfigs.containsKey(BatchConfigProperties.TABLE_CONFIGS_URI)) {
-      tableConfig = SegmentGenerationUtils.getTableConfig(taskConfigs.get(BatchConfigProperties.TABLE_CONFIGS_URI));
+      tableConfig =
+          SegmentGenerationUtils.getTableConfig(taskConfigs.get(BatchConfigProperties.TABLE_CONFIGS_URI), authToken);
     } else {
       tableConfig = getTableConfig(tableNameWithType);
     }
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java
index 4c3b41b..d15cfcd 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java
@@ -26,6 +26,7 @@ import java.io.Serializable;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -119,7 +120,9 @@ public class SegmentPushUtils implements Serializable {
           try (InputStream inputStream = fileSystem.open(tarFileURI)) {
             SimpleHttpResponse response = FILE_UPLOAD_DOWNLOAD_CLIENT
                 .uploadSegment(FileUploadDownloadClient.getUploadSegmentURI(controllerURI), segmentName, inputStream,
-                    tableName);
+                    FileUploadDownloadClient.makeAuthHeader(spec.getAuthToken()), Collections.singletonList(
+                        new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME, tableName)),
+                    FileUploadDownloadClient.DEFAULT_SOCKET_TIMEOUT_MS);
             LOGGER.info("Response for pushing table {} segment {} to location {} - {}: {}", tableName, segmentName,
                 controllerURI, response.getStatusCode(), response.getResponse());
             return true;
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentCreationMapper.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentCreationMapper.java
index 8b71584..c9622ba 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentCreationMapper.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentCreationMapper.java
@@ -32,9 +32,9 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.pinot.common.segment.generation.SegmentGenerationUtils;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner;
-import org.apache.pinot.common.segment.generation.SegmentGenerationUtils;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.filesystem.PinotFS;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
@@ -67,7 +67,8 @@ public class HadoopSegmentCreationMapper extends Mapper<LongWritable, Text, Long
   private File _localTempDir;
 
   @Override
-  public void setup(Context context) throws IOException {
+  public void setup(Context context)
+      throws IOException {
     _jobConf = context.getConfiguration();
     Yaml yaml = new Yaml();
     String segmentGenerationJobSpecStr = _jobConf.get(SEGMENT_GENERATION_JOB_SPEC);
@@ -96,7 +97,7 @@ public class HadoopSegmentCreationMapper extends Mapper<LongWritable, Text, Long
     } else {
       LOGGER.warn("Cannot find local Pinot plugins directory at [{}]", localPluginsTarFile.getAbsolutePath());
     }
-    
+
     // Register file systems
     List<PinotFSSpec> pinotFSSpecs = _spec.getPinotFSSpecs();
     for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
@@ -146,8 +147,9 @@ public class HadoopSegmentCreationMapper extends Mapper<LongWritable, Text, Long
       taskSpec.setInputFilePath(localInputDataFile.getAbsolutePath());
       taskSpec.setOutputDirectoryPath(localOutputTempDir.getAbsolutePath());
       taskSpec.setRecordReaderSpec(_spec.getRecordReaderSpec());
-      taskSpec.setSchema(SegmentGenerationUtils.getSchema(_spec.getTableSpec().getSchemaURI()));
-      taskSpec.setTableConfig(SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI()));
+      taskSpec.setSchema(SegmentGenerationUtils.getSchema(_spec.getTableSpec().getSchemaURI(), _spec.getAuthToken()));
+      taskSpec.setTableConfig(
+          SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI(), _spec.getAuthToken()));
       taskSpec.setSequenceId(idx);
       taskSpec.setSegmentNameGeneratorSpec(_spec.getSegmentNameGeneratorSpec());
       taskSpec.setCustomProperty(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY, inputFileURI.toString());
@@ -186,7 +188,7 @@ public class HadoopSegmentCreationMapper extends Mapper<LongWritable, Text, Long
           .resolve(segmentTarFileName);
       LOGGER.info("Copying segment tar file from [{}] to [{}]", localSegmentTarFile, outputSegmentTarURI);
       outputDirFS.copyFromLocalFile(localSegmentTarFile, outputSegmentTarURI);
-      
+
       FileUtils.deleteQuietly(localSegmentDir);
       FileUtils.deleteQuietly(localSegmentTarFile);
       FileUtils.deleteQuietly(localInputDataFile);
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
index a315d15..d2273e7 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
@@ -18,13 +18,6 @@
  */
 package org.apache.pinot.plugin.ingestion.batch.spark;
 
-import static org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner.LOCAL_DIRECTORY_SEQUENCE_ID;
-import static org.apache.pinot.common.segment.generation.SegmentGenerationUtils.PINOT_PLUGINS_DIR;
-import static org.apache.pinot.common.segment.generation.SegmentGenerationUtils.PINOT_PLUGINS_TAR_GZ;
-import static org.apache.pinot.common.segment.generation.SegmentGenerationUtils.getFileName;
-import static org.apache.pinot.spi.plugin.PluginManager.PLUGINS_DIR_PROPERTY_NAME;
-import static org.apache.pinot.spi.plugin.PluginManager.PLUGINS_INCLUDE_PROPERTY_NAME;
-
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
@@ -40,11 +33,10 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-
 import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.segment.generation.SegmentGenerationUtils;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner;
-import org.apache.pinot.common.segment.generation.SegmentGenerationUtils;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.filesystem.PinotFS;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
@@ -65,6 +57,13 @@ import org.apache.spark.api.java.function.VoidFunction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.pinot.common.segment.generation.SegmentGenerationUtils.PINOT_PLUGINS_DIR;
+import static org.apache.pinot.common.segment.generation.SegmentGenerationUtils.PINOT_PLUGINS_TAR_GZ;
+import static org.apache.pinot.common.segment.generation.SegmentGenerationUtils.getFileName;
+import static org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner.LOCAL_DIRECTORY_SEQUENCE_ID;
+import static org.apache.pinot.spi.plugin.PluginManager.PLUGINS_DIR_PROPERTY_NAME;
+import static org.apache.pinot.spi.plugin.PluginManager.PLUGINS_INCLUDE_PROPERTY_NAME;
+
 
 public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Serializable {
 
@@ -302,8 +301,10 @@ public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Seri
           taskSpec.setInputFilePath(localInputDataFile.getAbsolutePath());
           taskSpec.setOutputDirectoryPath(localOutputTempDir.getAbsolutePath());
           taskSpec.setRecordReaderSpec(_spec.getRecordReaderSpec());
-          taskSpec.setSchema(SegmentGenerationUtils.getSchema(_spec.getTableSpec().getSchemaURI()));
-          taskSpec.setTableConfig(SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI()));
+          taskSpec
+              .setSchema(SegmentGenerationUtils.getSchema(_spec.getTableSpec().getSchemaURI(), _spec.getAuthToken()));
+          taskSpec.setTableConfig(
+              SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI(), _spec.getAuthToken()));
           taskSpec.setSequenceId(idx);
           taskSpec.setSegmentNameGeneratorSpec(_spec.getSegmentNameGeneratorSpec());
           taskSpec.setCustomProperty(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY, inputFileURI.toString());
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
index efe1679..52cf3de 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
@@ -27,14 +27,13 @@ import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
-
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.segment.generation.SegmentGenerationUtils;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner;
-import org.apache.pinot.common.segment.generation.SegmentGenerationUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.env.PinotConfiguration;
@@ -165,8 +164,9 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner {
       FileUtils.forceMkdir(localOutputTempDir);
 
       //Read TableConfig, Schema
-      Schema schema = SegmentGenerationUtils.getSchema(_spec.getTableSpec().getSchemaURI());
-      TableConfig tableConfig = SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI());
+      Schema schema = SegmentGenerationUtils.getSchema(_spec.getTableSpec().getSchemaURI(), _spec.getAuthToken());
+      TableConfig tableConfig =
+          SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI(), _spec.getAuthToken());
 
       int numInputFiles = filteredFiles.size();
       CountDownLatch segmentCreationTaskCountDownLatch = new CountDownLatch(numInputFiles);
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index deb3a6a..162ba71 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -70,6 +70,7 @@ public class HelixInstanceDataManager implements InstanceDataManager {
   private HelixManager _helixManager;
   private ServerMetrics _serverMetrics;
   private ZkHelixPropertyStore<ZNRecord> _propertyStore;
+  private String _authToken;
 
   @Override
   public synchronized void init(PinotConfiguration config, HelixManager helixManager, ServerMetrics serverMetrics)
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java
index 5a28884..0e3f1c6 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java
@@ -49,6 +49,7 @@ public class BatchConfigProperties {
   public static final String PUSH_CONTROLLER_URI = "push.controllerUri";
   public static final String PUSH_SEGMENT_URI_PREFIX = "push.segmentUriPrefix";
   public static final String PUSH_SEGMENT_URI_SUFFIX = "push.segmentUriSuffix";
+  public static final String AUTH_TOKEN = "authToken";
 
   public static final String OUTPUT_SEGMENT_DIR_URI = "output.segment.dir.uri";
 
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/BootstrapTableTool.java b/pinot-tools/src/main/java/org/apache/pinot/tools/BootstrapTableTool.java
index 2e86199..35efe27 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/BootstrapTableTool.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/BootstrapTableTool.java
@@ -118,7 +118,7 @@ public class BootstrapTableTool {
     return new AddTableCommand().setSchemaFile(schemaFile.getAbsolutePath())
         .setTableConfigFile(tableConfigFile.getAbsolutePath()).setControllerProtocol(_controllerProtocol)
         .setControllerHost(_controllerHost).setControllerPort(String.valueOf(_controllerPort)).setExecute(true)
-        .execute();
+        .setAuthToken(_token).execute();
   }
 
   private boolean bootstrapOfflineTable(File setupTableTmpDir, String tableName, File schemaFile,
@@ -178,6 +178,8 @@ public class BootstrapTableTool {
                 tlsSpec.getTrustStorePath(), tlsSpec.getTrustStorePassword());
           }
 
+          spec.setAuthToken(_token);
+
           IngestionJobLauncher.runIngestionJob(spec);
         }
       } else {
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AbstractBaseAdminCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AbstractBaseAdminCommand.java
index 30d4c28..1debd42 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AbstractBaseAdminCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AbstractBaseAdminCommand.java
@@ -31,10 +31,13 @@ import java.net.URL;
 import java.nio.charset.StandardCharsets;
 import java.util.*;
 
+import javax.annotation.Nullable;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.http.Header;
 import org.apache.http.message.BasicHeader;
+import org.apache.pinot.common.utils.FileUploadDownloadClient;
+import org.apache.pinot.core.auth.BasicAuthUtils;
 import org.apache.pinot.tools.AbstractBaseCommand;
 import org.apache.pinot.tools.utils.PinotConfigUtils;
 
@@ -83,10 +86,9 @@ public class AbstractBaseAdminCommand extends AbstractBaseCommand {
     final URL url = new URL(urlString);
     final HttpURLConnection conn = (HttpURLConnection) url.openConnection();
 
-    conn.setDoOutput(true);
+    headers.forEach(header -> conn.setRequestProperty(header.getName(), header.getValue()));
     conn.setRequestMethod(requestMethod);
-    headers.stream().flatMap(header -> Arrays.stream(header.getElements()))
-            .forEach(elem -> conn.setRequestProperty(elem.getName(), elem.getValue()));
+    conn.setDoOutput(true);
     if (payload != null) {
       final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(conn.getOutputStream(),
           StandardCharsets.UTF_8));
@@ -123,16 +125,35 @@ public class AbstractBaseAdminCommand extends AbstractBaseCommand {
     return PinotConfigUtils.readConfigFromFile(configFileName);
   }
 
-  static List<Header> makeBasicAuth(String user, String password) {
-    if (StringUtils.isBlank(user)) {
-      return Collections.emptyList();
+  /**
+   * Generate an (optional) HTTP Authorization header given an auth token
+   * @see FileUploadDownloadClient#makeAuthHeader(String)
+   *
+   * @param authToken auth token
+   * @return list of 0 or 1 "Authorization" headers
+   */
+  static List<Header> makeAuthHeader(String authToken) {
+    return FileUploadDownloadClient.makeAuthHeader(authToken);
+  }
+
+  /**
+   * Generate auth token from pass-thru token or generate basic auth from user/password pair
+   *
+   * @param authToken optional pass-thru token
+   * @param user optional username
+   * @param password optional password
+   * @return auth token, or null if neither pass-thru token nor user info available
+   */
+  @Nullable
+  static String makeAuthToken(String authToken, String user, String password) {
+    if (StringUtils.isNotBlank(authToken)) {
+      return authToken;
     }
 
-    if (StringUtils.isBlank(password)) {
-      password = "";
+    if (StringUtils.isNotBlank(user)) {
+      return BasicAuthUtils.toBasicAuthToken(user, password);
     }
 
-    String token = "Basic " + Base64.getEncoder().encodeToString((user + ":" + password).getBytes());
-    return Collections.singletonList(new BasicHeader("Authorization", token));
+    return null;
   }
 }
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddSchemaCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddSchemaCommand.java
index 2bb0a99..fdf7a06 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddSchemaCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddSchemaCommand.java
@@ -60,6 +60,9 @@ public class AddSchemaCommand extends AbstractBaseAdminCommand implements Comman
   @Option(name = "-password", required = false, metaVar = "<String>", usage = "Password for basic auth.")
   private String _password;
 
+  @Option(name = "-authToken", required = false, metaVar = "<String>", usage = "Http auth token.")
+  private String _authToken;
+
   @Option(name = "-help", required = false, help = true, aliases = {"-h", "--h", "--help"}, usage = "Print this message.")
   private boolean _help = false;
 
@@ -80,10 +83,9 @@ public class AddSchemaCommand extends AbstractBaseAdminCommand implements Comman
 
   @Override
   public String toString() {
-    String retString =
-        ("AddSchema -controllerProtocol " + _controllerProtocol + " -controllerHost " + _controllerHost
-            + " -controllerPort " + _controllerPort + " -schemaFile " + _schemaFile + " -user " + _user + " -password "
-                + "[hidden]");
+    String retString = ("AddSchema -controllerProtocol " + _controllerProtocol + " -controllerHost " + _controllerHost
+        + " -controllerPort " + _controllerPort + " -schemaFile " + _schemaFile + " -user " + _user + " -password "
+        + "[hidden]");
 
     return ((_exec) ? (retString + " -exec") : retString);
   }
@@ -114,11 +116,15 @@ public class AddSchemaCommand extends AbstractBaseAdminCommand implements Comman
   }
 
   public void setUser(String user) {
-    this._user = user;
+    _user = user;
   }
 
   public void setPassword(String password) {
-    this._password = password;
+    _password = password;
+  }
+
+  public void setAuthToken(String authToken) {
+    _authToken = authToken;
   }
 
   public AddSchemaCommand setExecute(boolean exec) {
@@ -147,9 +153,10 @@ public class AddSchemaCommand extends AbstractBaseAdminCommand implements Comman
 
     Schema schema = Schema.fromFile(schemaFile);
     try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) {
-      fileUploadDownloadClient.addSchema(
-          FileUploadDownloadClient.getUploadSchemaURI(_controllerProtocol, _controllerHost, Integer.parseInt(_controllerPort)),
-          schema.getSchemaName(), schemaFile, makeBasicAuth(_user, _password), Collections.emptyList());
+      fileUploadDownloadClient.addSchema(FileUploadDownloadClient
+              .getUploadSchemaURI(_controllerProtocol, _controllerHost, Integer.parseInt(_controllerPort)),
+          schema.getSchemaName(), schemaFile, makeAuthHeader(makeAuthToken(_authToken, _user, _password)),
+          Collections.emptyList());
     } catch (Exception e) {
       LOGGER.error("Got Exception to upload Pinot Schema: " + schema.getSchemaName(), e);
       return false;
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTableCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTableCommand.java
index 78123e5..e8403fa 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTableCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTableCommand.java
@@ -22,7 +22,6 @@ import com.fasterxml.jackson.databind.JsonNode;
 import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
-
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.FileUploadDownloadClient;
 import org.apache.pinot.common.utils.NetUtil;
@@ -66,6 +65,9 @@ public class AddTableCommand extends AbstractBaseAdminCommand implements Command
   @Option(name = "-password", required = false, metaVar = "<String>", usage = "Password for basic auth.")
   private String _password;
 
+  @Option(name = "-authToken", required = false, metaVar = "<String>", usage = "Http auth token.")
+  private String _authToken;
+
   @Option(name = "-help", required = false, help = true, aliases = {"-h", "--h", "--help"}, usage = "Print this message.")
   private boolean _help = false;
 
@@ -91,7 +93,7 @@ public class AddTableCommand extends AbstractBaseAdminCommand implements Command
     String retString =
         ("AddTable -tableConfigFile " + _tableConfigFile + " -schemaFile " + _schemaFile + " -controllerProtocol "
             + _controllerProtocol + " -controllerHost " + _controllerHost + " -controllerPort " + _controllerPort
-                + " -user " + _user + " -password " + "[hidden]");
+            + " -user " + _user + " -password " + "[hidden]");
     return ((_exec) ? (retString + " -exec") : retString);
   }
 
@@ -134,6 +136,11 @@ public class AddTableCommand extends AbstractBaseAdminCommand implements Command
     return this;
   }
 
+  public AddTableCommand setAuthToken(String authToken) {
+    _authToken = authToken;
+    return this;
+  }
+
   public AddTableCommand setExecute(boolean exec) {
     _exec = exec;
     return this;
@@ -151,9 +158,10 @@ public class AddTableCommand extends AbstractBaseAdminCommand implements Command
       throw e;
     }
     try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) {
-      fileUploadDownloadClient.addSchema(
-          FileUploadDownloadClient.getUploadSchemaURI(_controllerProtocol, _controllerHost, Integer.parseInt(_controllerPort)),
-          schema.getSchemaName(), schemaFile, makeBasicAuth(_user, _password), Collections.emptyList());
+      fileUploadDownloadClient.addSchema(FileUploadDownloadClient
+              .getUploadSchemaURI(_controllerProtocol, _controllerHost, Integer.parseInt(_controllerPort)),
+          schema.getSchemaName(), schemaFile, makeAuthHeader(makeAuthToken(_authToken, _user, _password)),
+          Collections.emptyList());
     } catch (Exception e) {
       LOGGER.error("Got Exception to upload Pinot Schema: " + schema.getSchemaName(), e);
       throw e;
@@ -163,7 +171,8 @@ public class AddTableCommand extends AbstractBaseAdminCommand implements Command
   public boolean sendTableCreationRequest(JsonNode node)
       throws IOException {
     String res = AbstractBaseAdminCommand
-        .sendPostRequest(ControllerRequestURLBuilder.baseUrl(_controllerAddress).forTableCreate(), node.toString());
+        .sendRequest("POST", ControllerRequestURLBuilder.baseUrl(_controllerAddress).forTableCreate(), node.toString(),
+            makeAuthHeader(makeAuthToken(_authToken, _user, _password)));
     LOGGER.info(res);
     return res.contains("succesfully added");
   }
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTenantCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTenantCommand.java
index 3365314..efd4bf9 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTenantCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTenantCommand.java
@@ -67,6 +67,9 @@ public class AddTenantCommand extends AbstractBaseAdminCommand implements Comman
   @Option(name = "-password", required = false, metaVar = "<String>", usage = "Password for basic auth.")
   private String _password;
 
+  @Option(name = "-authToken", required = false, metaVar = "<String>", usage = "Http auth token.")
+  private String _authToken;
+
   @Option(name = "-help", required = false, help = true, aliases = {"-h", "--h", "--help"}, usage = "Print this message.")
   private boolean _help = false;
 
@@ -112,6 +115,11 @@ public class AddTenantCommand extends AbstractBaseAdminCommand implements Comman
     return this;
   }
 
+  public AddTenantCommand setAuthToken(String authToken) {
+    _authToken = authToken;
+    return this;
+  }
+
   public AddTenantCommand setExecute(boolean exec) {
     _exec = exec;
     return this;
@@ -137,7 +145,7 @@ public class AddTenantCommand extends AbstractBaseAdminCommand implements Comman
     Tenant tenant = new Tenant(_role, _name, _instanceCount, _offlineInstanceCount, _realtimeInstanceCount);
     String res = AbstractBaseAdminCommand
         .sendRequest("POST", ControllerRequestURLBuilder.baseUrl(_controllerAddress).forTenantCreate(),
-            tenant.toJsonString(), makeBasicAuth(_user, _password));
+            tenant.toJsonString(), makeAuthHeader(makeAuthToken(_authToken, _user, _password)));
 
     LOGGER.info(res);
     System.out.print(res);
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/BootstrapTableCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/BootstrapTableCommand.java
index 6f7d017..12806c5 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/BootstrapTableCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/BootstrapTableCommand.java
@@ -18,8 +18,10 @@
  */
 package org.apache.pinot.tools.admin.command;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.NetUtil;
+import org.apache.pinot.core.auth.BasicAuthUtils;
 import org.apache.pinot.spi.plugin.PluginManager;
 import org.apache.pinot.tools.Command;
 import org.apache.pinot.tools.BootstrapTableTool;
@@ -82,6 +84,9 @@ public class BootstrapTableCommand extends AbstractBaseAdminCommand implements C
   @Option(name = "-password", required = false, metaVar = "<String>", usage = "Password for basic auth.")
   private String _password;
 
+  @Option(name = "-authToken", required = false, metaVar = "<String>", usage = "Http auth token.")
+  private String _authToken;
+
   @Option(name = "-help", required = false, help = true, aliases = {"-h", "--h", "--help"}, usage = "Print this message.")
   private boolean _help = false;
 
@@ -122,7 +127,7 @@ public class BootstrapTableCommand extends AbstractBaseAdminCommand implements C
     if (_controllerHost == null) {
       _controllerHost = NetUtil.getHostAddress();
     }
-    String token = ""; // TODO
+    String token = makeAuthToken(_authToken, _user, _password);
     return new BootstrapTableTool(_controllerProtocol, _controllerHost, Integer.parseInt(_controllerPort), _dir, token).execute();
   }
 }
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ChangeTableState.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ChangeTableState.java
index d240e77..83ad5f8 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ChangeTableState.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ChangeTableState.java
@@ -23,6 +23,7 @@ import java.net.URL;
 import org.apache.commons.httpclient.HttpClient;
 import org.apache.commons.httpclient.HttpURL;
 import org.apache.commons.httpclient.methods.GetMethod;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.NetUtil;
@@ -56,6 +57,9 @@ public class ChangeTableState extends AbstractBaseAdminCommand implements Comman
   @Option(name = "-password", required = false, metaVar = "<String>", usage = "Password for basic auth.")
   private String _password;
 
+  @Option(name = "-authToken", required = false, metaVar = "<String>", usage = "Http auth token.")
+  private String _authToken;
+
   @Option(name = "-help", required = false, help = true, aliases = {"-h", "--h", "--help"}, usage = "Print this message.")
   private boolean _help = false;
 
@@ -75,8 +79,12 @@ public class ChangeTableState extends AbstractBaseAdminCommand implements Comman
     URI uri = new URI(_controllerProtocol, null, _controllerHost, Integer.parseInt(_controllerPort),
         URI_TABLES_PATH + _tableName, "state=" + stateValue, null);
 
+    String token = makeAuthToken(_authToken, _user, _password);
+
     GetMethod httpGet = new GetMethod(uri.toString());
-    httpGet.setRequestHeader("Authorization", null); // TODO
+    if (StringUtils.isNotBlank(token)) {
+      httpGet.setRequestHeader("Authorization", token);
+    }
     int status = httpClient.executeMethod(httpGet);
     if (status != 200) {
       throw new RuntimeException("Failed to change table state, error: " + httpGet.getResponseBodyAsString());
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ImportDataCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ImportDataCommand.java
index 49884e4..c432146 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ImportDataCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ImportDataCommand.java
@@ -81,6 +81,9 @@ public class ImportDataCommand extends AbstractBaseAdminCommand implements Comma
   @Option(name = "-password", required = false, metaVar = "<String>", usage = "Password for basic auth.")
   private String _password;
 
+  @Option(name = "-authToken", required = false, metaVar = "<String>", usage = "Http auth token.")
+  private String _authToken;
+
   @Option(name = "-tempDir", metaVar = "<string>", usage = "Temporary directory used to hold data during segment creation.")
   private String _tempDir = new File(FileUtils.getTempDirectory(), getClass().getSimpleName()).getAbsolutePath();
 
@@ -243,11 +246,7 @@ public class ImportDataCommand extends AbstractBaseAdminCommand implements Comma
     spec.setCleanUpOutputDir(true);
     spec.setOverwriteOutput(true);
     spec.setJobType("SegmentCreationAndTarPush");
-
-    if (!StringUtils.isBlank(_user)) {
-      String token = ""; // TODO
-      spec.setAuthToken(token);
-    }
+    spec.setAuthToken(makeAuthToken(_authToken, _user, _password));
 
     // set ExecutionFrameworkSpec
     ExecutionFrameworkSpec executionFrameworkSpec = new ExecutionFrameworkSpec();
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/OperateClusterConfigCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/OperateClusterConfigCommand.java
index 97b04c6..27d3520 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/OperateClusterConfigCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/OperateClusterConfigCommand.java
@@ -19,12 +19,11 @@
 package org.apache.pinot.tools.admin.command;
 
 import com.fasterxml.jackson.databind.JsonNode;
-import java.net.URI;
-import java.nio.charset.StandardCharsets;
 import java.util.Collections;
 import java.util.Iterator;
-import org.apache.commons.io.IOUtils;
+import java.util.List;
 import org.apache.commons.lang.StringUtils;
+import org.apache.http.Header;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.NetUtil;
 import org.apache.pinot.spi.utils.JsonUtils;
@@ -52,6 +51,9 @@ public class OperateClusterConfigCommand extends AbstractBaseAdminCommand implem
   @Option(name = "-password", required = false, metaVar = "<String>", usage = "Password for basic auth.")
   private String _password;
 
+  @Option(name = "-authToken", required = false, metaVar = "<String>", usage = "Http auth token.")
+  private String _authToken;
+
   @Option(name = "-config", metaVar = "<string>", usage = "Cluster config to operate.")
   private String _config;
 
@@ -73,8 +75,9 @@ public class OperateClusterConfigCommand extends AbstractBaseAdminCommand implem
 
   @Override
   public String toString() {
-    String toString = "Operate ClusterConfig -controllerProtocol " + _controllerProtocol + " -controllerHost "
-        + _controllerHost + " -controllerPort " + _controllerPort + " -operation " + _operation;
+    String toString =
+        "Operate ClusterConfig -controllerProtocol " + _controllerProtocol + " -controllerHost " + _controllerHost
+            + " -controllerPort " + _controllerPort + " -operation " + _operation;
     if (_config != null) {
       toString += " -config " + _config;
     }
@@ -116,6 +119,11 @@ public class OperateClusterConfigCommand extends AbstractBaseAdminCommand implem
     return this;
   }
 
+  public OperateClusterConfigCommand setAuthToken(String authToken) {
+    _authToken = authToken;
+    return this;
+  }
+
   public OperateClusterConfigCommand setConfig(String config) {
     _config = config;
     return this;
@@ -135,7 +143,9 @@ public class OperateClusterConfigCommand extends AbstractBaseAdminCommand implem
     if (StringUtils.isEmpty(_config) && !_operation.equalsIgnoreCase("GET")) {
       throw new UnsupportedOperationException("Empty config: " + _config);
     }
-    String clusterConfigUrl = _controllerProtocol + "://" + _controllerHost + ":" + _controllerPort + "/cluster/configs";
+    String clusterConfigUrl =
+        _controllerProtocol + "://" + _controllerHost + ":" + _controllerPort + "/cluster/configs";
+    List<Header> headers = makeAuthHeader(makeAuthToken(_authToken, _user, _password));
     switch (_operation.toUpperCase()) {
       case "ADD":
       case "UPDATE":
@@ -145,10 +155,9 @@ public class OperateClusterConfigCommand extends AbstractBaseAdminCommand implem
               "Bad config: " + _config + ". Please follow the pattern of [Config Key]=[Config Value]");
         }
         String request = JsonUtils.objectToString(Collections.singletonMap(splits[0], splits[1]));
-        return sendRequest("POST", clusterConfigUrl, request, makeBasicAuth(_user, _password));
+        return sendRequest("POST", clusterConfigUrl, request, headers);
       case "GET":
-        // TODO
-        String response = IOUtils.toString(new URI(clusterConfigUrl), StandardCharsets.UTF_8);
+        String response = sendRequest("GET", clusterConfigUrl, null, headers);
         JsonNode jsonNode = JsonUtils.stringToJsonNode(response);
         Iterator<String> fieldNamesIterator = jsonNode.fieldNames();
         String results = "";
@@ -159,8 +168,7 @@ public class OperateClusterConfigCommand extends AbstractBaseAdminCommand implem
         }
         return results;
       case "DELETE":
-        return sendRequest("DELETE", String.format("%s/%s", clusterConfigUrl, _config), null,
-            makeBasicAuth(_user, _password));
+        return sendRequest("DELETE", String.format("%s/%s", clusterConfigUrl, _config), null, headers);
       default:
         throw new UnsupportedOperationException("Unsupported operation: " + _operation);
     }
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/PostQueryCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/PostQueryCommand.java
index 4afb686..72c16e4 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/PostQueryCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/PostQueryCommand.java
@@ -21,8 +21,8 @@ package org.apache.pinot.tools.admin.command;
 import java.util.Collections;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.CommonConstants.Broker.Request;
-import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.common.utils.NetUtil;
+import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.tools.Command;
 import org.kohsuke.args4j.Option;
 import org.slf4j.Logger;
@@ -53,6 +53,9 @@ public class PostQueryCommand extends AbstractBaseAdminCommand implements Comman
   @Option(name = "-password", required = false, metaVar = "<String>", usage = "Password for basic auth.")
   private String _password;
 
+  @Option(name = "-authToken", required = false, metaVar = "<String>", usage = "Http auth token.")
+  private String _authToken;
+
   @Option(name = "-help", required = false, help = true, aliases = {"-h", "--h", "--help"}, usage = "Print this message.")
   private boolean _help = false;
 
@@ -68,8 +71,8 @@ public class PostQueryCommand extends AbstractBaseAdminCommand implements Comman
 
   @Override
   public String toString() {
-    return ("PostQuery -brokerProtocol " + _brokerProtocol + " -brokerHost " + _brokerHost + " -brokerPort " +
-        _brokerPort + " -queryType " + _queryType + " -query " + _query);
+    return ("PostQuery -brokerProtocol " + _brokerProtocol + " -brokerHost " + _brokerHost + " -brokerPort "
+        + _brokerPort + " -queryType " + _queryType + " -query " + _query);
   }
 
   @Override
@@ -107,6 +110,11 @@ public class PostQueryCommand extends AbstractBaseAdminCommand implements Comman
     return this;
   }
 
+  public PostQueryCommand setauthToken(String authToken) {
+    _authToken = authToken;
+    return this;
+  }
+
   public PostQueryCommand setQueryType(String queryType) {
     _queryType = queryType;
     return this;
@@ -133,7 +141,7 @@ public class PostQueryCommand extends AbstractBaseAdminCommand implements Comman
       request = JsonUtils.objectToString(Collections.singletonMap(Request.PQL, _query));
     }
 
-    return sendRequest("POST", urlString, request, makeBasicAuth(_user, _password));
+    return sendRequest("POST", urlString, request, makeAuthHeader(makeAuthToken(_authToken, _user, _password)));
   }
 
   @Override
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java
index 3630602..db53c31 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java
@@ -33,6 +33,7 @@ import java.util.Map;
 import java.util.Random;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.core.auth.BasicAuthUtils;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.tenant.TenantRole;
 import org.apache.pinot.spi.env.PinotConfiguration;
@@ -202,9 +203,10 @@ public class QuickstartRunner {
 
   public void bootstrapTable()
       throws Exception {
+    String token = BasicAuthUtils.toBasicAuthToken("admin", "verysecret");
     for (QuickstartTableRequest request : _tableRequests) {
       if (!new BootstrapTableTool("http", InetAddress.getLocalHost().getHostName(), _controllerPorts.get(0),
-          request.getBootstrapTableDir(), null).execute()) {
+          request.getBootstrapTableDir(), token).execute()) {
         throw new RuntimeException("Failed to bootstrap table with request - " + request);
       }
     }
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/UploadSegmentCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/UploadSegmentCommand.java
index d393e57..679a2a6 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/UploadSegmentCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/UploadSegmentCommand.java
@@ -58,6 +58,9 @@ public class UploadSegmentCommand extends AbstractBaseAdminCommand implements Co
   @Option(name = "-password", required = false, metaVar = "<String>", usage = "Password for basic auth.")
   private String _password;
 
+  @Option(name = "-authToken", required = false, metaVar = "<String>", usage = "Http auth token.")
+  private String _authToken;
+
   @Option(name = "-segmentDir", required = true, metaVar = "<string>", usage = "Path to segment directory.")
   private String _segmentDir = null;
 
@@ -119,6 +122,11 @@ public class UploadSegmentCommand extends AbstractBaseAdminCommand implements Co
     return this;
   }
 
+  public UploadSegmentCommand setAuthToken(String authToken) {
+    _authToken = authToken;
+    return this;
+  }
+
   public UploadSegmentCommand setSegmentDir(String segmentDir) {
     _segmentDir = segmentDir;
     return this;
@@ -159,7 +167,7 @@ public class UploadSegmentCommand extends AbstractBaseAdminCommand implements Co
         LOGGER.info("Uploading segment tar file: {}", segmentTarFile);
         fileUploadDownloadClient
             .uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), segmentTarFile,
-                makeBasicAuth(_user, _password), Collections.singletonList(new BasicNameValuePair(
+                makeAuthHeader(makeAuthToken(_authToken, _user, _password)), Collections.singletonList(new BasicNameValuePair(
                     FileUploadDownloadClient.QueryParameters.TABLE_NAME, _tableName)),
                 FileUploadDownloadClient.DEFAULT_SOCKET_TIMEOUT_MS);
       }
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/utils/PinotConfigUtils.java b/pinot-tools/src/main/java/org/apache/pinot/tools/utils/PinotConfigUtils.java
index 0354fd9..7a17845 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/utils/PinotConfigUtils.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/utils/PinotConfigUtils.java
@@ -76,6 +76,7 @@ public class PinotConfigUtils {
     properties.put("controller.admin.access.control.principals.user.password", "secret");
     properties.put("controller.admin.access.control.principals.user.tables", "baseballStats");
     properties.put("controller.admin.access.control.principals.user.permissions", "read");
+    properties.put("pinot.controller.segment.fetcher.auth.token", "Basic YWRtaW46dmVyeXNlY3JldA==");
 
     return properties;
   }
@@ -176,6 +177,7 @@ public class PinotConfigUtils {
     properties.put(CommonConstants.Server.CONFIG_OF_ADMIN_API_PORT, serverAdminPort);
     properties.put(CommonConstants.Server.CONFIG_OF_INSTANCE_DATA_DIR, serverDataDir);
     properties.put(CommonConstants.Server.CONFIG_OF_INSTANCE_SEGMENT_TAR_DIR, serverSegmentDir);
+    properties.put("pinot.server.segment.fetcher.auth.token", "Basic YWRtaW46dmVyeXNlY3JldA==");
 
     return properties;
   }
@@ -188,6 +190,7 @@ public class PinotConfigUtils {
     Map<String, Object> properties = new HashMap<>();
     properties.put(CommonConstants.Helix.KEY_OF_MINION_HOST, minionHost);
     properties.put(CommonConstants.Helix.KEY_OF_MINION_PORT, minionPort != 0 ? minionPort : getAvailablePort());
+    properties.put("segment.fetcher.auth.token", "Basic YWRtaW46dmVyeXNlY3JldA==");
 
     return properties;
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/04: progress

Posted by ap...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

apucher pushed a commit to branch basic-auth-controller
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 29c3cc0be45ca18d4dd0a530d41bf309da8f02d3
Author: Alexander Pucher <al...@alexpucher.com>
AuthorDate: Tue Feb 9 14:43:31 2021 -0800

    progress
---
 .../apache/pinot/controller/ControllerStarter.java |   1 +
 .../api/access/AccessControlFactory.java           |   4 +
 .../api/access/BasicAuthAccessControlFactory.java  | 163 +++++++++++++++++++++
 .../batch/spec/SegmentGenerationJobSpec.java       |  13 ++
 .../org/apache/pinot/tools/BootstrapTableTool.java |  14 +-
 .../admin/command/AbstractBaseAdminCommand.java    |  25 +++-
 .../tools/admin/command/AddSchemaCommand.java      |  22 ++-
 .../pinot/tools/admin/command/AddTableCommand.java |  23 ++-
 .../tools/admin/command/AddTenantCommand.java      |  22 ++-
 .../tools/admin/command/BootstrapTableCommand.java |   9 +-
 .../tools/admin/command/ChangeTableState.java      |  10 +-
 .../tools/admin/command/ImportDataCommand.java     |  26 +++-
 .../admin/command/OperateClusterConfigCommand.java |  22 ++-
 .../tools/admin/command/PostQueryCommand.java      |  18 ++-
 .../tools/admin/command/QuickstartRunner.java      |   4 +-
 .../tools/admin/command/UploadSegmentCommand.java  |  23 ++-
 .../apache/pinot/tools/utils/PinotConfigUtils.java |   6 +
 17 files changed, 377 insertions(+), 28 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
index 233f85e..a07ca29 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
@@ -388,6 +388,7 @@ public class ControllerStarter implements ServiceStartable {
     final AccessControlFactory accessControlFactory;
     try {
       accessControlFactory = (AccessControlFactory) Class.forName(accessControlFactoryClass).newInstance();
+      accessControlFactory.init(_config);
     } catch (Exception e) {
       throw new RuntimeException("Caught exception while creating new AccessControlFactory instance", e);
     }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/access/AccessControlFactory.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/access/AccessControlFactory.java
index 5b3d8d0..2c79784 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/access/AccessControlFactory.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/access/AccessControlFactory.java
@@ -20,11 +20,15 @@ package org.apache.pinot.controller.api.access;
 
 import org.apache.pinot.spi.annotations.InterfaceAudience;
 import org.apache.pinot.spi.annotations.InterfaceStability;
+import org.apache.pinot.spi.env.PinotConfiguration;
 
 
 @InterfaceAudience.Public
 @InterfaceStability.Stable
 public interface AccessControlFactory {
+  default void init(PinotConfiguration pinotConfiguration) {
+    // left blank
+  }
 
   AccessControl create();
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/access/BasicAuthAccessControlFactory.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/access/BasicAuthAccessControlFactory.java
new file mode 100644
index 0000000..0b2c00a
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/access/BasicAuthAccessControlFactory.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.access;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.spi.env.PinotConfiguration;
+
+import javax.ws.rs.core.HttpHeaders;
+import java.util.*;
+import java.util.stream.Collectors;
+
+
+/**
+ * Basic Authentication based on http headers. Configured via the "controller.admin.access.control" family of properties.
+ *
+ * <pre>
+ *     Example:
+ *     controller.admin.access.control.principals=admin123,user456
+ *     controller.admin.access.control.principals.admin123.password=verysecret
+ *     controller.admin.access.control.principals.user456.password=kindasecret
+ *     controller.admin.access.control.principals.user456.tables=stuff,lessImportantStuff
+ *     controller.admin.access.control.principals.user456.permissions=read,update
+ * </pre>
+ */
+public class BasicAuthAccessControlFactory implements AccessControlFactory {
+  private static final String PREFIX = "controller.admin.access.control.principals";
+  private static final String PASSWORD = "password";
+  private static final String PERMISSIONS = "permissions";
+  private static final String TABLES = "tables";
+  private static final String ALL = "*";
+
+  private static final String HEADER_AUTHORIZATION = "Authorization";
+
+  private AccessControl _accessControl;
+
+  public void init(PinotConfiguration configuration) {
+    String principalNames = configuration.getProperty(PREFIX);
+    Preconditions.checkArgument(StringUtils.isNotBlank(principalNames), "must provide principals");
+
+    List<BasicAuthPrincipal> principals = Arrays.stream(principalNames.split(",")).map(rawName -> {
+      String name = rawName.trim();
+      Preconditions.checkArgument(StringUtils.isNotBlank(name), "%s is not a valid name", name);
+
+      String password = configuration.getProperty(String.format("%s.%s.%s", PREFIX, name, PASSWORD));
+      Preconditions.checkArgument(StringUtils.isNotBlank(password), "must provide a password for %s", name);
+
+      Set<String> tables = new HashSet<>();
+      String tableNames = configuration.getProperty(String.format("%s.%s.%s", PREFIX, name, TABLES));
+      if (StringUtils.isNotBlank(tableNames) && !ALL.equals(tableNames)) {
+        tables = Arrays.stream(tableNames.split(",")).map(String::trim).collect(Collectors.toSet());
+      }
+
+      Set<AccessType> permissions = new HashSet<>();
+      String permissionNames = configuration.getProperty(String.format("%s.%s.%s", PREFIX, name, PERMISSIONS));
+      if (StringUtils.isNotBlank(permissionNames) && !ALL.equals(tableNames)) {
+        permissions = Arrays.stream(permissionNames.split(",")).map(String::trim).map(String::toUpperCase)
+                .map(AccessType::valueOf).collect(Collectors.toSet());
+      }
+
+      return new BasicAuthPrincipal(name, toToken(name, password), tables, permissions);
+    }).collect(Collectors.toList());
+
+    _accessControl = new BasicAuthAccessControl(principals);
+  }
+
+  @Override
+  public AccessControl create() {
+    return _accessControl;
+  }
+
+  /**
+   * Access Control using header-based basic http authentication
+   */
+  private static class BasicAuthAccessControl implements AccessControl {
+    private final Map<String, BasicAuthPrincipal> _principals;
+
+    public BasicAuthAccessControl(Collection<BasicAuthPrincipal> principals) {
+      this._principals = principals.stream().collect(Collectors.toMap(BasicAuthPrincipal::getToken, p -> p));
+    }
+
+    @Override
+    public boolean hasDataAccess(HttpHeaders httpHeaders, String tableName) {
+      return getPrincipal(httpHeaders).filter(p -> p.hasTable(tableName)).isPresent();
+    }
+
+    @Override
+    public boolean hasAccess(String tableName, AccessType accessType, HttpHeaders httpHeaders, String endpointUrl) {
+      return getPrincipal(httpHeaders).filter(p -> p.hasTable(tableName) && p.hasPermission(accessType)).isPresent();
+    }
+
+    @Override
+    public boolean hasAccess(AccessType accessType, HttpHeaders httpHeaders, String endpointUrl) {
+      return getPrincipal(httpHeaders).isPresent();
+    }
+
+    private Optional<BasicAuthPrincipal> getPrincipal(HttpHeaders headers) {
+      return headers.getRequestHeader(HEADER_AUTHORIZATION).stream().map(BasicAuthAccessControlFactory::normalizeToken)
+              .map(_principals::get).filter(Objects::nonNull).findFirst();
+    }
+  }
+
+  /**
+   * Container object for basic auth principal
+   */
+  private static class BasicAuthPrincipal {
+    private final String _name;
+    private final String _token;
+    private final Set<String> _tables;
+    private final Set<AccessType> _permissions;
+
+    public BasicAuthPrincipal(String name, String token, Set<String> tables, Set<AccessType> permissions) {
+      this._name = name;
+      this._token = token;
+      this._tables = tables;
+      this._permissions = permissions;
+    }
+
+    public String getName() {
+      return _name;
+    }
+
+    public String getToken() {
+      return _token;
+    }
+
+    public boolean hasTable(String tableName) {
+      return _tables.isEmpty() || _tables.contains(tableName);
+    }
+
+    public boolean hasPermission(AccessType accessType) {
+      return _permissions.isEmpty() || _permissions.contains(accessType);
+    }
+  }
+
+  private static String toToken(String name, String password) {
+    String identifier = String.format("%s:%s", name, password);
+    return normalizeToken(String.format("Basic %s", Base64.getEncoder().encodeToString(identifier.getBytes())));
+  }
+
+  private static String normalizeToken(String token) {
+    if (token == null) {
+      return null;
+    }
+    return token.trim().replace("=", "");
+  }
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationJobSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationJobSpec.java
index 3c43db8..6a6ffbd 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationJobSpec.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationJobSpec.java
@@ -121,6 +121,11 @@ public class SegmentGenerationJobSpec implements Serializable {
    */
   private TlsSpec _tlsSpec;
 
+  /**
+   * Controller auth token
+   */
+  private String _authToken;
+
   public ExecutionFrameworkSpec getExecutionFrameworkSpec() {
     return _executionFrameworkSpec;
   }
@@ -273,6 +278,14 @@ public class SegmentGenerationJobSpec implements Serializable {
   public void setTlsSpec(TlsSpec tlsSpec) {
     _tlsSpec = tlsSpec;
   }
+
+  public String getAuthToken() {
+    return _authToken;
+  }
+
+  public void setAuthToken(String authToken) {
+    _authToken = authToken;
+  }
 }
 
 
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/BootstrapTableTool.java b/pinot-tools/src/main/java/org/apache/pinot/tools/BootstrapTableTool.java
index d3eb44f..2e86199 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/BootstrapTableTool.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/BootstrapTableTool.java
@@ -55,20 +55,11 @@ public class BootstrapTableTool {
   private final String _controllerProtocol;
   private final String _controllerHost;
   private final int _controllerPort;
+  private final String _token;
   private final String _tableDir;
   private final MinionClient _minionClient;
 
-  public BootstrapTableTool(String controllerHost, int controllerPort, String tableDir) {
-    Preconditions.checkNotNull(controllerHost);
-    Preconditions.checkNotNull(tableDir);
-    _controllerProtocol = CommonConstants.HTTP_PROTOCOL;
-    _controllerHost = controllerHost;
-    _controllerPort = controllerPort;
-    _tableDir = tableDir;
-    _minionClient = new MinionClient(controllerHost, String.valueOf(controllerPort));
-  }
-
-  public BootstrapTableTool(String controllerProtocol, String controllerHost, int controllerPort, String tableDir) {
+  public BootstrapTableTool(String controllerProtocol, String controllerHost, int controllerPort, String tableDir, String token) {
     Preconditions.checkNotNull(controllerProtocol);
     Preconditions.checkNotNull(controllerHost);
     Preconditions.checkNotNull(tableDir);
@@ -77,6 +68,7 @@ public class BootstrapTableTool {
     _controllerPort = controllerPort;
     _tableDir = tableDir;
     _minionClient = new MinionClient(controllerHost, String.valueOf(controllerPort));
+    _token = token;
   }
 
   public boolean execute()
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AbstractBaseAdminCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AbstractBaseAdminCommand.java
index 9ab2f1a..30d4c28 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AbstractBaseAdminCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AbstractBaseAdminCommand.java
@@ -29,9 +29,12 @@ import java.io.OutputStreamWriter;
 import java.net.HttpURLConnection;
 import java.net.URL;
 import java.nio.charset.StandardCharsets;
-import java.util.Map;
+import java.util.*;
 
 import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.Header;
+import org.apache.http.message.BasicHeader;
 import org.apache.pinot.tools.AbstractBaseCommand;
 import org.apache.pinot.tools.utils.PinotConfigUtils;
 
@@ -72,11 +75,18 @@ public class AbstractBaseAdminCommand extends AbstractBaseCommand {
 
   public static String sendRequest(String requestMethod, String urlString, String payload)
       throws IOException {
+    return sendRequest(requestMethod, urlString, payload, Collections.emptyList());
+  }
+
+  public static String sendRequest(String requestMethod, String urlString, String payload, List<Header> headers)
+      throws IOException {
     final URL url = new URL(urlString);
     final HttpURLConnection conn = (HttpURLConnection) url.openConnection();
 
     conn.setDoOutput(true);
     conn.setRequestMethod(requestMethod);
+    headers.stream().flatMap(header -> Arrays.stream(header.getElements()))
+            .forEach(elem -> conn.setRequestProperty(elem.getName(), elem.getValue()));
     if (payload != null) {
       final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(conn.getOutputStream(),
           StandardCharsets.UTF_8));
@@ -112,4 +122,17 @@ public class AbstractBaseAdminCommand extends AbstractBaseCommand {
       throws ConfigurationException {
     return PinotConfigUtils.readConfigFromFile(configFileName);
   }
+
+  static List<Header> makeBasicAuth(String user, String password) {
+    if (StringUtils.isBlank(user)) {
+      return Collections.emptyList();
+    }
+
+    if (StringUtils.isBlank(password)) {
+      password = "";
+    }
+
+    String token = "Basic " + Base64.getEncoder().encodeToString((user + ":" + password).getBytes());
+    return Collections.singletonList(new BasicHeader("Authorization", token));
+  }
 }
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddSchemaCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddSchemaCommand.java
index 8178bfc..2bb0a99 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddSchemaCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddSchemaCommand.java
@@ -22,6 +22,9 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collections;
+
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.spi.data.Schema;
@@ -51,6 +54,12 @@ public class AddSchemaCommand extends AbstractBaseAdminCommand implements Comman
   @Option(name = "-exec", required = false, metaVar = "<boolean>", usage = "Execute the command.")
   private boolean _exec;
 
+  @Option(name = "-user", required = false, metaVar = "<String>", usage = "Username for basic auth.")
+  private String _user;
+
+  @Option(name = "-password", required = false, metaVar = "<String>", usage = "Password for basic auth.")
+  private String _password;
+
   @Option(name = "-help", required = false, help = true, aliases = {"-h", "--h", "--help"}, usage = "Print this message.")
   private boolean _help = false;
 
@@ -73,7 +82,8 @@ public class AddSchemaCommand extends AbstractBaseAdminCommand implements Comman
   public String toString() {
     String retString =
         ("AddSchema -controllerProtocol " + _controllerProtocol + " -controllerHost " + _controllerHost
-            + " -controllerPort " + _controllerPort + " -schemaFile " + _schemaFile);
+            + " -controllerPort " + _controllerPort + " -schemaFile " + _schemaFile + " -user " + _user + " -password "
+                + "[hidden]");
 
     return ((_exec) ? (retString + " -exec") : retString);
   }
@@ -103,6 +113,14 @@ public class AddSchemaCommand extends AbstractBaseAdminCommand implements Comman
     return this;
   }
 
+  public void setUser(String user) {
+    this._user = user;
+  }
+
+  public void setPassword(String password) {
+    this._password = password;
+  }
+
   public AddSchemaCommand setExecute(boolean exec) {
     _exec = exec;
     return this;
@@ -131,7 +149,7 @@ public class AddSchemaCommand extends AbstractBaseAdminCommand implements Comman
     try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) {
       fileUploadDownloadClient.addSchema(
           FileUploadDownloadClient.getUploadSchemaURI(_controllerProtocol, _controllerHost, Integer.parseInt(_controllerPort)),
-          schema.getSchemaName(), schemaFile);
+          schema.getSchemaName(), schemaFile, makeBasicAuth(_user, _password), Collections.emptyList());
     } catch (Exception e) {
       LOGGER.error("Got Exception to upload Pinot Schema: " + schema.getSchemaName(), e);
       return false;
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTableCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTableCommand.java
index 038883f..78123e5 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTableCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTableCommand.java
@@ -21,6 +21,8 @@ package org.apache.pinot.tools.admin.command;
 import com.fasterxml.jackson.databind.JsonNode;
 import java.io.File;
 import java.io.IOException;
+import java.util.Collections;
+
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.FileUploadDownloadClient;
 import org.apache.pinot.common.utils.NetUtil;
@@ -58,6 +60,12 @@ public class AddTableCommand extends AbstractBaseAdminCommand implements Command
   @Option(name = "-exec", required = false, metaVar = "<boolean>", usage = "Execute the command.")
   private boolean _exec;
 
+  @Option(name = "-user", required = false, metaVar = "<String>", usage = "Username for basic auth.")
+  private String _user;
+
+  @Option(name = "-password", required = false, metaVar = "<String>", usage = "Password for basic auth.")
+  private String _password;
+
   @Option(name = "-help", required = false, help = true, aliases = {"-h", "--h", "--help"}, usage = "Print this message.")
   private boolean _help = false;
 
@@ -82,7 +90,8 @@ public class AddTableCommand extends AbstractBaseAdminCommand implements Command
   public String toString() {
     String retString =
         ("AddTable -tableConfigFile " + _tableConfigFile + " -schemaFile " + _schemaFile + " -controllerProtocol "
-            + _controllerProtocol + " -controllerHost " + _controllerHost + " -controllerPort " + _controllerPort);
+            + _controllerProtocol + " -controllerHost " + _controllerHost + " -controllerPort " + _controllerPort
+                + " -user " + _user + " -password " + "[hidden]");
     return ((_exec) ? (retString + " -exec") : retString);
   }
 
@@ -115,6 +124,16 @@ public class AddTableCommand extends AbstractBaseAdminCommand implements Command
     return this;
   }
 
+  public AddTableCommand setUser(String user) {
+    _user = user;
+    return this;
+  }
+
+  public AddTableCommand setPassword(String password) {
+    _password = password;
+    return this;
+  }
+
   public AddTableCommand setExecute(boolean exec) {
     _exec = exec;
     return this;
@@ -134,7 +153,7 @@ public class AddTableCommand extends AbstractBaseAdminCommand implements Command
     try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) {
       fileUploadDownloadClient.addSchema(
           FileUploadDownloadClient.getUploadSchemaURI(_controllerProtocol, _controllerHost, Integer.parseInt(_controllerPort)),
-          schema.getSchemaName(), schemaFile);
+          schema.getSchemaName(), schemaFile, makeBasicAuth(_user, _password), Collections.emptyList());
     } catch (Exception e) {
       LOGGER.error("Got Exception to upload Pinot Schema: " + schema.getSchemaName(), e);
       throw e;
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTenantCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTenantCommand.java
index dc38614..3365314 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTenantCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTenantCommand.java
@@ -28,6 +28,8 @@ import org.kohsuke.args4j.Option;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
+
 
 public class AddTenantCommand extends AbstractBaseAdminCommand implements Command {
   private static final Logger LOGGER = LoggerFactory.getLogger(AddTenantCommand.class);
@@ -59,6 +61,12 @@ public class AddTenantCommand extends AbstractBaseAdminCommand implements Comman
   @Option(name = "-exec", required = false, metaVar = "<boolean>", usage = "Execute the command.")
   private boolean _exec;
 
+  @Option(name = "-user", required = false, metaVar = "<String>", usage = "Username for basic auth.")
+  private String _user;
+
+  @Option(name = "-password", required = false, metaVar = "<String>", usage = "Password for basic auth.")
+  private String _password;
+
   @Option(name = "-help", required = false, help = true, aliases = {"-h", "--h", "--help"}, usage = "Print this message.")
   private boolean _help = false;
 
@@ -94,6 +102,16 @@ public class AddTenantCommand extends AbstractBaseAdminCommand implements Comman
     return this;
   }
 
+  public AddTenantCommand setUser(String user) {
+    _user = user;
+    return this;
+  }
+
+  public AddTenantCommand setPassword(String password) {
+    _password = password;
+    return this;
+  }
+
   public AddTenantCommand setExecute(boolean exec) {
     _exec = exec;
     return this;
@@ -118,8 +136,8 @@ public class AddTenantCommand extends AbstractBaseAdminCommand implements Comman
     LOGGER.info("Executing command: " + toString());
     Tenant tenant = new Tenant(_role, _name, _instanceCount, _offlineInstanceCount, _realtimeInstanceCount);
     String res = AbstractBaseAdminCommand
-        .sendPostRequest(ControllerRequestURLBuilder.baseUrl(_controllerAddress).forTenantCreate(),
-            tenant.toJsonString());
+        .sendRequest("POST", ControllerRequestURLBuilder.baseUrl(_controllerAddress).forTenantCreate(),
+            tenant.toJsonString(), makeBasicAuth(_user, _password));
 
     LOGGER.info(res);
     System.out.print(res);
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/BootstrapTableCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/BootstrapTableCommand.java
index 627755d..6f7d017 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/BootstrapTableCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/BootstrapTableCommand.java
@@ -76,6 +76,12 @@ public class BootstrapTableCommand extends AbstractBaseAdminCommand implements C
   @Option(name = "-dir", required = false, aliases = {"-d", "-directory"}, metaVar = "<String>", usage = "The directory contains all the configs and data to bootstrap a table")
   private String _dir;
 
+  @Option(name = "-user", required = false, metaVar = "<String>", usage = "Username for basic auth.")
+  private String _user;
+
+  @Option(name = "-password", required = false, metaVar = "<String>", usage = "Password for basic auth.")
+  private String _password;
+
   @Option(name = "-help", required = false, help = true, aliases = {"-h", "--h", "--help"}, usage = "Print this message.")
   private boolean _help = false;
 
@@ -116,6 +122,7 @@ public class BootstrapTableCommand extends AbstractBaseAdminCommand implements C
     if (_controllerHost == null) {
       _controllerHost = NetUtil.getHostAddress();
     }
-    return new BootstrapTableTool(_controllerProtocol, _controllerHost, Integer.parseInt(_controllerPort), _dir).execute();
+    String token = ""; // TODO
+    return new BootstrapTableTool(_controllerProtocol, _controllerHost, Integer.parseInt(_controllerPort), _dir, token).execute();
   }
 }
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ChangeTableState.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ChangeTableState.java
index c7cbe70..d240e77 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ChangeTableState.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ChangeTableState.java
@@ -50,6 +50,12 @@ public class ChangeTableState extends AbstractBaseAdminCommand implements Comman
   @Option(name = "-state", required = true, metaVar = "<String>", usage = "Change Table State(enable|disable|drop)")
   private String _state;
 
+  @Option(name = "-user", required = false, metaVar = "<String>", usage = "Username for basic auth.")
+  private String _user;
+
+  @Option(name = "-password", required = false, metaVar = "<String>", usage = "Password for basic auth.")
+  private String _password;
+
   @Option(name = "-help", required = false, help = true, aliases = {"-h", "--h", "--help"}, usage = "Print this message.")
   private boolean _help = false;
 
@@ -70,6 +76,7 @@ public class ChangeTableState extends AbstractBaseAdminCommand implements Comman
         URI_TABLES_PATH + _tableName, "state=" + stateValue, null);
 
     GetMethod httpGet = new GetMethod(uri.toString());
+    httpGet.setRequestHeader("Authorization", null); // TODO
     int status = httpClient.executeMethod(httpGet);
     if (status != 200) {
       throw new RuntimeException("Failed to change table state, error: " + httpGet.getResponseBodyAsString());
@@ -94,7 +101,8 @@ public class ChangeTableState extends AbstractBaseAdminCommand implements Comman
   @Override
   public String toString() {
     return ("ChangeTableState -controllerProtocol " + _controllerProtocol + " -controllerHost " + _controllerHost
-        + " -controllerPort " + _controllerPort + " -tableName" + _tableName + " -state" + _state);
+        + " -controllerPort " + _controllerPort + " -tableName" + _tableName + " -state" + _state + " -user " + _user
+        + " -password " + "[hidden]");
   }
 
   @Override
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ImportDataCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ImportDataCommand.java
index 10b2aaf..49884e4 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ImportDataCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ImportDataCommand.java
@@ -30,6 +30,7 @@ import java.util.List;
 import java.util.Map;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.controller.helix.ControllerRequestURLBuilder;
 import org.apache.pinot.spi.data.readers.FileFormat;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
@@ -74,6 +75,12 @@ public class ImportDataCommand extends AbstractBaseAdminCommand implements Comma
   @Option(name = "-controllerURI", metaVar = "<string>", usage = "Pinot Controller URI.")
   private String _controllerURI = "http://localhost:9000";
 
+  @Option(name = "-user", required = false, metaVar = "<String>", usage = "Username for basic auth.")
+  private String _user;
+
+  @Option(name = "-password", required = false, metaVar = "<String>", usage = "Password for basic auth.")
+  private String _password;
+
   @Option(name = "-tempDir", metaVar = "<string>", usage = "Temporary directory used to hold data during segment creation.")
   private String _tempDir = new File(FileUtils.getTempDirectory(), getClass().getSimpleName()).getAbsolutePath();
 
@@ -121,6 +128,16 @@ public class ImportDataCommand extends AbstractBaseAdminCommand implements Comma
     return this;
   }
 
+  public ImportDataCommand setUser(String user) {
+    _user = user;
+    return this;
+  }
+
+  public ImportDataCommand setPassword(String password) {
+    _password = password;
+    return this;
+  }
+
   public List<String> getAdditionalConfigs() {
     return _additionalConfigs;
   }
@@ -153,8 +170,8 @@ public class ImportDataCommand extends AbstractBaseAdminCommand implements Comma
   @Override
   public String toString() {
     String results = String
-        .format("InsertData -dataFilePath %s -format %s -table %s -controllerURI %s -tempDir %s", _dataFilePath,
-            _format, _table, _controllerURI, _tempDir);
+        .format("InsertData -dataFilePath %s -format %s -table %s -controllerURI %s -user %s -password %s -tempDir %s",
+            _dataFilePath, _format, _table, _controllerURI, _user, "[hidden]", _tempDir);
     if (_additionalConfigs != null) {
       results += " -additionalConfigs " + Arrays.toString(_additionalConfigs.toArray());
     }
@@ -227,6 +244,11 @@ public class ImportDataCommand extends AbstractBaseAdminCommand implements Comma
     spec.setOverwriteOutput(true);
     spec.setJobType("SegmentCreationAndTarPush");
 
+    if (!StringUtils.isBlank(_user)) {
+      String token = ""; // TODO
+      spec.setAuthToken(token);
+    }
+
     // set ExecutionFrameworkSpec
     ExecutionFrameworkSpec executionFrameworkSpec = new ExecutionFrameworkSpec();
     executionFrameworkSpec.setName("standalone");
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/OperateClusterConfigCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/OperateClusterConfigCommand.java
index f265d95..97b04c6 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/OperateClusterConfigCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/OperateClusterConfigCommand.java
@@ -46,6 +46,12 @@ public class OperateClusterConfigCommand extends AbstractBaseAdminCommand implem
   @Option(name = "-controllerProtocol", required = false, metaVar = "<String>", usage = "protocol for controller.")
   private String _controllerProtocol = CommonConstants.HTTP_PROTOCOL;
 
+  @Option(name = "-user", required = false, metaVar = "<String>", usage = "Username for basic auth.")
+  private String _user;
+
+  @Option(name = "-password", required = false, metaVar = "<String>", usage = "Password for basic auth.")
+  private String _password;
+
   @Option(name = "-config", metaVar = "<string>", usage = "Cluster config to operate.")
   private String _config;
 
@@ -100,6 +106,16 @@ public class OperateClusterConfigCommand extends AbstractBaseAdminCommand implem
     return this;
   }
 
+  public OperateClusterConfigCommand setUser(String user) {
+    _user = user;
+    return this;
+  }
+
+  public OperateClusterConfigCommand setPassword(String password) {
+    _password = password;
+    return this;
+  }
+
   public OperateClusterConfigCommand setConfig(String config) {
     _config = config;
     return this;
@@ -129,8 +145,9 @@ public class OperateClusterConfigCommand extends AbstractBaseAdminCommand implem
               "Bad config: " + _config + ". Please follow the pattern of [Config Key]=[Config Value]");
         }
         String request = JsonUtils.objectToString(Collections.singletonMap(splits[0], splits[1]));
-        return sendPostRequest(clusterConfigUrl, request);
+        return sendRequest("POST", clusterConfigUrl, request, makeBasicAuth(_user, _password));
       case "GET":
+        // TODO
         String response = IOUtils.toString(new URI(clusterConfigUrl), StandardCharsets.UTF_8);
         JsonNode jsonNode = JsonUtils.stringToJsonNode(response);
         Iterator<String> fieldNamesIterator = jsonNode.fieldNames();
@@ -142,7 +159,8 @@ public class OperateClusterConfigCommand extends AbstractBaseAdminCommand implem
         }
         return results;
       case "DELETE":
-        return sendDeleteRequest(String.format("%s/%s", clusterConfigUrl, _config), null);
+        return sendRequest("DELETE", String.format("%s/%s", clusterConfigUrl, _config), null,
+            makeBasicAuth(_user, _password));
       default:
         throw new UnsupportedOperationException("Unsupported operation: " + _operation);
     }
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/PostQueryCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/PostQueryCommand.java
index 511d7e7..4afb686 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/PostQueryCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/PostQueryCommand.java
@@ -47,6 +47,12 @@ public class PostQueryCommand extends AbstractBaseAdminCommand implements Comman
   @Option(name = "-query", required = true, metaVar = "<string>", usage = "Query string to perform.")
   private String _query;
 
+  @Option(name = "-user", required = false, metaVar = "<String>", usage = "Username for basic auth.")
+  private String _user;
+
+  @Option(name = "-password", required = false, metaVar = "<String>", usage = "Password for basic auth.")
+  private String _password;
+
   @Option(name = "-help", required = false, help = true, aliases = {"-h", "--h", "--help"}, usage = "Print this message.")
   private boolean _help = false;
 
@@ -91,6 +97,16 @@ public class PostQueryCommand extends AbstractBaseAdminCommand implements Comman
     return this;
   }
 
+  public PostQueryCommand setUser(String user) {
+    _user = user;
+    return this;
+  }
+
+  public PostQueryCommand setPassword(String password) {
+    _password = password;
+    return this;
+  }
+
   public PostQueryCommand setQueryType(String queryType) {
     _queryType = queryType;
     return this;
@@ -117,7 +133,7 @@ public class PostQueryCommand extends AbstractBaseAdminCommand implements Comman
       request = JsonUtils.objectToString(Collections.singletonMap(Request.PQL, _query));
     }
 
-    return sendPostRequest(urlString, request);
+    return sendRequest("POST", urlString, request, makeBasicAuth(_user, _password));
   }
 
   @Override
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java
index 4598d8a..3630602 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java
@@ -203,8 +203,8 @@ public class QuickstartRunner {
   public void bootstrapTable()
       throws Exception {
     for (QuickstartTableRequest request : _tableRequests) {
-      if (!new BootstrapTableTool(InetAddress.getLocalHost().getHostName(), _controllerPorts.get(0), request.getBootstrapTableDir())
-          .execute()) {
+      if (!new BootstrapTableTool("http", InetAddress.getLocalHost().getHostName(), _controllerPorts.get(0),
+          request.getBootstrapTableDir(), null).execute()) {
         throw new RuntimeException("Failed to bootstrap table with request - " + request);
       }
     }
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/UploadSegmentCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/UploadSegmentCommand.java
index a9e2267..d393e57 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/UploadSegmentCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/UploadSegmentCommand.java
@@ -21,7 +21,9 @@ package org.apache.pinot.tools.admin.command;
 import com.google.common.base.Preconditions;
 import java.io.File;
 import java.net.URI;
+import java.util.Collections;
 import org.apache.commons.io.FileUtils;
+import org.apache.http.message.BasicNameValuePair;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.FileUploadDownloadClient;
 import org.apache.pinot.common.utils.NetUtil;
@@ -50,6 +52,12 @@ public class UploadSegmentCommand extends AbstractBaseAdminCommand implements Co
   @Option(name = "-controllerProtocol", required = false, metaVar = "<String>", usage = "protocol for controller.")
   private String _controllerProtocol = CommonConstants.HTTP_PROTOCOL;
 
+  @Option(name = "-user", required = false, metaVar = "<String>", usage = "Username for basic auth.")
+  private String _user;
+
+  @Option(name = "-password", required = false, metaVar = "<String>", usage = "Password for basic auth.")
+  private String _password;
+
   @Option(name = "-segmentDir", required = true, metaVar = "<string>", usage = "Path to segment directory.")
   private String _segmentDir = null;
 
@@ -101,6 +109,16 @@ public class UploadSegmentCommand extends AbstractBaseAdminCommand implements Co
     return this;
   }
 
+  public UploadSegmentCommand setUser(String user) {
+    _user = user;
+    return this;
+  }
+
+  public UploadSegmentCommand setPassword(String password) {
+    _password = password;
+    return this;
+  }
+
   public UploadSegmentCommand setSegmentDir(String segmentDir) {
     _segmentDir = segmentDir;
     return this;
@@ -140,7 +158,10 @@ public class UploadSegmentCommand extends AbstractBaseAdminCommand implements Co
 
         LOGGER.info("Uploading segment tar file: {}", segmentTarFile);
         fileUploadDownloadClient
-            .uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), segmentTarFile, _tableName);
+            .uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), segmentTarFile,
+                makeBasicAuth(_user, _password), Collections.singletonList(new BasicNameValuePair(
+                    FileUploadDownloadClient.QueryParameters.TABLE_NAME, _tableName)),
+                FileUploadDownloadClient.DEFAULT_SOCKET_TIMEOUT_MS);
       }
     } finally {
       // Delete the temporary working directory.
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/utils/PinotConfigUtils.java b/pinot-tools/src/main/java/org/apache/pinot/tools/utils/PinotConfigUtils.java
index b636b90..0354fd9 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/utils/PinotConfigUtils.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/utils/PinotConfigUtils.java
@@ -70,6 +70,12 @@ public class PinotConfigUtils {
     properties.put(ControllerPeriodicTasksConf.REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS, 3600);
     properties.put(ControllerPeriodicTasksConf.BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS, 3600);
     properties.put(ControllerConf.CONTROLLER_MODE, controllerMode.toString());
+    properties.put("controller.admin.access.control.factory.class", "org.apache.pinot.controller.api.access.BasicAuthAccessControlFactory");
+    properties.put("controller.admin.access.control.principals", "admin, user");
+    properties.put("controller.admin.access.control.principals.admin.password", "verysecret");
+    properties.put("controller.admin.access.control.principals.user.password", "secret");
+    properties.put("controller.admin.access.control.principals.user.tables", "baseballStats");
+    properties.put("controller.admin.access.control.principals.user.permissions", "read");
 
     return properties;
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 04/04: refactor FileUploadDownloadClient

Posted by ap...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

apucher pushed a commit to branch basic-auth-controller
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 58d047ae0278ae710583f31d08ce17aff08ed069
Author: Alexander Pucher <al...@alexpucher.com>
AuthorDate: Fri Feb 19 12:04:48 2021 -0800

    refactor FileUploadDownloadClient
---
 .../apache/pinot/common/utils/CommonConstants.java |   6 ++
 .../common/utils/FileUploadDownloadClient.java     | 102 +++++----------------
 .../apache/pinot/core/common/MinionConstants.java  |   1 +
 .../core/data/manager/BaseTableDataManager.java    |   2 +
 .../manager/config/InstanceDataManagerConfig.java  |   2 +
 .../manager/config/TableDataManagerConfig.java     |  10 +-
 .../manager/realtime/SegmentCommitterFactory.java  |   3 +-
 .../realtime/Server2ControllerSegmentUploader.java |  10 +-
 .../ServerSegmentCompletionProtocolHandler.java    |  28 ++++--
 .../Server2ControllerSegmentUploaderTest.java      |   4 +-
 .../org/apache/pinot/minion/MinionContext.java     |   9 ++
 .../org/apache/pinot/minion/MinionStarter.java     |   3 +
 .../BaseMultipleSegmentsConversionExecutor.java    |   6 +-
 .../BaseSingleSegmentConversionExecutor.java       |   7 +-
 .../minion/taskfactory/TaskFactoryRegistry.java    |   4 +
 .../ingestion/batch/common/SegmentPushUtils.java   |  28 +++---
 .../ingestion/common/DefaultControllerRestApi.java |   4 +-
 .../starter/helix/HelixInstanceDataManager.java    |   3 +-
 .../helix/HelixInstanceDataManagerConfig.java      |   8 ++
 .../pinot/tools/backfill/BackfillSegmentUtils.java |   3 +
 .../apache/pinot/tools/utils/PinotConfigUtils.java |   3 +
 21 files changed, 130 insertions(+), 116 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
index 191ae93..ffa91a2 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
@@ -235,6 +235,9 @@ public class CommonConstants {
         "pinot.server.instance.realtime.alloc.offheap.direct";
     public static final String PREFIX_OF_CONFIG_OF_PINOT_FS_FACTORY = "pinot.server.storage.factory";
     public static final String PREFIX_OF_CONFIG_OF_PINOT_CRYPTER = "pinot.server.crypter";
+
+    public static final String CONFIG_OF_AUTH_TOKEN = "auth.token";
+
     // Configuration to consider the server ServiceStatus as being STARTED if the percent of resources (tables) that
     // are ONLINE for this this server has crossed the threshold percentage of the total number of tables
     // that it is expected to serve.
@@ -304,6 +307,7 @@ public class CommonConstants {
       public static final String CONFIG_OF_CONTROLLER_HTTPS_ENABLED = "enabled";
       public static final String CONFIG_OF_CONTROLLER_HTTPS_PORT = "controller.port";
       public static final String CONFIG_OF_SEGMENT_UPLOAD_REQUEST_TIMEOUT_MS = "upload.request.timeout.ms";
+      public static final String CONFIG_OF_SEGMENT_UPLOAD_AUTH_TOKEN = "upload.auth.token";
 
       public static final int DEFAULT_SEGMENT_UPLOAD_REQUEST_TIMEOUT_MS = 300_000;
       public static final int DEFAULT_OTHER_REQUESTS_TIMEOUT = 10_000;
@@ -353,6 +357,8 @@ public class CommonConstants {
     public static final String PREFIX_OF_CONFIG_OF_SEGMENT_FETCHER_FACTORY = "segment.fetcher";
     public static final String PREFIX_OF_CONFIG_OF_SEGMENT_UPLOADER = "segment.uploader";
     public static final String PREFIX_OF_CONFIG_OF_PINOT_CRYPTER = "crypter";
+
+    public static final String CONFIG_OF_TASK_AUTH_TOKEN = "task.auth.token";
   }
 
   public static class Segment {
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
index b4da975..f7ff600 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
@@ -46,7 +46,6 @@ import org.apache.http.StatusLine;
 import org.apache.http.client.config.RequestConfig;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpPost;
-import org.apache.http.client.methods.HttpPut;
 import org.apache.http.client.methods.HttpUriRequest;
 import org.apache.http.client.methods.RequestBuilder;
 import org.apache.http.entity.ContentType;
@@ -140,11 +139,13 @@ public class FileUploadDownloadClient implements Closeable {
     return new URI(protocol, null, host, port, path, null, null);
   }
 
+  @Deprecated
   public static URI getRetrieveTableConfigHttpURI(String host, int port, String rawTableName)
       throws URISyntaxException {
     return getURI(HTTP, host, port, TABLES_PATH + "/" + rawTableName);
   }
 
+  @Deprecated
   public static URI getDeleteSegmentHttpUri(String host, int port, String rawTableName, String segmentName,
       String tableType)
       throws URISyntaxException {
@@ -152,6 +153,7 @@ public class FileUploadDownloadClient implements Closeable {
         rawTableName + "/" + URIUtils.encode(segmentName) + TYPE_DELIMITER + tableType));
   }
 
+  @Deprecated
   public static URI getRetrieveAllSegmentWithTableTypeHttpUri(String host, int port, String rawTableName,
       String tableType)
       throws URISyntaxException {
@@ -159,6 +161,7 @@ public class FileUploadDownloadClient implements Closeable {
         rawTableName + TYPE_DELIMITER + tableType));
   }
 
+  @Deprecated
   public static URI getRetrieveSchemaHttpURI(String host, int port, String schemaName)
       throws URISyntaxException {
     return getURI(HTTP, host, port, SCHEMA_PATH + "/" + schemaName);
@@ -169,16 +172,6 @@ public class FileUploadDownloadClient implements Closeable {
     return getURI(protocol, host, port, SCHEMA_PATH + "/" + schemaName);
   }
 
-  public static URI getUploadSchemaHttpURI(String host, int port)
-      throws URISyntaxException {
-    return getURI(HTTP, host, port, SCHEMA_PATH);
-  }
-
-  public static URI getUploadSchemaHttpsURI(String host, int port)
-      throws URISyntaxException {
-    return getURI(HTTPS, host, port, SCHEMA_PATH);
-  }
-
   public static URI getUploadSchemaURI(String protocol, String host, int port)
       throws URISyntaxException {
     return getURI(protocol, host, port, SCHEMA_PATH);
@@ -189,36 +182,12 @@ public class FileUploadDownloadClient implements Closeable {
     return getURI(controllerURI.getScheme(), controllerURI.getHost(), controllerURI.getPort(), SCHEMA_PATH);
   }
 
-  /**
-   * This method calls the old segment upload endpoint. We will deprecate this behavior soon. Please call
-   * getUploadSegmentHttpURI to construct your request.
-   */
-  @Deprecated
-  public static URI getOldUploadSegmentHttpURI(String host, int port)
-      throws URISyntaxException {
-    return getURI(HTTP, host, port, OLD_SEGMENT_PATH);
-  }
-
-  /**
-   * This method calls the old segment upload endpoint. We will deprecate this behavior soon. Please call
-   * getUploadSegmentHttpsURI to construct your request.
-   */
   @Deprecated
-  public static URI getOldUploadSegmentHttpsURI(String host, int port)
-      throws URISyntaxException {
-    return getURI(HTTPS, host, port, OLD_SEGMENT_PATH);
-  }
-
   public static URI getUploadSegmentHttpURI(String host, int port)
       throws URISyntaxException {
     return getURI(HTTP, host, port, SEGMENT_PATH);
   }
 
-  public static URI getUploadSegmentHttpsURI(String host, int port)
-      throws URISyntaxException {
-    return getURI(HTTPS, host, port, SEGMENT_PATH);
-  }
-
   public static URI getUploadSegmentURI(String protocol, String host, int port)
       throws URISyntaxException {
     return getURI(protocol, host, port, SEGMENT_PATH);
@@ -257,12 +226,14 @@ public class FileUploadDownloadClient implements Closeable {
     return requestBuilder.build();
   }
 
+  @Deprecated
   private static HttpUriRequest constructGetRequest(URI uri) {
     RequestBuilder requestBuilder = RequestBuilder.get(uri).setVersion(HttpVersion.HTTP_1_1);
     setTimeout(requestBuilder, GET_REQUEST_SOCKET_TIMEOUT_MS);
     return requestBuilder.build();
   }
 
+  @Deprecated
   private static HttpUriRequest constructDeleteRequest(URI uri) {
     RequestBuilder requestBuilder = RequestBuilder.delete(uri).setVersion(HttpVersion.HTTP_1_1);
     setTimeout(requestBuilder, DELETE_REQUEST_SOCKET_TIMEOUT_MS);
@@ -275,11 +246,6 @@ public class FileUploadDownloadClient implements Closeable {
         DEFAULT_SOCKET_TIMEOUT_MS);
   }
 
-  private static HttpUriRequest getUpdateSchemaRequest(URI uri, String schemaName, File schemaFile) {
-    return getUploadFileRequest(HttpPut.METHOD_NAME, uri, getContentBody(schemaName, schemaFile), null, null,
-        DEFAULT_SOCKET_TIMEOUT_MS);
-  }
-
   private static HttpUriRequest getUploadSegmentRequest(URI uri, String segmentName, File segmentFile,
       @Nullable List<Header> headers, @Nullable List<NameValuePair> parameters, int socketTimeoutMs) {
     return getUploadFileRequest(HttpPost.METHOD_NAME, uri, getContentBody(segmentName, segmentFile), headers,
@@ -299,7 +265,7 @@ public class FileUploadDownloadClient implements Closeable {
   }
 
   private static HttpUriRequest getUploadSegmentMetadataFilesRequest(URI uri, Map<String, File> metadataFiles,
-      int segmentUploadRequestTimeoutMs) {
+      @Nullable List<Header> headers, @Nullable List<NameValuePair> parameters, int segmentUploadRequestTimeoutMs) {
     MultipartEntityBuilder multipartEntityBuilder = MultipartEntityBuilder.create().
         setMode(HttpMultipartMode.BROWSER_COMPATIBLE);
     for (Map.Entry<String, File> entry : metadataFiles.entrySet()) {
@@ -310,6 +276,7 @@ public class FileUploadDownloadClient implements Closeable {
     // Build the POST request.
     RequestBuilder requestBuilder =
         RequestBuilder.create(HttpPost.METHOD_NAME).setVersion(HttpVersion.HTTP_1_1).setUri(uri).setEntity(entity);
+    addHeadersAndParameters(requestBuilder, headers, parameters);
     setTimeout(requestBuilder, segmentUploadRequestTimeoutMs);
     return requestBuilder.build();
   }
@@ -334,16 +301,13 @@ public class FileUploadDownloadClient implements Closeable {
     return requestBuilder.build();
   }
 
-  private static HttpUriRequest getSegmentCompletionProtocolRequest(URI uri, int socketTimeoutMs) {
+  private static HttpUriRequest getSegmentCompletionProtocolRequest(URI uri, @Nullable List<Header> headers,
+      @Nullable List<NameValuePair> parameters, int socketTimeoutMs) {
     RequestBuilder requestBuilder = RequestBuilder.get(uri).setVersion(HttpVersion.HTTP_1_1);
     setTimeout(requestBuilder, socketTimeoutMs);
     return requestBuilder.build();
   }
 
-  private static HttpUriRequest getDownloadFileRequest(URI uri, int socketTimeoutMs) {
-    return getDownloadFileRequest(uri, socketTimeoutMs, null);
-  }
-
   private static HttpUriRequest getDownloadFileRequest(URI uri, int socketTimeoutMs, String authToken) {
     RequestBuilder requestBuilder = RequestBuilder.get(uri).setVersion(HttpVersion.HTTP_1_1);
     if (StringUtils.isNotBlank(authToken)) {
@@ -431,11 +395,13 @@ public class FileUploadDownloadClient implements Closeable {
     return errorMessage;
   }
 
+  @Deprecated
   public SimpleHttpResponse sendGetRequest(URI uri)
       throws IOException, HttpErrorStatusException {
     return sendRequest(constructGetRequest(uri));
   }
 
+  @Deprecated
   public SimpleHttpResponse sendDeleteRequest(URI uri)
       throws IOException, HttpErrorStatusException {
     return sendRequest(constructDeleteRequest(uri));
@@ -481,9 +447,10 @@ public class FileUploadDownloadClient implements Closeable {
 
   // Upload a set of segment metadata files (e.g., meta.properties and creation.meta) to controllers.
   public SimpleHttpResponse uploadSegmentMetadataFiles(URI uri, Map<String, File> metadataFiles,
-      int segmentUploadRequestTimeoutMs)
+      @Nullable List<Header> headers, @Nullable List<NameValuePair> parameters, int segmentUploadRequestTimeoutMs)
       throws IOException, HttpErrorStatusException {
-    return sendRequest(getUploadSegmentMetadataFilesRequest(uri, metadataFiles, segmentUploadRequestTimeoutMs));
+    return sendRequest(
+        getUploadSegmentMetadataFilesRequest(uri, metadataFiles, headers, parameters, segmentUploadRequestTimeoutMs));
   }
 
   /**
@@ -513,6 +480,8 @@ public class FileUploadDownloadClient implements Closeable {
   /**
    * Upload segment with segment file using default settings. Include table name as a request parameter.
    *
+   * NOTE: does not support auth tokens
+   *
    * @param uri URI
    * @param segmentName Segment name
    * @param segmentFile Segment file
@@ -521,6 +490,7 @@ public class FileUploadDownloadClient implements Closeable {
    * @throws IOException
    * @throws HttpErrorStatusException
    */
+  @Deprecated
   public SimpleHttpResponse uploadSegment(URI uri, String segmentName, File segmentFile, String tableName)
       throws IOException, HttpErrorStatusException {
     // Add table name as a request parameter
@@ -580,6 +550,7 @@ public class FileUploadDownloadClient implements Closeable {
    * @throws IOException
    * @throws HttpErrorStatusException
    */
+  @Deprecated
   public SimpleHttpResponse sendSegmentUri(URI uri, String downloadUri, String rawTableName)
       throws IOException, HttpErrorStatusException {
     // Add table name as a request parameter
@@ -615,6 +586,7 @@ public class FileUploadDownloadClient implements Closeable {
    * @throws IOException
    * @throws HttpErrorStatusException
    */
+  @Deprecated
   public SimpleHttpResponse sendSegmentJson(URI uri, String jsonString)
       throws IOException, HttpErrorStatusException {
     return sendSegmentJson(uri, jsonString, null, null, DEFAULT_SOCKET_TIMEOUT_MS);
@@ -629,24 +601,10 @@ public class FileUploadDownloadClient implements Closeable {
    * @throws IOException
    * @throws HttpErrorStatusException
    */
-  public SimpleHttpResponse sendSegmentCompletionProtocolRequest(URI uri, int socketTimeoutMs)
-      throws IOException, HttpErrorStatusException {
-    return sendRequest(getSegmentCompletionProtocolRequest(uri, socketTimeoutMs));
-  }
-
-  /**
-   * Download a file using default settings.
-   *
-   * @param uri URI
-   * @param socketTimeoutMs Socket timeout in milliseconds
-   * @param dest File destination
-   * @return Response status code
-   * @throws IOException
-   * @throws HttpErrorStatusException
-   */
-  public int downloadFile(URI uri, int socketTimeoutMs, File dest)
+  public SimpleHttpResponse sendSegmentCompletionProtocolRequest(URI uri, @Nullable List<Header> headers,
+      @Nullable List<NameValuePair> parameters, int socketTimeoutMs)
       throws IOException, HttpErrorStatusException {
-    return downloadFile(uri, socketTimeoutMs, dest, null);
+    return sendRequest(getSegmentCompletionProtocolRequest(uri, headers, parameters, socketTimeoutMs));
   }
 
   /**
@@ -690,20 +648,6 @@ public class FileUploadDownloadClient implements Closeable {
   }
 
   /**
-   * Download a file.
-   *
-   * @param uri URI
-   * @param dest File destination
-   * @return Response status code
-   * @throws IOException
-   * @throws HttpErrorStatusException
-   */
-  public int downloadFile(URI uri, File dest)
-      throws IOException, HttpErrorStatusException {
-    return downloadFile(uri, DEFAULT_SOCKET_TIMEOUT_MS, dest);
-  }
-
-  /**
    * Download a file, with an optional auth token.
    *
    * @param uri URI
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
index d9d1a9d..c2ce030 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
@@ -29,6 +29,7 @@ public class MinionConstants {
   public static final String DOWNLOAD_URL_KEY = "downloadURL";
   public static final String UPLOAD_URL_KEY = "uploadURL";
   public static final String URL_SEPARATOR = ",";
+  public static final String AUTH_TOKEN = "authToken";
 
   /**
    * When minion downloads a segment to do work on, we will save that CRC. We will send that to the controller in an
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index 1381e61..a1c1fc3 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -55,6 +55,7 @@ public abstract class BaseTableDataManager implements TableDataManager {
   protected File _indexDir;
   protected Logger _logger;
   protected HelixManager _helixManager;
+  protected String _authToken;
 
   @Override
   public void init(TableDataManagerConfig tableDataManagerConfig, String instanceId,
@@ -66,6 +67,7 @@ public abstract class BaseTableDataManager implements TableDataManager {
     _propertyStore = propertyStore;
     _serverMetrics = serverMetrics;
     _helixManager = helixManager;
+    _authToken = tableDataManagerConfig.getAuthToken();
 
     _tableNameWithType = tableDataManagerConfig.getTableName();
     _tableDataDir = tableDataManagerConfig.getDataDir();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/config/InstanceDataManagerConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/config/InstanceDataManagerConfig.java
index 6422140..cf2aeb9 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/config/InstanceDataManagerConfig.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/config/InstanceDataManagerConfig.java
@@ -50,4 +50,6 @@ public interface InstanceDataManagerConfig {
   boolean isDirectRealtimeOffHeapAllocation();
 
   int getMaxParallelSegmentBuilds();
+
+  String getAuthToken();
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/config/TableDataManagerConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/config/TableDataManagerConfig.java
index ca0de52..577d795 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/config/TableDataManagerConfig.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/config/TableDataManagerConfig.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
 import javax.annotation.Nonnull;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -36,6 +37,7 @@ public class TableDataManagerConfig {
   private static final String TABLE_DATA_MANAGER_CONSUMER_DIRECTORY = "consumerDirectory";
   private static final String TABLE_DATA_MANAGER_NAME = "name";
   private static final String TABLE_IS_DIMENSION = "isDimTable";
+  private static final String TABLE_DATA_MANGER_AUTH_TOKEN = "authToken";
 
   private final Configuration _tableDataManagerConfig;
 
@@ -67,6 +69,10 @@ public class TableDataManagerConfig {
     return _tableDataManagerConfig.getBoolean(TABLE_IS_DIMENSION);
   }
 
+  public String getAuthToken() {
+    return _tableDataManagerConfig.getString(TABLE_DATA_MANGER_AUTH_TOKEN);
+  }
+
   public static TableDataManagerConfig getDefaultHelixTableDataManagerConfig(
       @Nonnull InstanceDataManagerConfig instanceDataManagerConfig, @Nonnull String tableNameWithType) {
     Configuration defaultConfig = new PropertiesConfiguration();
@@ -77,14 +83,16 @@ public class TableDataManagerConfig {
     TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
     Preconditions.checkNotNull(tableType);
     defaultConfig.addProperty(TABLE_DATA_MANAGER_TYPE, tableType.name());
+    defaultConfig.addProperty(TABLE_DATA_MANGER_AUTH_TOKEN, instanceDataManagerConfig.getAuthToken());
 
     return new TableDataManagerConfig(defaultConfig);
   }
 
-  public void overrideConfigs(@Nonnull TableConfig tableConfig) {
+  public void overrideConfigs(@Nonnull TableConfig tableConfig, String authToken) {
     // Override table level configs
 
     _tableDataManagerConfig.addProperty(TABLE_IS_DIMENSION, tableConfig.isDimTable());
+    _tableDataManagerConfig.addProperty(TABLE_DATA_MANGER_AUTH_TOKEN, authToken);
 
     // If we wish to override some table level configs using table config, override them here
     // Note: the configs in TableDataManagerConfig is immutable once the table is created, which mean it will not pick
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java
index 2d8154e..9cb5704 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java
@@ -63,7 +63,8 @@ public class SegmentCommitterFactory {
 
     segmentUploader = new Server2ControllerSegmentUploader(LOGGER, _protocolHandler.getFileUploadDownloadClient(),
         _protocolHandler.getSegmentCommitUploadURL(params, controllerVipUrl), params.getSegmentName(),
-        ServerSegmentCompletionProtocolHandler.getSegmentUploadRequestTimeoutMs(), _serverMetrics);
+        ServerSegmentCompletionProtocolHandler.getSegmentUploadRequestTimeoutMs(), _serverMetrics,
+        _protocolHandler.getAuthToken());
     return new SplitSegmentCommitter(LOGGER, _protocolHandler, params, segmentUploader);
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploader.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploader.java
index 35084aa..ac46ee8 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploader.java
@@ -39,10 +39,11 @@ public class Server2ControllerSegmentUploader implements SegmentUploader {
   private final String _segmentName;
   private final int _segmentUploadRequestTimeoutMs;
   private final ServerMetrics _serverMetrics;
+  private final String _authToken;
 
   public Server2ControllerSegmentUploader(Logger segmentLogger, FileUploadDownloadClient fileUploadDownloadClient,
       String controllerSegmentUploadCommitUrl, String segmentName, int segmentUploadRequestTimeoutMs,
-      ServerMetrics serverMetrics)
+      ServerMetrics serverMetrics, String authToken)
       throws URISyntaxException {
     _segmentLogger = segmentLogger;
     _fileUploadDownloadClient = fileUploadDownloadClient;
@@ -50,10 +51,11 @@ public class Server2ControllerSegmentUploader implements SegmentUploader {
     _segmentName = segmentName;
     _segmentUploadRequestTimeoutMs = segmentUploadRequestTimeoutMs;
     _serverMetrics = serverMetrics;
+    _authToken = authToken;
   }
 
   @Override
-  public URI uploadSegment(File segmentFile,  LLCSegmentName segmentName) {
+  public URI uploadSegment(File segmentFile, LLCSegmentName segmentName) {
     SegmentCompletionProtocol.Response response = uploadSegmentToController(segmentFile);
     if (response.getStatus() == SegmentCompletionProtocol.ControllerResponseStatus.UPLOAD_SUCCESS) {
       try {
@@ -69,8 +71,8 @@ public class Server2ControllerSegmentUploader implements SegmentUploader {
     SegmentCompletionProtocol.Response response;
     try {
       String responseStr = _fileUploadDownloadClient
-          .uploadSegment(_controllerSegmentUploadCommitUrl, _segmentName, segmentFile, null, null,
-              _segmentUploadRequestTimeoutMs).getResponse();
+          .uploadSegment(_controllerSegmentUploadCommitUrl, _segmentName, segmentFile,
+              FileUploadDownloadClient.makeAuthHeader(_authToken), null, _segmentUploadRequestTimeoutMs).getResponse();
       response = SegmentCompletionProtocol.Response.fromJsonString(responseStr);
       _segmentLogger.info("Controller response {} for {}", response.toJsonString(), _controllerSegmentUploadCommitUrl);
       if (response.getStatus().equals(SegmentCompletionProtocol.ControllerResponseStatus.NOT_LEADER)) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java b/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java
index c419105..5f24486 100644
--- a/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java
@@ -44,6 +44,8 @@ import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.pinot.common.utils.CommonConstants.Server.SegmentCompletionProtocol.*;
+
 
 /**
  * A class that handles sending segment completion protocol requests to the controller and getting
@@ -58,6 +60,7 @@ public class ServerSegmentCompletionProtocolHandler {
   private static SSLContext _sslContext;
   private static Integer _controllerHttpsPort;
   private static int _segmentUploadRequestTimeoutMs;
+  private static String _authToken;
 
   private final FileUploadDownloadClient _fileUploadDownloadClient;
   private final ServerMetrics _serverMetrics;
@@ -69,8 +72,9 @@ public class ServerSegmentCompletionProtocolHandler {
       _sslContext = new ClientSSLContextGenerator(httpsConfig.subset(CommonConstants.PREFIX_OF_SSL_SUBSET)).generate();
       _controllerHttpsPort = httpsConfig.getProperty(CONFIG_OF_CONTROLLER_HTTPS_PORT, Integer.class);
     }
-    _segmentUploadRequestTimeoutMs =
-        uploaderConfig.getProperty(CONFIG_OF_SEGMENT_UPLOAD_REQUEST_TIMEOUT_MS, DEFAULT_SEGMENT_UPLOAD_REQUEST_TIMEOUT_MS);
+    _segmentUploadRequestTimeoutMs = uploaderConfig
+        .getProperty(CONFIG_OF_SEGMENT_UPLOAD_REQUEST_TIMEOUT_MS, DEFAULT_SEGMENT_UPLOAD_REQUEST_TIMEOUT_MS);
+    _authToken = uploaderConfig.getProperty(CONFIG_OF_SEGMENT_UPLOAD_AUTH_TOKEN);
   }
 
   public ServerSegmentCompletionProtocolHandler(ServerMetrics serverMetrics, String tableNameWithType) {
@@ -87,6 +91,10 @@ public class ServerSegmentCompletionProtocolHandler {
     return _fileUploadDownloadClient;
   }
 
+  public String getAuthToken() {
+    return _authToken;
+  }
+
   public SegmentCompletionProtocol.Response segmentCommitStart(SegmentCompletionProtocol.Request.Params params) {
     SegmentCompletionProtocol.SegmentCommitStartRequest request =
         new SegmentCompletionProtocol.SegmentCommitStartRequest(params);
@@ -146,10 +154,11 @@ public class ServerSegmentCompletionProtocolHandler {
       return SegmentCompletionProtocol.RESP_NOT_SENT;
     }
 
-    Server2ControllerSegmentUploader segmentUploader= null;
+    Server2ControllerSegmentUploader segmentUploader = null;
     try {
-      segmentUploader = new Server2ControllerSegmentUploader(LOGGER,
-          _fileUploadDownloadClient, url, params.getSegmentName(), _segmentUploadRequestTimeoutMs, _serverMetrics);
+      segmentUploader =
+          new Server2ControllerSegmentUploader(LOGGER, _fileUploadDownloadClient, url, params.getSegmentName(),
+              _segmentUploadRequestTimeoutMs, _serverMetrics, _authToken);
     } catch (URISyntaxException e) {
       LOGGER.error("Segment commit upload url error: ", e);
       return SegmentCompletionProtocol.RESP_NOT_SENT;
@@ -203,9 +212,9 @@ public class ServerSegmentCompletionProtocolHandler {
   private SegmentCompletionProtocol.Response sendRequest(String url) {
     SegmentCompletionProtocol.Response response;
     try {
-      String responseStr =
-          _fileUploadDownloadClient.sendSegmentCompletionProtocolRequest(new URI(url), DEFAULT_OTHER_REQUESTS_TIMEOUT)
-              .getResponse();
+      String responseStr = _fileUploadDownloadClient
+          .sendSegmentCompletionProtocolRequest(new URI(url), FileUploadDownloadClient.makeAuthHeader(_authToken), null,
+              DEFAULT_OTHER_REQUESTS_TIMEOUT).getResponse();
       response = SegmentCompletionProtocol.Response.fromJsonString(responseStr);
       LOGGER.info("Controller response {} for {}", response.toJsonString(), url);
       if (response.getStatus().equals(SegmentCompletionProtocol.ControllerResponseStatus.NOT_LEADER)) {
@@ -228,7 +237,8 @@ public class ServerSegmentCompletionProtocolHandler {
     SegmentCompletionProtocol.Response response;
     try {
       String responseStr = _fileUploadDownloadClient
-          .uploadSegmentMetadataFiles(new URI(url), metadataFiles, _segmentUploadRequestTimeoutMs).getResponse();
+          .uploadSegmentMetadataFiles(new URI(url), metadataFiles, FileUploadDownloadClient.makeAuthHeader(_authToken),
+              null, _segmentUploadRequestTimeoutMs).getResponse();
       response = SegmentCompletionProtocol.Response.fromJsonString(responseStr);
       LOGGER.info("Controller response {} for {}", response.toJsonString(), url);
       if (response.getStatus().equals(SegmentCompletionProtocol.ControllerResponseStatus.NOT_LEADER)) {
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploaderTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploaderTest.java
index 85f84f1..86a7088 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploaderTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploaderTest.java
@@ -91,7 +91,7 @@ public class Server2ControllerSegmentUploaderTest {
       throws URISyntaxException {
     Server2ControllerSegmentUploader uploader =
         new Server2ControllerSegmentUploader(_logger, _fileUploadDownloadClient, GOOD_CONTROLLER_VIP, "segmentName",
-            10000, mock(ServerMetrics.class));
+            10000, mock(ServerMetrics.class), null);
     URI segmentURI = uploader.uploadSegment(_file, _llcSegmentName);
     Assert.assertEquals(segmentURI.toString(), SEGMENT_LOCATION);
   }
@@ -101,7 +101,7 @@ public class Server2ControllerSegmentUploaderTest {
       throws URISyntaxException {
     Server2ControllerSegmentUploader uploader =
         new Server2ControllerSegmentUploader(_logger, _fileUploadDownloadClient, BAD_CONTROLLER_VIP, "segmentName",
-            10000, mock(ServerMetrics.class));
+            10000, mock(ServerMetrics.class), null);
     URI segmentURI = uploader.uploadSegment(_file, _llcSegmentName);
     Assert.assertNull(segmentURI);
   }
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/MinionContext.java b/pinot-minion/src/main/java/org/apache/pinot/minion/MinionContext.java
index 35e58fa..6ba1d67 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/MinionContext.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/MinionContext.java
@@ -45,6 +45,7 @@ public class MinionContext {
 
   // For segment upload
   private SSLContext _sslContext;
+  private String _taskAuthToken;
 
   // For PurgeTask
   private SegmentPurger.RecordPurgerFactory _recordPurgerFactory;
@@ -97,4 +98,12 @@ public class MinionContext {
   public void setRecordModifierFactory(SegmentPurger.RecordModifierFactory recordModifierFactory) {
     _recordModifierFactory = recordModifierFactory;
   }
+
+  public String getTaskAuthToken() {
+    return _taskAuthToken;
+  }
+
+  public void setTaskAuthToken(String taskAuthToken) {
+    _taskAuthToken = taskAuthToken;
+  }
 }
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/MinionStarter.java b/pinot-minion/src/main/java/org/apache/pinot/minion/MinionStarter.java
index 015b8e2..37aa51d 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/MinionStarter.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/MinionStarter.java
@@ -160,6 +160,9 @@ public class MinionStarter implements ServiceStartable {
     minionMetrics.initializeGlobalMeters();
     minionContext.setMinionMetrics(minionMetrics);
 
+    // initialize authentication
+    minionContext.setTaskAuthToken(_config.getProperty(CommonConstants.Minion.CONFIG_OF_TASK_AUTH_TOKEN));
+
     // Start all components
     LOGGER.info("Initializing PinotFSFactory");
     PinotConfiguration pinotFSConfig = _config.subset(CommonConstants.Minion.PREFIX_OF_CONFIG_OF_PINOT_FS_FACTORY);
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/BaseMultipleSegmentsConversionExecutor.java b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/BaseMultipleSegmentsConversionExecutor.java
index ee7be9e..f9e7ebe 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/BaseMultipleSegmentsConversionExecutor.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/BaseMultipleSegmentsConversionExecutor.java
@@ -88,6 +88,7 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
     String downloadURLString = configs.get(MinionConstants.DOWNLOAD_URL_KEY);
     String[] downloadURLs = downloadURLString.split(MinionConstants.URL_SEPARATOR);
     String uploadURL = configs.get(MinionConstants.UPLOAD_URL_KEY);
+    String authToken = configs.get(MinionConstants.AUTH_TOKEN);
 
     LOGGER.info("Start executing {} on table: {}, input segments: {} with downloadURLs: {}, uploadURL: {}", taskType,
         tableNameWithType, inputSegmentNames, downloadURLString, uploadURL);
@@ -149,8 +150,9 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
             TableNameBuilder.extractRawTableName(tableNameWithType));
         List<NameValuePair> parameters = Arrays.asList(enableParallelPushProtectionParameter, tableNameParameter);
 
-        SegmentConversionUtils.uploadSegment(configs, null, parameters, tableNameWithType, resultSegmentName, uploadURL,
-            convertedTarredSegmentFile);
+        SegmentConversionUtils
+            .uploadSegment(configs, FileUploadDownloadClient.makeAuthHeader(authToken), parameters, tableNameWithType,
+                resultSegmentName, uploadURL, convertedTarredSegmentFile);
       }
 
       String outputSegmentNames = segmentConversionResults.stream().map(SegmentConversionResult::getSegmentName)
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/BaseSingleSegmentConversionExecutor.java b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/BaseSingleSegmentConversionExecutor.java
index 0b94a23..4f9d779 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/BaseSingleSegmentConversionExecutor.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/BaseSingleSegmentConversionExecutor.java
@@ -20,6 +20,7 @@ package org.apache.pinot.minion.executor;
 
 import com.google.common.base.Preconditions;
 import java.io.File;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -72,6 +73,7 @@ public abstract class BaseSingleSegmentConversionExecutor extends BaseTaskExecut
     String downloadURL = configs.get(MinionConstants.DOWNLOAD_URL_KEY);
     String uploadURL = configs.get(MinionConstants.UPLOAD_URL_KEY);
     String originalSegmentCrc = configs.get(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY);
+    String authToken = configs.get(MinionConstants.AUTH_TOKEN);
 
     LOGGER.info("Start executing {} on table: {}, segment: {} with downloadURL: {}, uploadURL: {}", taskType,
         tableNameWithType, segmentName, downloadURL, uploadURL);
@@ -121,7 +123,10 @@ public abstract class BaseSingleSegmentConversionExecutor extends BaseTaskExecut
           new BasicHeader(FileUploadDownloadClient.CustomHeaders.SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER,
               segmentZKMetadataCustomMapModifier.toJsonString());
 
-      List<Header> httpHeaders = Arrays.asList(ifMatchHeader, segmentZKMetadataCustomMapModifierHeader);
+      List<Header> httpHeaders = new ArrayList<>();
+      httpHeaders.add(ifMatchHeader);
+      httpHeaders.add(segmentZKMetadataCustomMapModifierHeader);
+      httpHeaders.addAll(FileUploadDownloadClient.makeAuthHeader(authToken));
 
       // Set parameters for upload request.
       NameValuePair enableParallelPushProtectionParameter =
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/taskfactory/TaskFactoryRegistry.java b/pinot-minion/src/main/java/org/apache/pinot/minion/taskfactory/TaskFactoryRegistry.java
index ce5c840..b8adc80 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/taskfactory/TaskFactoryRegistry.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/taskfactory/TaskFactoryRegistry.java
@@ -25,6 +25,7 @@ import org.apache.helix.task.Task;
 import org.apache.helix.task.TaskConfig;
 import org.apache.helix.task.TaskFactory;
 import org.apache.helix.task.TaskResult;
+import org.apache.pinot.core.common.MinionConstants;
 import org.apache.pinot.core.minion.PinotTaskConfig;
 import org.apache.pinot.minion.MinionContext;
 import org.apache.pinot.minion.event.EventObserverFactoryRegistry;
@@ -71,6 +72,9 @@ public class TaskFactoryRegistry {
               MinionMetrics minionMetrics = MinionContext.getInstance().getMinionMetrics();
 
               PinotTaskConfig pinotTaskConfig = PinotTaskConfig.fromHelixTaskConfig(_taskConfig);
+              pinotTaskConfig.getConfigs()
+                  .put(MinionConstants.AUTH_TOKEN, MinionContext.getInstance().getTaskAuthToken());
+
               _eventObserver.notifyTaskStart(pinotTaskConfig);
               minionMetrics.addMeteredTableValue(taskType, MinionMeter.NUMBER_TASKS_EXECUTED, 1L);
               LOGGER.info("Start running {}: {} with configs: {}", pinotTaskConfig.getTaskType(), _taskConfig.getId(),
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java
index 5cc69ba..43efce6 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java
@@ -19,12 +19,12 @@
 package org.apache.pinot.plugin.ingestion.batch.common;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
 import java.io.File;
 import java.io.InputStream;
 import java.io.Serializable;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -33,9 +33,7 @@ import java.util.UUID;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.http.Header;
-import org.apache.http.NameValuePair;
 import org.apache.http.message.BasicHeader;
-import org.apache.http.message.BasicNameValuePair;
 import org.apache.pinot.common.exception.HttpErrorStatusException;
 import org.apache.pinot.common.utils.FileUploadDownloadClient;
 import org.apache.pinot.common.utils.SimpleHttpResponse;
@@ -177,7 +175,10 @@ public class SegmentPushUtils implements Serializable {
         RetryPolicies.exponentialBackoffRetryPolicy(attempts, retryWaitMs, 5).attempt(() -> {
           try {
             SimpleHttpResponse response = FILE_UPLOAD_DOWNLOAD_CLIENT
-                .sendSegmentUri(FileUploadDownloadClient.getUploadSegmentURI(controllerURI), segmentUri, tableName);
+                .sendSegmentUri(FileUploadDownloadClient.getUploadSegmentURI(controllerURI), segmentUri,
+                    FileUploadDownloadClient.makeAuthHeader(spec.getAuthToken()),
+                    FileUploadDownloadClient.makeTableParam(tableName),
+                    FileUploadDownloadClient.DEFAULT_SOCKET_TIMEOUT_MS);
             LOGGER.info("Response for pushing table {} segment uri {} to location {} - {}: {}", tableName, segmentUri,
                 controllerURI, response.getStatusCode(), response.getResponse());
             return true;
@@ -248,17 +249,18 @@ public class SegmentPushUtils implements Serializable {
           }
           RetryPolicies.exponentialBackoffRetryPolicy(attempts, retryWaitMs, 5).attempt(() -> {
             try {
-              List<Header> headers = ImmutableList
-                  .of(new BasicHeader(FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI, segmentUriPath),
-                      new BasicHeader(FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE,
-                          FileUploadDownloadClient.FileUploadType.METADATA.toString()));
-              // Add table name as a request parameter
-              NameValuePair tableNameValuePair =
-                  new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME, tableName);
-              List<NameValuePair> parameters = Arrays.asList(tableNameValuePair);
+              List<Header> headers = new ArrayList<>();
+              headers.add(new BasicHeader(FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI, segmentUriPath));
+              headers.add(new BasicHeader(FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE,
+                  FileUploadDownloadClient.FileUploadType.METADATA.toString()));
+              if (StringUtils.isNotBlank(spec.getAuthToken())) {
+                headers.addAll(FileUploadDownloadClient.makeAuthHeader(spec.getAuthToken()));
+              }
+
               SimpleHttpResponse response = FILE_UPLOAD_DOWNLOAD_CLIENT
                   .uploadSegmentMetadata(FileUploadDownloadClient.getUploadSegmentURI(controllerURI), segmentName,
-                      segmentMetadataFile, headers, parameters, FILE_UPLOAD_DOWNLOAD_CLIENT.DEFAULT_SOCKET_TIMEOUT_MS);
+                      segmentMetadataFile, headers, FileUploadDownloadClient.makeTableParam(tableName),
+                      FileUploadDownloadClient.DEFAULT_SOCKET_TIMEOUT_MS);
               LOGGER.info("Response for pushing table {} segment {} to location {} - {}: {}", tableName, segmentName,
                   controllerURI, response.getStatusCode(), response.getResponse());
               return true;
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/common/DefaultControllerRestApi.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/common/DefaultControllerRestApi.java
index 391d33b..43db07d 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/common/DefaultControllerRestApi.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/common/DefaultControllerRestApi.java
@@ -24,7 +24,6 @@ import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -118,8 +117,7 @@ public class DefaultControllerRestApi implements ControllerRestApi {
           try (InputStream inputStream = fileSystem.open(tarFilePath)) {
             SimpleHttpResponse response = _fileUploadDownloadClient.uploadSegment(
                 FileUploadDownloadClient.getUploadSegmentHttpURI(pushLocation.getHost(), pushLocation.getPort()),
-                segmentName, inputStream, Collections.emptyList(),
-                FileUploadDownloadClient.makeTableParam(_rawTableName),
+                segmentName, inputStream, null, FileUploadDownloadClient.makeTableParam(_rawTableName),
                 FileUploadDownloadClient.DEFAULT_SOCKET_TIMEOUT_MS);
             LOGGER.info("Response {}: {}", response.getStatusCode(), response.getResponse());
             break;
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index 162ba71..7cfc19b 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -82,6 +82,7 @@ public class HelixInstanceDataManager implements InstanceDataManager {
     _instanceId = _instanceDataManagerConfig.getInstanceId();
     _helixManager = helixManager;
     _serverMetrics = serverMetrics;
+    _authToken = config.getProperty(CommonConstants.Server.CONFIG_OF_AUTH_TOKEN);
 
     File instanceDataDir = new File(_instanceDataManagerConfig.getInstanceDataDir());
     if (!instanceDataDir.exists()) {
@@ -138,7 +139,7 @@ public class HelixInstanceDataManager implements InstanceDataManager {
     LOGGER.info("Creating table data manager for table: {}", tableNameWithType);
     TableDataManagerConfig tableDataManagerConfig =
         TableDataManagerConfig.getDefaultHelixTableDataManagerConfig(_instanceDataManagerConfig, tableNameWithType);
-    tableDataManagerConfig.overrideConfigs(tableConfig);
+    tableDataManagerConfig.overrideConfigs(tableConfig, _authToken);
     TableDataManager tableDataManager = TableDataManagerProvider
         .getTableDataManager(tableDataManagerConfig, _instanceId, _propertyStore, _serverMetrics, _helixManager);
     tableDataManager.start();
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
index 41c2744..0e4f60f 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
@@ -21,6 +21,7 @@ package org.apache.pinot.server.starter.helix;
 import java.util.Optional;
 
 import org.apache.commons.configuration.ConfigurationException;
+import org.apache.http.auth.AUTH;
 import org.apache.pinot.common.segment.ReadMode;
 import org.apache.pinot.common.utils.CommonConstants.Server;
 import org.apache.pinot.core.data.manager.config.InstanceDataManagerConfig;
@@ -60,6 +61,8 @@ public class HelixInstanceDataManagerConfig implements InstanceDataManagerConfig
   public static final String SEGMENT_FORMAT_VERSION = "segment.format.version";
   // Key of whether to enable reloading consuming segments
   public static final String INSTANCE_RELOAD_CONSUMING_SEGMENT = "reload.consumingSegment";
+  // Key of the auth token
+  public static final String AUTH_TOKEN = "auth.token";
 
   // Key of how many parallel realtime segments can be built.
   // A value of <= 0 indicates unlimited.
@@ -195,6 +198,11 @@ public class HelixInstanceDataManagerConfig implements InstanceDataManagerConfig
   }
 
   @Override
+  public String getAuthToken() {
+    return _instanceDataManagerConfiguration.getProperty(AUTH_TOKEN);
+  }
+
+  @Override
   public String toString() {
     String configString = "";
     configString += "Instance Id: " + getInstanceId();
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/backfill/BackfillSegmentUtils.java b/pinot-tools/src/main/java/org/apache/pinot/tools/backfill/BackfillSegmentUtils.java
index 7d230ea..254a7be 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/backfill/BackfillSegmentUtils.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/backfill/BackfillSegmentUtils.java
@@ -152,7 +152,10 @@ public class BackfillSegmentUtils {
 
   /**
    * Uploads the segment tar to the controller.
+   *
+   * NOTE: this method does not support auth tokens
    */
+  @Deprecated
   public boolean uploadSegment(String rawTableName, String segmentName, File segmentDir, File outputDir) {
     boolean success = true;
 
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/utils/PinotConfigUtils.java b/pinot-tools/src/main/java/org/apache/pinot/tools/utils/PinotConfigUtils.java
index 17338af..047faca 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/utils/PinotConfigUtils.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/utils/PinotConfigUtils.java
@@ -178,6 +178,8 @@ public class PinotConfigUtils {
     properties.put(CommonConstants.Server.CONFIG_OF_INSTANCE_DATA_DIR, serverDataDir);
     properties.put(CommonConstants.Server.CONFIG_OF_INSTANCE_SEGMENT_TAR_DIR, serverSegmentDir);
     properties.put("pinot.server.segment.fetcher.auth.token", "Basic YWRtaW46dmVyeXNlY3JldA==");
+    properties.put("pinot.server.segment.upload.auth.token", "Basic YWRtaW46dmVyeXNlY3JldA==");
+    properties.put("pinot.server.instance.auth.token", "Basic YWRtaW46dmVyeXNlY3JldA==");
 
     return properties;
   }
@@ -191,6 +193,7 @@ public class PinotConfigUtils {
     properties.put(CommonConstants.Helix.KEY_OF_MINION_HOST, minionHost);
     properties.put(CommonConstants.Helix.KEY_OF_MINION_PORT, minionPort != 0 ? minionPort : getAvailablePort());
     properties.put("segment.fetcher.auth.token", "Basic YWRtaW46dmVyeXNlY3JldA==");
+    properties.put("task.auth.token", "Basic YWRtaW46dmVyeXNlY3JldA==");
 
     return properties;
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 03/04: transitioning more calls for auth support

Posted by ap...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

apucher pushed a commit to branch basic-auth-controller
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 67e04c89d13937e1a594ccd14950fda8807affe7
Author: Alexander Pucher <al...@alexpucher.com>
AuthorDate: Tue Feb 16 16:23:37 2021 -0800

    transitioning more calls for auth support
---
 .../common/utils/FileUploadDownloadClient.java     | 62 +++++-----------------
 .../resources/PinotIngestionRestletResource.java   | 22 ++++++--
 .../pinot/controller/util/FileIngestionHelper.java | 16 +++---
 .../pinot/controller/util/FileIngestionUtils.java  | 16 ++----
 .../apache/pinot/core/auth/BasicAuthPrincipal.java | 18 +++++++
 .../org/apache/pinot/core/auth/BasicAuthUtils.java | 19 ++++++-
 .../ingestion/batch/common/SegmentPushUtils.java   |  5 +-
 .../ingestion/common/DefaultControllerRestApi.java |  8 ++-
 .../apache/pinot/tools/utils/PinotConfigUtils.java |  2 +-
 9 files changed, 89 insertions(+), 79 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
index 401b036..b4da975 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
@@ -447,21 +447,6 @@ public class FileUploadDownloadClient implements Closeable {
    * @param uri URI
    * @param schemaName Schema name
    * @param schemaFile Schema file
-   * @return Response
-   * @throws IOException
-   * @throws HttpErrorStatusException
-   */
-  public SimpleHttpResponse addSchema(URI uri, String schemaName, File schemaFile)
-      throws IOException, HttpErrorStatusException {
-    return sendRequest(getAddSchemaRequest(uri, schemaName, schemaFile, null, null));
-  }
-
-  /**
-   * Add schema.
-   *
-   * @param uri URI
-   * @param schemaName Schema name
-   * @param schemaFile Schema file
    * @param headers HTTP headers
    * @param parameters HTTP parameters
    * @return Response
@@ -475,21 +460,6 @@ public class FileUploadDownloadClient implements Closeable {
   }
 
   /**
-   * Update schema.
-   *
-   * @param uri URI
-   * @param schemaName Schema name
-   * @param schemaFile Schema file
-   * @return Response
-   * @throws IOException
-   * @throws HttpErrorStatusException
-   */
-  public SimpleHttpResponse updateSchema(URI uri, String schemaName, File schemaFile)
-      throws IOException, HttpErrorStatusException {
-    return sendRequest(getUpdateSchemaRequest(uri, schemaName, schemaFile));
-  }
-
-  /**
    * Upload segment by sending a zip of creation.meta and metadata.properties.
    *
    * @param uri URI
@@ -581,25 +551,6 @@ public class FileUploadDownloadClient implements Closeable {
   }
 
   /**
-   * Upload segment with segment file input stream using default settings. Include table name as a request parameter.
-   *
-   * @param uri URI
-   * @param segmentName Segment name
-   * @param inputStream Segment file input stream
-   * @param rawTableName Raw table name
-   * @return Response
-   * @throws IOException
-   * @throws HttpErrorStatusException
-   */
-  public SimpleHttpResponse uploadSegment(URI uri, String segmentName, InputStream inputStream, String rawTableName)
-      throws IOException, HttpErrorStatusException {
-    // Add table name as a request parameter
-    NameValuePair tableNameValuePair = new BasicNameValuePair(QueryParameters.TABLE_NAME, rawTableName);
-    List<NameValuePair> parameters = Arrays.asList(tableNameValuePair);
-    return uploadSegment(uri, segmentName, inputStream, null, parameters, DEFAULT_SOCKET_TIMEOUT_MS);
-  }
-
-  /**
    * Send segment uri.
    *
    * Note: table name has to be set as a parameter.
@@ -783,7 +734,7 @@ public class FileUploadDownloadClient implements Closeable {
   }
 
   /**
-   * Generate an (optional) HTTP Authorization header given an auth token
+   * Generate an (optional) HTTP Authorization header given an auth token.
    *
    * @param authToken auth token
    * @return list of 0 or 1 "Authorization" headers
@@ -794,4 +745,15 @@ public class FileUploadDownloadClient implements Closeable {
     }
     return Collections.singletonList(new BasicHeader("Authorization", authToken));
   }
+
+  /**
+   * Generate a param list with a table name attribute.
+   *
+   * @param tableName table name
+   * @return param list
+   */
+  public static List<NameValuePair> makeTableParam(String tableName) {
+    return Collections
+        .singletonList(new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME, tableName));
+  }
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotIngestionRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotIngestionRestletResource.java
index f37a251..0695db5 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotIngestionRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotIngestionRestletResource.java
@@ -25,6 +25,7 @@ import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiParam;
 import java.io.File;
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.Map;
 import javax.inject.Inject;
 import javax.ws.rs.Consumes;
@@ -36,6 +37,7 @@ import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.container.Suspended;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
+import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.api.access.AccessType;
 import org.apache.pinot.controller.api.access.Authenticate;
@@ -116,8 +118,7 @@ public class PinotIngestionRestletResource {
       + "\n Example usage (query params need encoding):" + "\n```"
       + "\ncurl -X POST -F file=@data.json -H \"Content-Type: multipart/form-data\" \"http://localhost:9000/ingestFromFile?tableNameWithType=foo_OFFLINE&"
       + "\nbatchConfigMapStr={" + "\n  \"inputFormat\":\"csv\"," + "\n  \"recordReader.prop.delimiter\":\"|\""
-      + "\n}\" "
-      + "\n```")
+      + "\n}\" " + "\n```")
   public void ingestFromFile(
       @ApiParam(value = "Name of the table to upload the file to", required = true) @QueryParam("tableNameWithType") String tableNameWithType,
       @ApiParam(value = "Batch config Map as json string. Must pass inputFormat, and optionally record reader properties. e.g. {\"inputFormat\":\"json\"}", required = true) @QueryParam("batchConfigMapStr") String batchConfigMapStr,
@@ -191,8 +192,21 @@ public class PinotIngestionRestletResource {
     Schema schema = _pinotHelixResourceManager.getTableSchema(tableNameWithType);
 
     FileIngestionHelper fileIngestionHelper =
-        new FileIngestionHelper(tableConfig, schema, batchConfig, _controllerConf.getControllerHost(),
-            Integer.parseInt(_controllerConf.getControllerPort()), new File(_controllerConf.getDataDir(), UPLOAD_DIR));
+        new FileIngestionHelper(tableConfig, schema, batchConfig, getControllerUri(),
+            new File(_controllerConf.getDataDir(), UPLOAD_DIR), getAuthToken());
     return fileIngestionHelper.buildSegmentAndPush(payload);
   }
+
+  private String getAuthToken() {
+    return _controllerConf
+        .getProperty(CommonConstants.Controller.PREFIX_OF_CONFIG_OF_SEGMENT_FETCHER_FACTORY + ".auth.token");
+  }
+
+  private URI getControllerUri() {
+    try {
+      return new URI(_controllerConf.generateVipUrl());
+    } catch (URISyntaxException e) {
+      throw new IllegalStateException("Controller VIP uri is invalid", e);
+    }
+  }
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java
index 49521fe..d82f6a7 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java
@@ -50,18 +50,18 @@ public class FileIngestionHelper {
   private final TableConfig _tableConfig;
   private final Schema _schema;
   private final BatchConfig _batchConfig;
-  private final String _controllerHost;
-  private final int _controllerPort;
+  private final URI _controllerUri;
   private final File _uploadDir;
+  private final String _authToken;
 
-  public FileIngestionHelper(TableConfig tableConfig, Schema schema, BatchConfig batchConfig, String controllerHost,
-      int controllerPort, File uploadDir) {
+  public FileIngestionHelper(TableConfig tableConfig, Schema schema, BatchConfig batchConfig, URI controllerUri,
+      File uploadDir, String authToken) {
     _tableConfig = tableConfig;
     _schema = schema;
     _batchConfig = batchConfig;
-    _controllerHost = controllerHost;
-    _controllerPort = controllerPort;
+    _controllerUri = controllerUri;
     _uploadDir = uploadDir;
+    _authToken = authToken;
   }
 
   /**
@@ -106,8 +106,8 @@ public class FileIngestionHelper {
           new File(segmentTarDir, segmentName + org.apache.pinot.spi.ingestion.batch.spec.Constants.TAR_GZ_FILE_EXT);
       TarGzCompressionUtils.createTarGzFile(new File(outputDir, segmentName), segmentTarFile);
       FileIngestionUtils
-          .uploadSegment(tableNameWithType, Lists.newArrayList(segmentTarFile), _controllerHost, _controllerPort);
-      LOGGER.info("Uploaded tar: {} to {}:{}", segmentTarFile.getAbsolutePath(), _controllerHost, _controllerPort);
+          .uploadSegment(tableNameWithType, Lists.newArrayList(segmentTarFile), _controllerUri, _authToken);
+      LOGGER.info("Uploaded tar: {} to {}", segmentTarFile.getAbsolutePath(), _controllerUri);
 
       return new SuccessResponse(
           "Successfully ingested file into table: " + tableNameWithType + " as segment: " + segmentName);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionUtils.java
index 7346eb4..716c3bc 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionUtils.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionUtils.java
@@ -18,8 +18,6 @@
  */
 package org.apache.pinot.controller.util;
 
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import java.io.File;
 import java.io.FileInputStream;
@@ -28,8 +26,6 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.URI;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.commons.io.IOUtils;
@@ -42,13 +38,10 @@ import org.apache.pinot.core.segment.name.SimpleSegmentNameGenerator;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.FileFormat;
-import org.apache.pinot.spi.data.readers.RecordReaderConfig;
 import org.apache.pinot.spi.data.readers.RecordReaderFactory;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
 import org.apache.pinot.spi.ingestion.batch.BatchConfig;
-import org.apache.pinot.spi.plugin.PluginManager;
 import org.apache.pinot.spi.utils.IngestionConfigUtils;
-import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
 import org.apache.pinot.spi.utils.retry.RetriableOperationException;
 import org.apache.pinot.spi.utils.retry.RetryPolicies;
@@ -141,8 +134,7 @@ public final class FileIngestionUtils {
   /**
    * Uploads the segment tar files to the provided controller
    */
-  public static void uploadSegment(String tableNameWithType, List<File> tarFiles, String controllerHost,
-      int controllerPort)
+  public static void uploadSegment(String tableNameWithType, List<File> tarFiles, URI controllerUri, String authToken)
       throws RetriableOperationException, AttemptsExceededException {
     for (File tarFile : tarFiles) {
       String fileName = tarFile.getName();
@@ -154,8 +146,10 @@ public final class FileIngestionUtils {
       RetryPolicies.exponentialBackoffRetryPolicy(DEFAULT_ATTEMPTS, DEFAULT_RETRY_WAIT_MS, 5).attempt(() -> {
         try (InputStream inputStream = new FileInputStream(tarFile)) {
           SimpleHttpResponse response = FILE_UPLOAD_DOWNLOAD_CLIENT
-              .uploadSegment(FileUploadDownloadClient.getUploadSegmentHttpURI(controllerHost, controllerPort),
-                  segmentName, inputStream, tableNameWithType);
+              .uploadSegment(FileUploadDownloadClient.getUploadSegmentURI(controllerUri), segmentName, inputStream,
+                  FileUploadDownloadClient.makeAuthHeader(authToken),
+                  FileUploadDownloadClient.makeTableParam(tableNameWithType),
+                  FileUploadDownloadClient.DEFAULT_SOCKET_TIMEOUT_MS);
           LOGGER.info("Response for pushing table {} segment {} - {}: {}", tableNameWithType, segmentName,
               response.getStatusCode(), response.getResponse());
           return true;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/auth/BasicAuthPrincipal.java b/pinot-core/src/main/java/org/apache/pinot/core/auth/BasicAuthPrincipal.java
index ae44401..8dd735e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/auth/BasicAuthPrincipal.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/auth/BasicAuthPrincipal.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.pinot.core.auth;
 
 import java.util.Set;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/auth/BasicAuthUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/auth/BasicAuthUtils.java
index 46cfa6b..6a3cc39 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/auth/BasicAuthUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/auth/BasicAuthUtils.java
@@ -1,10 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.pinot.core.auth;
 
 import com.google.common.base.Preconditions;
 import java.util.Arrays;
 import java.util.Base64;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java
index d15cfcd..5cc69ba 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java
@@ -26,7 +26,6 @@ import java.io.Serializable;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -120,8 +119,8 @@ public class SegmentPushUtils implements Serializable {
           try (InputStream inputStream = fileSystem.open(tarFileURI)) {
             SimpleHttpResponse response = FILE_UPLOAD_DOWNLOAD_CLIENT
                 .uploadSegment(FileUploadDownloadClient.getUploadSegmentURI(controllerURI), segmentName, inputStream,
-                    FileUploadDownloadClient.makeAuthHeader(spec.getAuthToken()), Collections.singletonList(
-                        new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME, tableName)),
+                    FileUploadDownloadClient.makeAuthHeader(spec.getAuthToken()),
+                    FileUploadDownloadClient.makeTableParam(tableName),
                     FileUploadDownloadClient.DEFAULT_SOCKET_TIMEOUT_MS);
             LOGGER.info("Response for pushing table {} segment {} to location {} - {}: {}", tableName, segmentName,
                 controllerURI, response.getStatusCode(), response.getResponse());
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/common/DefaultControllerRestApi.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/common/DefaultControllerRestApi.java
index 2880482..391d33b 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/common/DefaultControllerRestApi.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/common/DefaultControllerRestApi.java
@@ -24,6 +24,7 @@ import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -37,6 +38,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
+/**
+ * Deprecated. Does not support HTTPS or authentication
+ */
 public class DefaultControllerRestApi implements ControllerRestApi {
   private static final Logger LOGGER = LoggerFactory.getLogger(DefaultControllerRestApi.class);
 
@@ -114,7 +118,9 @@ public class DefaultControllerRestApi implements ControllerRestApi {
           try (InputStream inputStream = fileSystem.open(tarFilePath)) {
             SimpleHttpResponse response = _fileUploadDownloadClient.uploadSegment(
                 FileUploadDownloadClient.getUploadSegmentHttpURI(pushLocation.getHost(), pushLocation.getPort()),
-                segmentName, inputStream, _rawTableName);
+                segmentName, inputStream, Collections.emptyList(),
+                FileUploadDownloadClient.makeTableParam(_rawTableName),
+                FileUploadDownloadClient.DEFAULT_SOCKET_TIMEOUT_MS);
             LOGGER.info("Response {}: {}", response.getStatusCode(), response.getResponse());
             break;
           } catch (Exception e) {
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/utils/PinotConfigUtils.java b/pinot-tools/src/main/java/org/apache/pinot/tools/utils/PinotConfigUtils.java
index 7a17845..17338af 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/utils/PinotConfigUtils.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/utils/PinotConfigUtils.java
@@ -76,7 +76,7 @@ public class PinotConfigUtils {
     properties.put("controller.admin.access.control.principals.user.password", "secret");
     properties.put("controller.admin.access.control.principals.user.tables", "baseballStats");
     properties.put("controller.admin.access.control.principals.user.permissions", "read");
-    properties.put("pinot.controller.segment.fetcher.auth.token", "Basic YWRtaW46dmVyeXNlY3JldA==");
+    properties.put("controller.segment.fetcher.auth.token", "Basic YWRtaW46dmVyeXNlY3JldA==");
 
     return properties;
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org