You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by si...@apache.org on 2022/06/10 00:04:53 UTC

[pinot] branch master updated: Streamed segment download & untar with rate limiter to control disk usage (#8753)

This is an automated email from the ASF dual-hosted git repository.

siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 90fa0f9104 Streamed segment download & untar with rate limiter to control disk usage (#8753)
90fa0f9104 is described below

commit 90fa0f910449ef194b253a081ddd4cbe02710aa8
Author: Jia Guo <ji...@linkedin.com>
AuthorDate: Thu Jun 9 17:04:47 2022 -0700

    Streamed segment download & untar with rate limiter to control disk usage (#8753)
    
    * Add streamed download-untar with rate limiting for segment download and refresh
    
    * address comments
    
    * address comments
    
    * address comments
    
    * address comments
    
    * address comments
    
    * address comments
    
    * address comments
---
 .../apache/pinot/common/metrics/ServerMeter.java   |  2 +
 .../common/utils/FileUploadDownloadClient.java     | 20 ++++++
 .../pinot/common/utils/TarGzCompressionUtils.java  | 66 ++++++++++++++++++-
 .../common/utils/fetcher/BaseSegmentFetcher.java   |  5 ++
 .../common/utils/fetcher/HttpSegmentFetcher.java   | 54 +++++++++++++++
 .../pinot/common/utils/fetcher/SegmentFetcher.java |  6 ++
 .../utils/fetcher/SegmentFetcherFactory.java       | 19 ++++++
 .../apache/pinot/common/utils/http/HttpClient.java | 38 +++++++++++
 .../utils/fetcher/SegmentFetcherFactoryTest.java   |  8 +++
 .../core/data/manager/BaseTableDataManager.java    | 77 ++++++++++++++++++++--
 .../manager/offline/TableDataManagerProvider.java  |  7 +-
 .../BaseTableDataManagerAcquireSegmentTest.java    |  4 +-
 .../data/manager/BaseTableDataManagerTest.java     |  4 +-
 .../offline/DimensionTableDataManagerTest.java     |  4 +-
 .../realtime/LLRealtimeSegmentDataManagerTest.java |  6 ++
 .../executor/QueryExecutorExceptionsTest.java      |  7 ++
 .../core/query/executor/QueryExecutorTest.java     |  7 ++
 .../pinot/queries/ExplainPlanQueriesTest.java      |  7 ++
 .../queries/SegmentWithNullValueVectorTest.java    |  9 +++
 .../local/data/manager/TableDataManager.java       |  2 +-
 .../local/data/manager/TableDataManagerParams.java | 66 +++++++++++++++++++
 .../helix/HelixInstanceDataManagerConfig.java      | 25 +++++++
 .../apache/pinot/server/api/BaseResourceTest.java  |  3 +-
 .../config/instance/InstanceDataManagerConfig.java |  4 ++
 24 files changed, 435 insertions(+), 15 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index 2bd5619688..19ae43f382 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -64,6 +64,8 @@ public enum ServerMeter implements AbstractMetrics.Meter {
   RELOAD_FAILURES("segments", false),
   REFRESH_FAILURES("segments", false),
   UNTAR_FAILURES("segments", false),
+  SEGMENT_STREAMED_DOWNLOAD_UNTAR_FAILURES("segments", false),
+  SEGMENT_DIR_MOVEMENT_FAILURES("segments", false),
   SEGMENT_DOWNLOAD_FAILURES("segments", false),
   NUM_RESIZES("numResizes", false),
   NO_TABLE_ACCESS("tables", true),
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
index ac4413ff35..55931077b7 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
@@ -1004,6 +1004,26 @@ public class FileUploadDownloadClient implements AutoCloseable {
     return _httpClient.downloadFile(uri, HttpClient.DEFAULT_SOCKET_TIMEOUT_MS, dest, authProvider, httpHeaders);
   }
 
+  /**
+   * Downloads a tar.gz file and untars it in a streamed manner, optionally rate limiting the disk writes.
+   *
+   * @param uri URI to download from
+   * @param dest Directory to untar into
+   * @param authProvider auth provider
+   * @param httpHeaders http headers
+   * @param maxStreamRateInByte limit on the rate of writing the download-untar stream to disk, in bytes per second;
+   *                  -1 for no disk write limit, 0 to limit the writing to min(untar, download) rate
+   * @return The untarred segment directory (first entry produced by the untar)
+   * @throws IOException if the download or untar fails
+   * @throws HttpErrorStatusException if the server responds with an error status code (>= 300)
+   */
+  public File downloadUntarFileStreamed(URI uri, File dest, AuthProvider authProvider, List<Header> httpHeaders,
+      long maxStreamRateInByte)
+      throws IOException, HttpErrorStatusException {
+    return _httpClient.downloadUntarFileStreamed(uri, HttpClient.DEFAULT_SOCKET_TIMEOUT_MS, dest, authProvider,
+        httpHeaders, maxStreamRateInByte);
+  }
+
   /**
    * Generate a param list with a table name attribute.
    *
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/TarGzCompressionUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/TarGzCompressionUtils.java
index 8e5d590c56..c094b74a49 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/TarGzCompressionUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/TarGzCompressionUtils.java
@@ -19,9 +19,12 @@
 package org.apache.pinot.common.utils;
 
 import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.RateLimiter;
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.File;
+import java.io.FileDescriptor;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -44,6 +47,25 @@ import org.apache.commons.lang3.StringUtils;
  * Utility class to compress/de-compress tar.gz files.
  */
 public class TarGzCompressionUtils {
+  /** Don't limit the disk write rate. The OS may buffer many writes and flush up to several GBs at a time,
+   * which can saturate disk bandwidth.
+   */
+  public static final long NO_DISK_WRITE_RATE_LIMIT = -1;
+  /** Write at the upstream rate, but fsync after every write of up to DEFAULT_BUFFER_SIZE bytes so buffered
+   * data is flushed to disk promptly. This avoids saturating disk I/O bandwidth.
+   */
+  public static final long SYNC_DISK_WRITE_WITH_UPSTREAM_RATE = 0;
+  /** Copy buffer size. 4MB is large enough, page aligned, and a multiple of SSD block size for efficient writes:
+   * Common page sizes are 2K, 4K, 8K, or 16K, with 128 to 256 pages per block.
+   * Block size therefore typically varies between 256KB and 4MB.
+   * https://codecapsule.com/2014/02/12
+   * /coding-for-ssds-part-6-a-summary-what-every-programmer-should-know-about-solid-state-drives/
+   *
+   * It is also sufficient for HDDs.
+   */
+  private static final int DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024;
+
+
   private TarGzCompressionUtils() {
   }
 
@@ -123,6 +145,16 @@ public class TarGzCompressionUtils {
    */
   public static List<File> untar(InputStream inputStream, File outputDir)
       throws IOException {
+    return untarWithRateLimiter(inputStream, outputDir, NO_DISK_WRITE_RATE_LIMIT);
+  }
+
+  /**
+   * Un-tars an inputstream of a tar.gz file into a directory, returns all the untarred files/directories.
+   * RateLimit limits the untar rate
+   * <p>For security reason, the untarred files must reside in the output directory.
+   */
+  public static List<File> untarWithRateLimiter(InputStream inputStream, File outputDir, long maxStreamRateInByte)
+      throws IOException {
     String outputDirCanonicalPath = outputDir.getCanonicalPath();
     // Prevent partial path traversal
     if (!outputDirCanonicalPath.endsWith(File.separator)) {
@@ -163,8 +195,12 @@ public class TarGzCompressionUtils {
           if (!parentFile.isDirectory() && !parentFile.mkdirs()) {
             throw new IOException(String.format("Failed to create directory: %s", parentFile));
           }
-          try (OutputStream out = Files.newOutputStream(outputFile.toPath())) {
-            IOUtils.copy(tarGzIn, out);
+          try (FileOutputStream out = new FileOutputStream(outputFile.toPath().toString())) {
+            if (maxStreamRateInByte != NO_DISK_WRITE_RATE_LIMIT) {
+              copyWithRateLimiter(tarGzIn, out, maxStreamRateInByte);
+            } else {
+              IOUtils.copy(tarGzIn, out);
+            }
           }
         }
         untarredFiles.add(outputFile);
@@ -198,4 +234,30 @@ public class TarGzCompressionUtils {
       throw new IOException(String.format("Failed to find file: %s in: %s", fileName, inputFile));
     }
   }
+
+  public static long copyWithRateLimiter(InputStream inputStream, FileOutputStream outputStream,
+      long maxStreamRateInByte)
+      throws IOException {
+    Preconditions.checkState(inputStream != null, "inputStream is null");
+    Preconditions.checkState(outputStream != null, "outputStream is null");
+    FileDescriptor fd = outputStream.getFD();
+    byte[] buffer = new byte[DEFAULT_BUFFER_SIZE];
+    long count;
+    int n;
+
+    if (maxStreamRateInByte == SYNC_DISK_WRITE_WITH_UPSTREAM_RATE) {
+      for (count = 0L; -1 != (n = inputStream.read(buffer)); count += (long) n) {
+        outputStream.write(buffer, 0, n);
+        fd.sync(); // flush the buffer timely to the disk so that the disk bandwidth wouldn't get saturated
+      }
+    } else {
+      RateLimiter rateLimiter = RateLimiter.create(maxStreamRateInByte);
+      for (count = 0L; -1 != (n = inputStream.read(buffer)); count += (long) n) {
+        rateLimiter.acquire(n);
+        outputStream.write(buffer, 0, n);
+        fd.sync();
+      }
+    }
+    return count;
+  }
 }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/BaseSegmentFetcher.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/BaseSegmentFetcher.java
index d99c256233..2fadaffd0a 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/BaseSegmentFetcher.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/BaseSegmentFetcher.java
@@ -102,6 +102,11 @@ public abstract class BaseSegmentFetcher implements SegmentFetcher {
     });
   }
 
+  public File fetchUntarSegmentToLocalStreamed(URI uri, File dest, long rateLimit)
+      throws Exception {
+    throw new UnsupportedOperationException();
+  }
+
   /**
    * Fetches a segment from URI location to local without retry. Sub-class should override this or
    * {@link #fetchSegmentToLocal(URI, File)}.
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java
index a8374eb835..741ac18315 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java
@@ -20,9 +20,11 @@ package org.apache.pinot.common.utils.fetcher;
 
 import com.google.common.net.InetAddresses;
 import java.io.File;
+import java.io.IOException;
 import java.net.URI;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.http.Header;
 import org.apache.http.HttpHeaders;
 import org.apache.http.HttpStatus;
@@ -93,6 +95,58 @@ public class HttpSegmentFetcher extends BaseSegmentFetcher {
     });
   }
 
+  @Override
+  public File fetchUntarSegmentToLocalStreamed(URI downloadURI, File dest, long maxStreamRateInByte)
+      throws Exception {
+    // Create a RoundRobinURIProvider to round robin IP addresses when retry uploading. Otherwise may always try to
+    // download from a same broken host as: 1) DNS may not RR the IP addresses 2) OS cache the DNS resolution result.
+    RoundRobinURIProvider uriProvider = new RoundRobinURIProvider(downloadURI);
+    int retryCount = Math.max(_retryCount, uriProvider.numAddresses());
+    AtomicReference<File> ret = new AtomicReference<>(); // untarred segment directory to return
+    _logger.info("Retry downloading for {} times. retryCount from pinot server config: {}, number of IP addresses for "
+        + "download URI: {}", retryCount, _retryCount, uriProvider.numAddresses());
+    RetryPolicies.exponentialBackoffRetryPolicy(retryCount, _retryWaitMs, _retryDelayScaleFactor).attempt(() -> {
+      URI uri = uriProvider.next();
+      try {
+        String hostName = downloadURI.getHost();
+        int port = downloadURI.getPort();
+        // If the original download address is specified as host name, need add a "HOST" HTTP header to the HTTP
+        // request. Otherwise, if the download address is a LB address, when the LB be configured as "disallow direct
+        // access by IP address", downloading will fail.
+        List<Header> httpHeaders = new LinkedList<>();
+        if (!InetAddresses.isInetAddress(hostName)) {
+          httpHeaders.add(new BasicHeader(HttpHeaders.HOST, hostName + ":" + port));
+        }
+        ret.set(_httpClient.downloadUntarFileStreamed(uri, dest, _authProvider, httpHeaders, maxStreamRateInByte));
+
+        return true;
+      } catch (HttpErrorStatusException e) {
+        int statusCode = e.getStatusCode();
+        if (statusCode == HttpStatus.SC_NOT_FOUND || statusCode >= 500) {
+          // Temporary exception.
+          // 404 is treated as a temporary exception, as the downloadURI may be backed by multiple hosts;
+          // if a single host is down, we can retry with another host.
+          _logger.warn("Got temporary error status code: {} while downloading segment from: {} to: {}", statusCode, uri,
+              dest, e);
+          return false;
+        } else {
+          // Permanent exception
+          _logger.error("Got permanent error status code: {} while downloading segment from: {} to: {}, won't retry",
+              statusCode, uri, dest, e);
+          throw e;
+        }
+      } catch (IOException e) {
+        _logger.warn("Caught IOException while stream download-untarring segment from: {} to: {}, retrying",
+            uri, dest, e);
+        return false;
+      } catch (Exception e) {
+        _logger.warn("Caught exception while downloading segment from: {} to: {}", uri, dest, e);
+        return false;
+      }
+    });
+    return ret.get();
+  }
+
   @Override
   public void fetchSegmentToLocalWithoutRetry(URI uri, File dest)
       throws Exception {
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcher.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcher.java
index d9efea346c..da4936f433 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcher.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcher.java
@@ -37,6 +37,12 @@ public interface SegmentFetcher {
   void fetchSegmentToLocal(URI uri, File dest)
       throws Exception;
 
+  /**
+   * Fetches a segment from URI location and untar to local in a streamed manner
+   */
+  File fetchUntarSegmentToLocalStreamed(URI uri, File dest, long rateLimit)
+      throws Exception;
+
   /**
    * Fetches a segment to local from any uri in the given list.
    */
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java
index f9835206d7..eb5b498729 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java
@@ -153,6 +153,25 @@ public class SegmentFetcherFactory {
     getSegmentFetcher(uri.getScheme()).fetchSegmentToLocal(uri, dest);
   }
 
+  /**
+   * Fetches a segment from the URI location and untars it to local disk in a streamed manner.
+   * @param uri URI of the segment tar.gz
+   * @param tempRootDir Temporary directory to untar the segment into
+   * @param maxStreamRateInByte limit on the rate of writing the download-untar stream to disk, in bytes per second;
+   *                  -1 for no disk write limit, 0 to limit the writing to min(untar, download) rate
+   * @return the untarred segment directory
+   * @throws Exception if the URI is malformed or the fetch/untar fails
+   */
+  public static File fetchAndStreamUntarToLocal(String uri, File tempRootDir, long maxStreamRateInByte)
+      throws Exception {
+    return getInstance().fetchAndStreamUntarToLocalInternal(new URI(uri), tempRootDir, maxStreamRateInByte);
+  }
+
+  private File fetchAndStreamUntarToLocalInternal(URI uri, File tempRootDir, long maxStreamRateInByte)
+      throws Exception {
+    return getSegmentFetcher(uri.getScheme()).fetchUntarSegmentToLocalStreamed(uri, tempRootDir, maxStreamRateInByte);
+  }
+
   /**
    * Fetches a segment from a URI location to a local file and decrypts it if needed
    * @param uri remote segment location
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/http/HttpClient.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/http/HttpClient.java
index 79b050bf68..d0179c3950 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/http/HttpClient.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/http/HttpClient.java
@@ -59,6 +59,7 @@ import org.apache.pinot.common.auth.AuthProviderUtils;
 import org.apache.pinot.common.exception.HttpErrorStatusException;
 import org.apache.pinot.common.utils.SimpleHttpErrorInfo;
 import org.apache.pinot.common.utils.SimpleHttpResponse;
+import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.common.utils.TlsUtils;
 import org.apache.pinot.spi.auth.AuthProvider;
 import org.apache.pinot.spi.utils.CommonConstants;
@@ -394,6 +395,43 @@ public class HttpClient implements AutoCloseable {
     }
   }
 
+  /**
+   * Download and untar in a streamed manner a file using default settings, with an optional auth token
+   *
+   * @param uri URI
+   * @param socketTimeoutMs Socket timeout in milliseconds
+   * @param dest File destination
+   * @param authProvider auth provider
+   * @param httpHeaders http headers
+   * @param maxStreamRateInByte limit the rate to write download-untar stream to disk, in bytes
+   *                  -1 for no disk write limit, 0 for limit the writing to min(untar, download) rate
+   * @return The untarred directory
+   * @throws IOException
+   * @throws HttpErrorStatusException
+   */
+  public File downloadUntarFileStreamed(URI uri, int socketTimeoutMs, File dest, AuthProvider authProvider,
+      List<Header> httpHeaders, long maxStreamRateInByte)
+      throws IOException, HttpErrorStatusException {
+    HttpUriRequest request = getDownloadFileRequest(uri, socketTimeoutMs, authProvider, httpHeaders);
+    File ret;
+    try (CloseableHttpResponse response = _httpClient.execute(request)) {
+      StatusLine statusLine = response.getStatusLine();
+      int statusCode = statusLine.getStatusCode();
+      if (statusCode >= 300) {
+        throw new HttpErrorStatusException(HttpClient.getErrorMessage(request, response), statusCode);
+      }
+
+      try (InputStream inputStream = response.getEntity().getContent()) {
+        ret = TarGzCompressionUtils.untarWithRateLimiter(inputStream, dest, maxStreamRateInByte).get(0);
+      }
+
+      LOGGER.info("Downloaded from: {} to: {} with rate limiter; Response status code: {}", uri, dest,
+              statusCode);
+
+      return ret;
+    }
+  }
+
   // --------------------------------------------------------------------------
   // Static utility for dealing with lower-level API responses.
   // --------------------------------------------------------------------------
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactoryTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactoryTest.java
index 54679a3889..845ab40e7c 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactoryTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactoryTest.java
@@ -114,6 +114,14 @@ public class SegmentFetcherFactoryTest {
       _fetchFileToLocalCalled++;
     }
 
+    @Override
+    public File fetchUntarSegmentToLocalStreamed(URI uri, File dest, long rateLimit)
+        throws Exception {
+      assertEquals(uri, new URI(TEST_URI));
+      _fetchFileToLocalCalled++;
+      return new File("fakeSegmentIndexFile");
+    }
+
     @Override
     public void fetchSegmentToLocal(List<URI> uri, File dest)
         throws Exception {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index d4eb1f9175..67a7b7eacb 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -53,6 +53,7 @@ import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
 import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
 import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
+import org.apache.pinot.segment.local.data.manager.TableDataManagerParams;
 import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
@@ -93,6 +94,8 @@ public abstract class BaseTableDataManager implements TableDataManager {
   protected Logger _logger;
   protected HelixManager _helixManager;
   protected AuthProvider _authProvider;
+  protected long _streamSegmentDownloadUntarRateLimitBytesPerSec;
+  protected boolean _isStreamSegmentDownloadUntar;
 
   // Fixed size LRU cache with TableName - SegmentName pair as key, and segment related
   // errors as the value.
@@ -101,7 +104,8 @@ public abstract class BaseTableDataManager implements TableDataManager {
   @Override
   public void init(TableDataManagerConfig tableDataManagerConfig, String instanceId,
       ZkHelixPropertyStore<ZNRecord> propertyStore, ServerMetrics serverMetrics, HelixManager helixManager,
-      @Nullable LoadingCache<Pair<String, String>, SegmentErrorInfo> errorCache, int maxParallelSegmentDownloads) {
+      @Nullable LoadingCache<Pair<String, String>, SegmentErrorInfo> errorCache,
+      TableDataManagerParams tableDataManagerParams) {
     LOGGER.info("Initializing table data manager for table: {}", tableDataManagerConfig.getTableName());
 
     _tableDataManagerConfig = tableDataManagerConfig;
@@ -130,6 +134,15 @@ public abstract class BaseTableDataManager implements TableDataManager {
           _resourceTmpDir);
     }
     _errorCache = errorCache;
+    _streamSegmentDownloadUntarRateLimitBytesPerSec =
+        tableDataManagerParams.getStreamSegmentDownloadUntarRateLimitBytesPerSec();
+    _isStreamSegmentDownloadUntar = tableDataManagerParams.isStreamSegmentDownloadUntar();
+    if (_isStreamSegmentDownloadUntar) { // log the configured rate (bytes/s; -1 = unlimited, 0 = upstream rate)
+      LOGGER.info("Using streamed download-untar for segment download! "
+              + "The rate limit for streamed download-untar is {} bytes/s",
+          _streamSegmentDownloadUntarRateLimitBytesPerSec);
+    }
+    int maxParallelSegmentDownloads = tableDataManagerParams.getMaxParallelSegmentDownloads();
     if (maxParallelSegmentDownloads > 0) {
       LOGGER.info(
           "Construct segment download semaphore for Table: {}. Maximum number of parallel segment downloads: {}",
@@ -409,11 +422,35 @@ public abstract class BaseTableDataManager implements TableDataManager {
       throws Exception {
     File tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "-" + UUID.randomUUID());
     FileUtils.forceMkdir(tempRootDir);
+    if (_isStreamSegmentDownloadUntar && zkMetadata.getCrypterName() == null) {
+      try {
+        File untaredSegDir = downloadAndStreamUntarWithRateLimit(segmentName, zkMetadata, tempRootDir,
+            _streamSegmentDownloadUntarRateLimitBytesPerSec);
+        return moveSegment(segmentName, untaredSegDir);
+      } finally {
+        FileUtils.deleteQuietly(tempRootDir);
+      }
+    } else {
+      try {
+        File tarFile = downloadAndDecrypt(segmentName, zkMetadata, tempRootDir);
+        return untarAndMoveSegment(segmentName, tarFile, tempRootDir);
+      } finally {
+        FileUtils.deleteQuietly(tempRootDir);
+      }
+    }
+  }
+
+  private File moveSegment(String segmentName, File untaredSegDir)
+      throws IOException {
     try {
-      File tarFile = downloadAndDecrypt(segmentName, zkMetadata, tempRootDir);
-      return untarAndMoveSegment(segmentName, tarFile, tempRootDir);
-    } finally {
-      FileUtils.deleteQuietly(tempRootDir);
+      File indexDir = getSegmentDataDir(segmentName);
+      FileUtils.deleteDirectory(indexDir); // replace any existing copy of the segment
+      FileUtils.moveDirectory(untaredSegDir, indexDir);
+      return indexDir;
+    } catch (Exception e) {
+      LOGGER.error("Failed to move segment: {} of table: {}", segmentName, _tableNameWithType, e);
+      _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.SEGMENT_DIR_MOVEMENT_FAILURES, 1L);
+      throw e;
     }
   }
 
@@ -447,6 +484,36 @@ public abstract class BaseTableDataManager implements TableDataManager {
     }
   }
 
+  private File downloadAndStreamUntarWithRateLimit(String segmentName, SegmentZKMetadata zkMetadata, File tempRootDir,
+      long maxStreamRateInByte)
+      throws Exception {
+    if (_segmentDownloadSemaphore != null) {
+      long startTime = System.currentTimeMillis();
+      LOGGER.info("Trying to acquire segment download semaphore for: {}. queue-length: {} ", segmentName,
+          _segmentDownloadSemaphore.getQueueLength());
+      _segmentDownloadSemaphore.acquire();
+      LOGGER.info("Acquired segment download semaphore for: {} (lock-time={}ms, queue-length={}).", segmentName,
+          System.currentTimeMillis() - startTime, _segmentDownloadSemaphore.getQueueLength());
+    }
+    LOGGER.info("Trying to download segment {} using streamed download-untar with maxStreamRateInByte {}",
+        segmentName, maxStreamRateInByte);
+    String uri = zkMetadata.getDownloadUrl();
+    try {
+      File ret = SegmentFetcherFactory.fetchAndStreamUntarToLocal(uri, tempRootDir, maxStreamRateInByte);
+      LOGGER.info("Download and untarred segment: {} for table: {} from: {}", segmentName, _tableNameWithType, uri);
+      return ret;
+    } catch (AttemptsExceededException e) {
+      LOGGER.error("Attempts exceeded when stream download-untarring segment: {} for table: {} from: {} to: {}",
+          segmentName, _tableNameWithType, uri, tempRootDir, e);
+      _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.SEGMENT_STREAMED_DOWNLOAD_UNTAR_FAILURES, 1L);
+      throw e;
+    } finally {
+      if (_segmentDownloadSemaphore != null) {
+        _segmentDownloadSemaphore.release();
+      }
+    }
+  }
+
   @VisibleForTesting
   File untarAndMoveSegment(String segmentName, File tarFile, File tempRootDir)
       throws IOException {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java
index dcd8c461ed..9eb1b11461 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java
@@ -29,6 +29,7 @@ import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
 import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
 import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
+import org.apache.pinot.segment.local.data.manager.TableDataManagerParams;
 import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
 import org.apache.pinot.spi.config.table.TableType;
 
@@ -38,7 +39,7 @@ import org.apache.pinot.spi.config.table.TableType;
  */
 public class TableDataManagerProvider {
   private static Semaphore _segmentBuildSemaphore;
-  private static int _maxParallelSegmentDownloads;
+  private static TableDataManagerParams _tableDataManagerParams;
 
   private TableDataManagerProvider() {
   }
@@ -48,7 +49,7 @@ public class TableDataManagerProvider {
     if (maxParallelBuilds > 0) {
       _segmentBuildSemaphore = new Semaphore(maxParallelBuilds, true);
     }
-    _maxParallelSegmentDownloads = instanceDataManagerConfig.getMaxParallelSegmentDownloads();
+    _tableDataManagerParams = new TableDataManagerParams(instanceDataManagerConfig);
   }
 
   public static TableDataManager getTableDataManager(TableDataManagerConfig tableDataManagerConfig, String instanceId,
@@ -70,7 +71,7 @@ public class TableDataManagerProvider {
         throw new IllegalStateException();
     }
     tableDataManager.init(tableDataManagerConfig, instanceId, propertyStore, serverMetrics, helixManager, errorCache,
-        _maxParallelSegmentDownloads);
+        _tableDataManagerParams);
     return tableDataManager;
   }
 }
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java
index 27764d334a..627d36b2c6 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java
@@ -40,6 +40,7 @@ import org.apache.pinot.core.data.manager.offline.OfflineTableDataManager;
 import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
 import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
+import org.apache.pinot.segment.local.data.manager.TableDataManagerParams;
 import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.spi.metrics.PinotMetricUtils;
@@ -122,7 +123,8 @@ public class BaseTableDataManagerAcquireSegmentTest {
       when(config.getAuthConfig()).thenReturn(new MapConfiguration(new HashMap<>()));
     }
     tableDataManager.init(config, "dummyInstance", mock(ZkHelixPropertyStore.class),
-        new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), mock(HelixManager.class), null, 0);
+        new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), mock(HelixManager.class), null,
+        new TableDataManagerParams(0, false, -1));
     tableDataManager.start();
     Field segsMapField = BaseTableDataManager.class.getDeclaredField("_segmentDataManagerMap");
     segsMapField.setAccessible(true);
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
index 7c14f73054..21cac335b8 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
@@ -39,6 +39,7 @@ import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
 import org.apache.pinot.core.data.manager.offline.OfflineTableDataManager;
 import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
+import org.apache.pinot.segment.local.data.manager.TableDataManagerParams;
 import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
@@ -532,7 +533,8 @@ public class BaseTableDataManagerTest {
 
     OfflineTableDataManager tableDataManager = new OfflineTableDataManager();
     tableDataManager.init(config, "dummyInstance", mock(ZkHelixPropertyStore.class),
-        new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), mock(HelixManager.class), null, 0);
+        new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), mock(HelixManager.class), null,
+        new TableDataManagerParams(0, false, -1));
     tableDataManager.start();
     return tableDataManager;
   }
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
index 580ccee6cf..1dd207f4af 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
@@ -32,6 +32,7 @@ import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.utils.SchemaUtils;
 import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
 import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
+import org.apache.pinot.segment.local.data.manager.TableDataManagerParams;
 import org.apache.pinot.segment.local.segment.creator.SegmentTestUtils;
 import org.apache.pinot.segment.local.segment.creator.impl.SegmentCreationDriverFactory;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
@@ -118,7 +119,8 @@ public class DimensionTableDataManagerTest {
       when(config.getDataDir()).thenReturn(TEMP_DIR.getAbsolutePath());
     }
     tableDataManager.init(config, "dummyInstance", helixManager.getHelixPropertyStore(),
-        new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), helixManager, null, 0);
+        new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), helixManager, null,
+        new TableDataManagerParams(0, false, -1));
     tableDataManager.start();
     return tableDataManager;
   }
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
index 39baf93665..1e8d70d331 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
@@ -723,6 +723,12 @@ public class LLRealtimeSegmentDataManagerTest {
     when(tableDataManagerConfig.getTableDataManagerType()).thenReturn("REALTIME");
     when(tableDataManagerConfig.getTableName()).thenReturn(tableConfig.getTableName());
     when(tableDataManagerConfig.getDataDir()).thenReturn(FileUtils.getTempDirectoryPath());
+    InstanceDataManagerConfig instanceDataManagerConfig = mock(InstanceDataManagerConfig.class);
+    when(instanceDataManagerConfig.getMaxParallelSegmentBuilds()).thenReturn(4);
+    when(instanceDataManagerConfig.getStreamSegmentDownloadUntarRateLimit()).thenReturn(-1L);
+    when(instanceDataManagerConfig.getMaxParallelSegmentDownloads()).thenReturn(-1);
+    when(instanceDataManagerConfig.isStreamSegmentDownloadUntar()).thenReturn(false);
+    TableDataManagerProvider.init(instanceDataManagerConfig);
 
     TableDataManager tableDataManager =
         TableDataManagerProvider.getTableDataManager(tableDataManagerConfig, "testInstance", propertyStore,
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java
index 4ea2607063..eb1e07a69e 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java
@@ -47,6 +47,7 @@ import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
 import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
+import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.IngestionSchemaValidator;
@@ -133,6 +134,12 @@ public class QueryExecutorExceptionsTest {
     when(tableDataManagerConfig.getTableDataManagerType()).thenReturn("OFFLINE");
     when(tableDataManagerConfig.getTableName()).thenReturn(TABLE_NAME);
     when(tableDataManagerConfig.getDataDir()).thenReturn(FileUtils.getTempDirectoryPath());
+    InstanceDataManagerConfig instanceDataManagerConfig = mock(InstanceDataManagerConfig.class);
+    when(instanceDataManagerConfig.getMaxParallelSegmentBuilds()).thenReturn(4);
+    when(instanceDataManagerConfig.getStreamSegmentDownloadUntarRateLimit()).thenReturn(-1L);
+    when(instanceDataManagerConfig.getMaxParallelSegmentDownloads()).thenReturn(-1);
+    when(instanceDataManagerConfig.isStreamSegmentDownloadUntar()).thenReturn(false);
+    TableDataManagerProvider.init(instanceDataManagerConfig);
     @SuppressWarnings("unchecked")
     TableDataManager tableDataManager =
         TableDataManagerProvider.getTableDataManager(tableDataManagerConfig, "testInstance",
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
index d75fae8544..89880795ea 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
@@ -43,6 +43,7 @@ import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
 import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
+import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.IngestionSchemaValidator;
@@ -127,6 +128,12 @@ public class QueryExecutorTest {
     when(tableDataManagerConfig.getTableDataManagerType()).thenReturn("OFFLINE");
     when(tableDataManagerConfig.getTableName()).thenReturn(TABLE_NAME);
     when(tableDataManagerConfig.getDataDir()).thenReturn(FileUtils.getTempDirectoryPath());
+    InstanceDataManagerConfig instanceDataManagerConfig = mock(InstanceDataManagerConfig.class);
+    when(instanceDataManagerConfig.getMaxParallelSegmentBuilds()).thenReturn(4);
+    when(instanceDataManagerConfig.getStreamSegmentDownloadUntarRateLimit()).thenReturn(-1L);
+    when(instanceDataManagerConfig.getMaxParallelSegmentDownloads()).thenReturn(-1);
+    when(instanceDataManagerConfig.isStreamSegmentDownloadUntar()).thenReturn(false);
+    TableDataManagerProvider.init(instanceDataManagerConfig);
     @SuppressWarnings("unchecked")
     TableDataManager tableDataManager =
         TableDataManagerProvider.getTableDataManager(tableDataManagerConfig, "testInstance",
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java
index 53f71d2528..9962105351 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java
@@ -58,6 +58,7 @@ import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
 import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
 import org.apache.pinot.spi.config.table.IndexingConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
@@ -264,6 +265,12 @@ public class ExplainPlanQueriesTest extends BaseQueriesTest {
     when(tableDataManagerConfig.getTableDataManagerType()).thenReturn("OFFLINE");
     when(tableDataManagerConfig.getTableName()).thenReturn(RAW_TABLE_NAME);
     when(tableDataManagerConfig.getDataDir()).thenReturn(FileUtils.getTempDirectoryPath());
+    InstanceDataManagerConfig instanceDataManagerConfig = mock(InstanceDataManagerConfig.class);
+    when(instanceDataManagerConfig.getMaxParallelSegmentBuilds()).thenReturn(4);
+    when(instanceDataManagerConfig.getStreamSegmentDownloadUntarRateLimit()).thenReturn(-1L);
+    when(instanceDataManagerConfig.getMaxParallelSegmentDownloads()).thenReturn(-1);
+    when(instanceDataManagerConfig.isStreamSegmentDownloadUntar()).thenReturn(false);
+    TableDataManagerProvider.init(instanceDataManagerConfig);
     @SuppressWarnings("unchecked")
     TableDataManager tableDataManager =
         TableDataManagerProvider.getTableDataManager(tableDataManagerConfig, "testInstance",
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/SegmentWithNullValueVectorTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/SegmentWithNullValueVectorTest.java
index a9a5ed67b6..d529e951cf 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/SegmentWithNullValueVectorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/SegmentWithNullValueVectorTest.java
@@ -51,6 +51,7 @@ import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
 import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
 import org.apache.pinot.segment.spi.index.reader.NullValueVectorReader;
+import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.DimensionFieldSpec;
@@ -71,6 +72,8 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import static org.apache.pinot.segment.local.segment.index.creator.RawIndexCreatorTest.getRandomValue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 
 /**
@@ -141,6 +144,12 @@ public class SegmentWithNullValueVectorTest {
     Mockito.when(tableDataManagerConfig.getTableDataManagerType()).thenReturn("OFFLINE");
     Mockito.when(tableDataManagerConfig.getTableName()).thenReturn(TABLE_NAME);
     Mockito.when(tableDataManagerConfig.getDataDir()).thenReturn(FileUtils.getTempDirectoryPath());
+    InstanceDataManagerConfig instanceDataManagerConfig = mock(InstanceDataManagerConfig.class);
+    when(instanceDataManagerConfig.getMaxParallelSegmentBuilds()).thenReturn(4);
+    when(instanceDataManagerConfig.getStreamSegmentDownloadUntarRateLimit()).thenReturn(-1L);
+    when(instanceDataManagerConfig.getMaxParallelSegmentDownloads()).thenReturn(-1);
+    when(instanceDataManagerConfig.isStreamSegmentDownloadUntar()).thenReturn(false);
+    TableDataManagerProvider.init(instanceDataManagerConfig);
     @SuppressWarnings("unchecked")
     TableDataManager tableDataManager =
         TableDataManagerProvider.getTableDataManager(tableDataManagerConfig, "testInstance",
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
index 80a0987103..7687433fbc 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
@@ -49,7 +49,7 @@ public interface TableDataManager {
    */
   void init(TableDataManagerConfig tableDataManagerConfig, String instanceId,
       ZkHelixPropertyStore<ZNRecord> propertyStore, ServerMetrics serverMetrics, HelixManager helixManager,
-      LoadingCache<Pair<String, String>, SegmentErrorInfo> errorCache, int maxParallelSegmentDownloads);
+      LoadingCache<Pair<String, String>, SegmentErrorInfo> errorCache, TableDataManagerParams tableDataManagerParams);
 
   /**
    * Starts the table data manager. Should be called only once after table data manager gets initialized but before
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManagerParams.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManagerParams.java
new file mode 100644
index 0000000000..f3a0589dd2
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManagerParams.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.data.manager;
+
+import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
+
+
+/**
+ * Instance-level segment download settings handed to a {@link TableDataManager} when it is initialized.
+ *
+ * <p>Values are stored as given, without validation. For the stream download-untar rate limit, the server
+ * config documents -1 as no disk write limit and 0 as limiting the writing to the min(untar, download) rate.
+ */
+public class TableDataManagerParams {
+  // Max number of segments a single table may download in parallel
+  private int _maxParallelSegmentDownloads;
+  // Whether to turn on streamed segment download-untar
+  private boolean _isStreamSegmentDownloadUntar;
+  // Per-segment rate limit, in bytes per second, for the streamed download-untar
+  private long _streamSegmentDownloadUntarRateLimitBytesPerSec;
+
+  /**
+   * Creates the params from explicit values.
+   *
+   * @param maxParallelSegmentDownloads max number of segments downloaded in parallel per table
+   * @param isStreamSegmentDownloadUntar whether to turn on streamed segment download-untar
+   * @param streamSegmentDownloadUntarRateLimitBytesPerSec per-segment rate limit, in bytes per second, for the
+   *     streamed download-untar
+   */
+  public TableDataManagerParams(int maxParallelSegmentDownloads, boolean isStreamSegmentDownloadUntar,
+      long streamSegmentDownloadUntarRateLimitBytesPerSec) {
+    _maxParallelSegmentDownloads = maxParallelSegmentDownloads;
+    _isStreamSegmentDownloadUntar = isStreamSegmentDownloadUntar;
+    _streamSegmentDownloadUntarRateLimitBytesPerSec = streamSegmentDownloadUntarRateLimitBytesPerSec;
+  }
+
+  /**
+   * Creates the params from the matching settings of the given instance data manager config.
+   *
+   * @param instanceDataManagerConfig config to read the segment download settings from
+   */
+  public TableDataManagerParams(InstanceDataManagerConfig instanceDataManagerConfig) {
+    this(instanceDataManagerConfig.getMaxParallelSegmentDownloads(),
+        instanceDataManagerConfig.isStreamSegmentDownloadUntar(),
+        instanceDataManagerConfig.getStreamSegmentDownloadUntarRateLimit());
+  }
+
+  /** Returns the max number of segments the table may download in parallel. */
+  public int getMaxParallelSegmentDownloads() {
+    return _maxParallelSegmentDownloads;
+  }
+
+  /** Sets the max number of segments the table may download in parallel. */
+  public void setMaxParallelSegmentDownloads(int maxParallelSegmentDownloads) {
+    _maxParallelSegmentDownloads = maxParallelSegmentDownloads;
+  }
+
+  /** Returns whether streamed segment download-untar is turned on. */
+  public boolean isStreamSegmentDownloadUntar() {
+    return _isStreamSegmentDownloadUntar;
+  }
+
+  /** Turns streamed segment download-untar on or off. */
+  public void setStreamSegmentDownloadUntar(boolean streamSegmentDownloadUntar) {
+    _isStreamSegmentDownloadUntar = streamSegmentDownloadUntar;
+  }
+
+  /** Returns the per-segment rate limit, in bytes per second, for streamed download-untar. */
+  public long getStreamSegmentDownloadUntarRateLimitBytesPerSec() {
+    return _streamSegmentDownloadUntarRateLimitBytesPerSec;
+  }
+
+  /** Sets the per-segment rate limit, in bytes per second, for streamed download-untar. */
+  public void setStreamSegmentDownloadUntarRateLimitBytesPerSec(long streamSegmentDownloadUntarRateLimitBytesPerSec) {
+    _streamSegmentDownloadUntarRateLimitBytesPerSec = streamSegmentDownloadUntarRateLimitBytesPerSec;
+  }
+}
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
index 0d7a2a213e..e38e2905b9 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
@@ -20,6 +20,7 @@ package org.apache.pinot.server.starter.helix;
 
 import java.util.Optional;
 import org.apache.commons.configuration.ConfigurationException;
+import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
 import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
 import org.apache.pinot.spi.env.PinotConfiguration;
@@ -79,6 +80,18 @@ public class HelixInstanceDataManagerConfig implements InstanceDataManagerConfig
   private static final String MAX_PARALLEL_SEGMENT_DOWNLOADS = "table.level.max.parallel.segment.downloads";
   private static final int DEFAULT_MAX_PARALLEL_SEGMENT_DOWNLOADS = -1;
 
+  // Key of the server-side cap, in bytes per second, on how fast the streamed segment download-untar output is
+  // written to disk. -1 disables the disk write limit; 0 limits the writing to the min(untar, download) rate.
+  // Defaults to TarGzCompressionUtils.NO_DISK_WRITE_RATE_LIMIT.
+  private static final String STREAM_SEGMENT_DOWNLOAD_UNTAR_RATE_LIMIT =
+      "segment.stream.download.untar.rate.limit.bytes.per.sec";
+  private static final long DEFAULT_STREAM_SEGMENT_DOWNLOAD_UNTAR_RATE_LIMIT =
+      TarGzCompressionUtils.NO_DISK_WRITE_RATE_LIMIT;
+
+  // Key of the flag that turns on streamed server segment download-untar; disabled unless set to true
+  private static final String ENABLE_STREAM_SEGMENT_DOWNLOAD_UNTAR = "segment.stream.download.untar";
+  private static final boolean DEFAULT_ENABLE_STREAM_SEGMENT_DOWNLOAD_UNTAR = false;
+
   // Key of whether to enable split commit
   private static final String ENABLE_SPLIT_COMMIT = "enable.split.commit";
   // Key of whether to enable split commit end with segment metadata files.
@@ -232,6 +245,18 @@ public class HelixInstanceDataManagerConfig implements InstanceDataManagerConfig
     return _instanceDataManagerConfiguration.getProperty(ERROR_CACHE_SIZE, DEFAULT_ERROR_CACHE_SIZE);
   }
 
+  @Override
+  public boolean isStreamSegmentDownloadUntar() {
+    boolean defaultValue = DEFAULT_ENABLE_STREAM_SEGMENT_DOWNLOAD_UNTAR; // streamed download-untar is opt-in
+    return _instanceDataManagerConfiguration.getProperty(ENABLE_STREAM_SEGMENT_DOWNLOAD_UNTAR, defaultValue);
+  }
+
+  @Override
+  public long getStreamSegmentDownloadUntarRateLimit() {
+    long defaultValue = DEFAULT_STREAM_SEGMENT_DOWNLOAD_UNTAR_RATE_LIMIT; // bytes per second
+    return _instanceDataManagerConfiguration.getProperty(STREAM_SEGMENT_DOWNLOAD_UNTAR_RATE_LIMIT, defaultValue);
+  }
+
   @Override
   public String toString() {
     String configString = "";
diff --git a/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java b/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java
index 9b38dfd63b..d9a88d504f 100644
--- a/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java
+++ b/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java
@@ -40,6 +40,7 @@ import org.apache.pinot.core.data.manager.realtime.SegmentUploader;
 import org.apache.pinot.core.transport.ListenerConfig;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
 import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
+import org.apache.pinot.segment.local.data.manager.TableDataManagerParams;
 import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
 import org.apache.pinot.segment.local.segment.creator.SegmentTestUtils;
 import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
@@ -193,7 +194,7 @@ public abstract class BaseResourceTest {
     TableDataManager tableDataManager = new OfflineTableDataManager();
     tableDataManager
         .init(tableDataManagerConfig, "testInstance", mock(ZkHelixPropertyStore.class), mock(ServerMetrics.class),
-            mock(HelixManager.class), null, 0);
+            mock(HelixManager.class), null, new TableDataManagerParams(0, false, -1));
     tableDataManager.start();
     _tableDataManagerMap.put(tableNameWithType, tableDataManager);
   }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java
index f003d441f4..da4e39cb8a 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java
@@ -58,4 +58,16 @@ public interface InstanceDataManagerConfig {
   String getSegmentDirectoryLoader();
 
   long getErrorCacheSize();
+
+  /**
+   * Returns whether streamed segment download-untar is enabled.
+   */
+  boolean isStreamSegmentDownloadUntar();
+
+  /**
+   * Returns the per-segment disk write rate limit, in bytes per second, for streamed segment download-untar.
+   * The server implementation treats -1 as no disk write limit and 0 as limiting the writing to the
+   * min(untar, download) rate.
+   */
+  long getStreamSegmentDownloadUntarRateLimit();
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org