You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2022/03/25 20:59:31 UTC

[pinot] branch master updated: New Pinot byte metrics for compressed tar.gz and table size w/o replicas (#8358)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new e02bdda  New Pinot byte metrics for compressed tar.gz and table size w/o replicas (#8358)
e02bdda is described below

commit e02bdda1501abb82e2efbc27674a74a6fb5519b5
Author: Tim Santos <ti...@cortexdata.io>
AuthorDate: Fri Mar 25 21:59:04 2022 +0100

    New Pinot byte metrics for compressed tar.gz and table size w/o replicas (#8358)
---
 .../etc/jmx_prometheus_javaagent/configs/pinot.yml | 15 ++++
 .../broker/broker/HelixBrokerStarterTest.java      |  2 +-
 .../common/metadata/segment/SegmentZKMetadata.java |  8 ++
 .../pinot/common/metrics/ControllerGauge.java      | 10 +++
 .../pinot/controller/BaseControllerStarter.java    |  3 +-
 .../apache/pinot/controller/ControllerConf.java    |  2 +-
 .../PinotSegmentUploadDownloadRestletResource.java | 21 +++--
 .../pinot/controller/api/upload/ZKOperator.java    | 20 +++--
 .../controller/helix/SegmentStatusChecker.java     | 35 +++++++-
 .../helix/core/PinotHelixResourceManager.java      | 30 ++++++-
 .../pinot/controller/util/TableSizeReader.java     | 39 +++++----
 .../controller/validation/StorageQuotaChecker.java | 14 +---
 .../pinot/controller/api/TableSizeReaderTest.java  | 68 +++++++++++-----
 .../controller/api/upload/ZKOperatorTest.java      | 16 ++--
 .../controller/helix/SegmentStatusCheckerTest.java | 95 +++++++++++++++++++---
 .../validation/StorageQuotaCheckerTest.java        |  7 +-
 .../validation/ValidationManagerTest.java          |  3 +-
 .../apache/pinot/spi/utils/CommonConstants.java    |  1 +
 18 files changed, 297 insertions(+), 92 deletions(-)

diff --git a/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/pinot.yml b/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/pinot.yml
index dcaff91..e784180 100644
--- a/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/pinot.yml
+++ b/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/pinot.yml
@@ -106,6 +106,21 @@ rules:
   cache: true
   labels:
     table: "$1"
+- pattern: "\"org.apache.pinot.common.metrics\"<type=\"ControllerMetrics\", name=\"pinot.controller.tableTotalSizeOnServer.(\\w+)_(\\w+)\"><>(\\w+)"
+  name: "pinot_controller_tableTotalSizeOnServer_$3"
+  labels:
+    table: "$1"
+    tableType: "$2"
+- pattern: "\"org.apache.pinot.common.metrics\"<type=\"ControllerMetrics\", name=\"pinot.controller.tableSizePerReplicaOnServer.(\\w+)_(\\w+)\"><>(\\w+)"
+  name: "pinot_controller_tableSizePerReplicaOnServer_$3"
+  labels:
+    table: "$1"
+    tableType: "$2"
+- pattern: "\"org.apache.pinot.common.metrics\"<type=\"ControllerMetrics\", name=\"pinot.controller.tableCompressedSize.(\\w+)_(\\w+)\"><>(\\w+)"
+  name: "pinot_controller_tableCompressedSize_$3"
+  labels:
+    table: "$1"
+    tableType: "$2"
 - pattern: "\"org.apache.pinot.common.metrics\"<type=\"ControllerMetrics\", name=\"pinot.controller.tableQuota.(\\w+)_(\\w+)\"><>(\\w+)"
   name: "pinot_controller_tableQuota_$3"
   cache: true
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
index 3f7d59b..7f29c98 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
@@ -228,7 +228,7 @@ public class HelixBrokerStarterTest extends ControllerTest {
         _helixResourceManager.getSegmentZKMetadata(OFFLINE_TABLE_NAME, segmentToRefresh);
     _helixResourceManager.refreshSegment(OFFLINE_TABLE_NAME,
         SegmentMetadataMockUtils.mockSegmentMetadataWithEndTimeInfo(RAW_TABLE_NAME, segmentToRefresh, newEndTime),
-        segmentZKMetadata, EXPECTED_VERSION, "downloadUrl", null);
+        segmentZKMetadata, EXPECTED_VERSION, "downloadUrl", null, -1);
 
     TestUtils.waitForCondition(aVoid -> routingManager.getTimeBoundaryInfo(OFFLINE_TABLE_NAME).getTimeValue()
         .equals(Integer.toString(newEndTime - 1)), 30_000L, "Failed to update the time boundary for refreshed segment");
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadata.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadata.java
index 13264a1..0b73af4 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadata.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadata.java
@@ -125,6 +125,14 @@ public class SegmentZKMetadata implements ZKMetadata {
     setNonNegativeValue(Segment.TOTAL_DOCS, totalDocs);
   }
 
+  public void setSizeInBytes(long sizeInBytes) {
+    setNonNegativeValue(Segment.SIZE_IN_BYTES, sizeInBytes);
+  }
+
+  public long getSizeInBytes() {
+    return _znRecord.getLongField(Segment.SIZE_IN_BYTES, -1);
+  }
+
   public long getCrc() {
     return _znRecord.getLongField(Segment.CRC, -1);
   }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
index ae33acc..7e908ff 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
@@ -67,8 +67,18 @@ public enum ControllerGauge implements AbstractMetrics.Gauge {
   CONTROLLER_LEADER_PARTITION_COUNT("ControllerLeaderPartitionCount", true),
 
   // Estimated size of offline table
+  @Deprecated // Instead use TABLE_TOTAL_SIZE_ON_SERVER
   OFFLINE_TABLE_ESTIMATED_SIZE("OfflineTableEstimatedSize", false),
 
+  // Total size of table across replicas on servers
+  TABLE_TOTAL_SIZE_ON_SERVER("TableTotalSizeOnServer", false),
+
+  // Size of table per replica on servers
+  TABLE_SIZE_PER_REPLICA_ON_SERVER("TableSizePerReplicaOnServer", false),
+
+  // Total size of compressed segments per table
+  TABLE_COMPRESSED_SIZE("TableCompressedSize", false),
+
   // Table quota based on setting in table config
   TABLE_QUOTA("TableQuotaBasedOnTableConfig", false),
 
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index 6a3e9a5..94e68c7 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -646,7 +646,8 @@ public abstract class BaseControllerStarter implements ServiceStartable {
         new BrokerResourceValidationManager(_config, _helixResourceManager, _leadControllerManager, _controllerMetrics);
     periodicTasks.add(_brokerResourceValidationManager);
     _segmentStatusChecker =
-        new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics);
+        new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics,
+            _executorService);
     periodicTasks.add(_segmentStatusChecker);
     _segmentRelocator = new SegmentRelocator(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics,
         _executorService);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index cbda36a..25514ab 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -251,7 +251,7 @@ public class ControllerConf extends PinotConfiguration {
   private static final String DEFAULT_DIM_TABLE_MAX_SIZE = "200M";
 
   private static final String DEFAULT_PINOT_FS_FACTORY_CLASS_LOCAL = LocalPinotFS.class.getName();
-  public static final String DISABLE_GROOVY = "controller.disable.ingestion.groovy";
+  private static final String DISABLE_GROOVY = "controller.disable.ingestion.groovy";
 
   public ControllerConf() {
     super(new HashMap<>());
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
index 253a3fb..dfd3b54 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
@@ -214,17 +214,28 @@ public class PinotSegmentUploadDownloadRestletResource {
       boolean uploadedSegmentIsEncrypted = !Strings.isNullOrEmpty(crypterClassNameInHeader);
       FileUploadDownloadClient.FileUploadType uploadType = getUploadType(uploadTypeStr);
       File dstFile = uploadedSegmentIsEncrypted ? tempEncryptedFile : tempDecryptedFile;
+      long segmentSizeInBytes;
       switch (uploadType) {
         case URI:
           downloadSegmentFileFromURI(downloadUri, dstFile, tableName);
+          segmentSizeInBytes = dstFile.length();
           break;
         case SEGMENT:
           createSegmentFileFromMultipart(multiPart, dstFile);
+          segmentSizeInBytes = dstFile.length();
           break;
         case METADATA:
           moveSegmentToFinalLocation = false;
           Preconditions.checkState(downloadUri != null, "Download URI is required in segment metadata upload mode");
           createSegmentFileFromMultipart(multiPart, dstFile);
+          try {
+            URI segmentURI = new URI(downloadUri);
+            PinotFS pinotFS = PinotFSFactory.create(segmentURI.getScheme());
+            segmentSizeInBytes = pinotFS.length(segmentURI);
+          } catch (Exception e) {
+            segmentSizeInBytes = -1;
+            LOGGER.warn("Could not fetch segment size for metadata push", e);
+          }
           break;
         default:
           throw new UnsupportedOperationException("Unsupported upload type: " + uploadType);
@@ -302,7 +313,7 @@ public class PinotSegmentUploadDownloadRestletResource {
 
       // Zk operations
       completeZkOperations(enableParallelPushProtection, headers, finalSegmentFile, tableNameWithType, segmentMetadata,
-          segmentName, zkDownloadUri, moveSegmentToFinalLocation, crypterClassName, allowRefresh);
+          segmentName, zkDownloadUri, moveSegmentToFinalLocation, crypterClassName, allowRefresh, segmentSizeInBytes);
 
       return new SuccessResponse("Successfully uploaded segment: " + segmentName + " of table: " + tableNameWithType);
     } catch (WebApplicationException e) {
@@ -397,15 +408,15 @@ public class PinotSegmentUploadDownloadRestletResource {
 
   private void completeZkOperations(boolean enableParallelPushProtection, HttpHeaders headers, File uploadedSegmentFile,
       String tableNameWithType, SegmentMetadata segmentMetadata, String segmentName, String zkDownloadURI,
-      boolean moveSegmentToFinalLocation, String crypter, boolean allowRefresh)
+      boolean moveSegmentToFinalLocation, String crypter, boolean allowRefresh, long segmentSizeInBytes)
       throws Exception {
     String basePath = ControllerFilePathProvider.getInstance().getDataDirURI().toString();
     String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
     URI finalSegmentLocationURI = URIUtils.getUri(basePath, rawTableName, URIUtils.encode(segmentName));
     ZKOperator zkOperator = new ZKOperator(_pinotHelixResourceManager, _controllerConf, _controllerMetrics);
-    zkOperator
-        .completeSegmentOperations(tableNameWithType, segmentMetadata, finalSegmentLocationURI, uploadedSegmentFile,
-            enableParallelPushProtection, headers, zkDownloadURI, moveSegmentToFinalLocation, crypter, allowRefresh);
+    zkOperator.completeSegmentOperations(tableNameWithType, segmentMetadata, finalSegmentLocationURI,
+        uploadedSegmentFile, enableParallelPushProtection, headers, zkDownloadURI, moveSegmentToFinalLocation, crypter,
+        allowRefresh, segmentSizeInBytes);
   }
 
   private void decryptFile(String crypterClassName, File tempEncryptedFile, File tempDecryptedFile) {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
index 693d42f..28f1753 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
@@ -61,7 +61,7 @@ public class ZKOperator {
   public void completeSegmentOperations(String tableNameWithType, SegmentMetadata segmentMetadata,
       URI finalSegmentLocationURI, File currentSegmentLocation, boolean enableParallelPushProtection,
       HttpHeaders headers, String zkDownloadURI, boolean moveSegmentToFinalLocation, String crypter,
-      boolean allowRefresh)
+      boolean allowRefresh, long segmentSizeInBytes)
       throws Exception {
     String segmentName = segmentMetadata.getName();
     ZNRecord segmentMetadataZNRecord =
@@ -76,7 +76,8 @@ public class ZKOperator {
       }
       LOGGER.info("Adding new segment {} from table {}", segmentName, tableNameWithType);
       processNewSegment(segmentMetadata, finalSegmentLocationURI, currentSegmentLocation, zkDownloadURI, headers,
-          crypter, tableNameWithType, segmentName, moveSegmentToFinalLocation, enableParallelPushProtection);
+          crypter, tableNameWithType, segmentName, moveSegmentToFinalLocation, enableParallelPushProtection,
+          segmentSizeInBytes);
       return;
     }
 
@@ -101,13 +102,13 @@ public class ZKOperator {
     LOGGER.info("Segment {} from table {} already exists, refreshing if necessary", segmentName, tableNameWithType);
     processExistingSegment(segmentMetadata, finalSegmentLocationURI, currentSegmentLocation,
         enableParallelPushProtection, headers, zkDownloadURI, crypter, tableNameWithType, segmentName,
-        segmentMetadataZNRecord, moveSegmentToFinalLocation);
+        segmentMetadataZNRecord, moveSegmentToFinalLocation, segmentSizeInBytes);
   }
 
   private void processExistingSegment(SegmentMetadata segmentMetadata, URI finalSegmentLocationURI,
       File currentSegmentLocation, boolean enableParallelPushProtection, HttpHeaders headers, String zkDownloadURI,
       String crypter, String tableNameWithType, String segmentName, ZNRecord znRecord,
-      boolean moveSegmentToFinalLocation)
+      boolean moveSegmentToFinalLocation, long segmentSizeInBytes)
       throws Exception {
 
     SegmentZKMetadata existingSegmentZKMetadata = new SegmentZKMetadata(znRecord);
@@ -202,7 +203,7 @@ public class ZKOperator {
 
         _pinotHelixResourceManager
             .refreshSegment(tableNameWithType, segmentMetadata, existingSegmentZKMetadata, expectedVersion,
-                zkDownloadURI, crypter);
+                zkDownloadURI, crypter, segmentSizeInBytes);
       }
     } catch (Exception e) {
       if (!_pinotHelixResourceManager
@@ -234,10 +235,12 @@ public class ZKOperator {
 
   private void processNewSegment(SegmentMetadata segmentMetadata, URI finalSegmentLocationURI,
       File currentSegmentLocation, String zkDownloadURI, HttpHeaders headers, String crypter, String tableNameWithType,
-      String segmentName, boolean moveSegmentToFinalLocation, boolean enableParallelPushProtection)
+      String segmentName, boolean moveSegmentToFinalLocation, boolean enableParallelPushProtection,
+      long segmentSizeInBytes)
       throws Exception {
-    SegmentZKMetadata newSegmentZKMetadata = _pinotHelixResourceManager
-        .constructZkMetadataForNewSegment(tableNameWithType, segmentMetadata, zkDownloadURI, crypter);
+    SegmentZKMetadata newSegmentZKMetadata =
+        _pinotHelixResourceManager.constructZkMetadataForNewSegment(tableNameWithType, segmentMetadata, zkDownloadURI,
+            crypter, segmentSizeInBytes);
 
     // Lock if enableParallelPushProtection is true.
     if (enableParallelPushProtection) {
@@ -253,7 +256,6 @@ public class ZKOperator {
       newSegmentZKMetadata
           .setCustomMap(segmentZKMetadataCustomMapModifier.modifyMap(newSegmentZKMetadata.getCustomMap()));
     }
-
     if (!_pinotHelixResourceManager.createSegmentZkMetadata(tableNameWithType, newSegmentZKMetadata)) {
       throw new RuntimeException(
           "Failed to create ZK metadata for segment: " + segmentName + " of table: " + tableNameWithType);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
index 3f2e378..0e69829 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
@@ -18,14 +18,18 @@
  */
 package org.apache.pinot.controller.helix;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+import org.apache.commons.httpclient.SimpleHttpConnectionManager;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
+import org.apache.pinot.common.exception.InvalidConfigException;
 import org.apache.pinot.common.lineage.SegmentLineage;
 import org.apache.pinot.common.lineage.SegmentLineageAccessHelper;
 import org.apache.pinot.common.lineage.SegmentLineageUtils;
@@ -36,6 +40,7 @@ import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
+import org.apache.pinot.controller.util.TableSizeReader;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
@@ -57,21 +62,28 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh
   private static final long DISABLED_TABLE_LOG_INTERVAL_MS = TimeUnit.DAYS.toMillis(1);
   private static final ZNRecordSerializer RECORD_SERIALIZER = new ZNRecordSerializer();
 
+  private static final int TABLE_CHECKER_TIMEOUT_MS = 30_000;
+
   private final int _waitForPushTimeSeconds;
   private long _lastDisabledTableLogTimestamp = 0;
 
+  private TableSizeReader _tableSizeReader;
+
   /**
    * Constructs the segment status checker.
    * @param pinotHelixResourceManager The resource checker used to interact with Helix
    * @param config The controller configuration object
    */
   public SegmentStatusChecker(PinotHelixResourceManager pinotHelixResourceManager,
-      LeadControllerManager leadControllerManager, ControllerConf config, ControllerMetrics controllerMetrics) {
+      LeadControllerManager leadControllerManager, ControllerConf config, ControllerMetrics controllerMetrics,
+      ExecutorService executorService) {
     super("SegmentStatusChecker", config.getStatusCheckerFrequencyInSeconds(),
         config.getStatusCheckerInitialDelayInSeconds(), pinotHelixResourceManager, leadControllerManager,
         controllerMetrics);
 
     _waitForPushTimeSeconds = config.getStatusCheckerWaitForPushTimeInSeconds();
+    _tableSizeReader = new TableSizeReader(executorService, new SimpleHttpConnectionManager(true), _controllerMetrics,
+        _pinotHelixResourceManager);
   }
 
   @Override
@@ -96,6 +108,7 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh
   protected void processTable(String tableNameWithType, Context context) {
     try {
       updateSegmentMetrics(tableNameWithType, context);
+      updateTableSizeMetrics(tableNameWithType);
     } catch (Exception e) {
       LOGGER.error("Caught exception while updating segment status for table {}", tableNameWithType, e);
       // Remove the metric for this table
@@ -110,6 +123,11 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh
     _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.DISABLED_TABLE_COUNT, context._disabledTableCount);
   }
 
+  private void updateTableSizeMetrics(String tableNameWithType)
+      throws InvalidConfigException {
+    _tableSizeReader.getTableSizeDetails(tableNameWithType, TABLE_CHECKER_TIMEOUT_MS);
+  }
+
   /**
    * Runs a segment status pass over the given table.
    * TODO: revisit the logic and reduce the ZK access
@@ -173,6 +191,7 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh
     int nErrors = 0; // Keeps track of number of segments in error state
     int nOffline = 0; // Keeps track of number segments with no online replicas
     int nSegments = 0; // Counts number of segments
+    long tableCompressedSize = 0; // Tracks the total compressed segment size in deep store per table
     for (String partitionName : segmentsExcludeReplaced) {
       int nReplicas = 0;
       int nIdeal = 0;
@@ -198,6 +217,12 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh
         // Push is not finished yet, skip the segment
         continue;
       }
+      if (segmentZKMetadata != null) {
+        long sizeInBytes = segmentZKMetadata.getSizeInBytes();
+        if (sizeInBytes > 0) {
+          tableCompressedSize += sizeInBytes;
+        }
+      }
       nReplicasIdealMax = (idealState.getInstanceStateMap(partitionName).size() > nReplicasIdealMax) ? idealState
           .getInstanceStateMap(partitionName).size() : nReplicasIdealMax;
       if ((externalView == null) || (externalView.getStateMap(partitionName) == null)) {
@@ -240,6 +265,9 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh
     _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENTS_IN_ERROR_STATE, nErrors);
     _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE,
         (nSegments > 0) ? (100 - (nOffline * 100 / nSegments)) : 100);
+    _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.TABLE_COMPRESSED_SIZE,
+        tableCompressedSize);
+
     if (nOffline > 0) {
       LOGGER.warn("Table {} has {} segments with no online replicas", tableNameWithType, nOffline);
     }
@@ -287,6 +315,11 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh
     setStatusToDefault();
   }
 
+  @VisibleForTesting
+  void setTableSizeReader(TableSizeReader tableSizeReader) {
+    _tableSizeReader = tableSizeReader;
+  }
+
   public static final class Context {
     private boolean _logDisabledTables;
     private int _realTimeTableCount;
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index cfc83d4..ecbd3b7 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -116,6 +116,7 @@ import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer;
 import org.apache.pinot.controller.helix.core.util.ZKMetadataUtils;
 import org.apache.pinot.controller.helix.starter.HelixConfig;
 import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.segment.local.utils.ReplicationUtils;
 import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.spi.config.ConfigUtils;
 import org.apache.pinot.spi.config.instance.Instance;
@@ -1910,7 +1911,7 @@ public class PinotHelixResourceManager {
     // NOTE: must first set the segment ZK metadata before assigning segment to instances because segment assignment
     // might need them to determine the partition of the segment, and server will need them to download the segment
     SegmentZKMetadata segmentZkmetadata =
-        constructZkMetadataForNewSegment(tableNameWithType, segmentMetadata, downloadUrl, crypter);
+        constructZkMetadataForNewSegment(tableNameWithType, segmentMetadata, downloadUrl, crypter, -1);
     ZNRecord znRecord = segmentZkmetadata.toZNRecord();
 
     String segmentName = segmentMetadata.getName();
@@ -1930,16 +1931,18 @@ public class PinotHelixResourceManager {
    * @param segmentMetadata Segment metadata
    * @param downloadUrl Download URL
    * @param crypter Crypter
+   * @param segmentSizeInBytes Size of segment in bytes.
    * @return SegmentZkMetadata of the input segment
    */
   public SegmentZKMetadata constructZkMetadataForNewSegment(String tableNameWithType, SegmentMetadata segmentMetadata,
-      String downloadUrl, @Nullable String crypter) {
+      String downloadUrl, @Nullable String crypter, long segmentSizeInBytes) {
     // Construct segment zk metadata with common fields for offline and realtime.
     String segmentName = segmentMetadata.getName();
     SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(segmentName);
     ZKMetadataUtils.updateSegmentMetadata(segmentZKMetadata, segmentMetadata);
     segmentZKMetadata.setDownloadUrl(downloadUrl);
     segmentZKMetadata.setCrypterName(crypter);
+    segmentZKMetadata.setSizeInBytes(segmentSizeInBytes);
 
     if (TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
       Preconditions.checkState(isUpsertTable(tableNameWithType),
@@ -2051,7 +2054,8 @@ public class PinotHelixResourceManager {
   }
 
   public void refreshSegment(String tableNameWithType, SegmentMetadata segmentMetadata,
-      SegmentZKMetadata segmentZKMetadata, int expectedVersion, String downloadUrl, @Nullable String crypter) {
+      SegmentZKMetadata segmentZKMetadata, int expectedVersion, String downloadUrl, @Nullable String crypter,
+      long segmentSizeInBytes) {
     String segmentName = segmentMetadata.getName();
 
     // NOTE: Must first set the segment ZK metadata before trying to refresh because servers and brokers rely on segment
@@ -2062,6 +2066,7 @@ public class PinotHelixResourceManager {
     segmentZKMetadata.setRefreshTime(System.currentTimeMillis());
     segmentZKMetadata.setDownloadUrl(downloadUrl);
     segmentZKMetadata.setCrypterName(crypter);
+    segmentZKMetadata.setSizeInBytes(segmentSizeInBytes);
     if (!ZKMetadataProvider
         .setSegmentZKMetadata(_propertyStore, tableNameWithType, segmentZKMetadata, expectedVersion)) {
       throw new RuntimeException(
@@ -3363,6 +3368,25 @@ public class PinotHelixResourceManager {
     return hosts;
   }
 
+  /**
+   * Returns the number of replicas for a given table config
+   */
+  public int getNumReplicas(TableConfig tableConfig) {
+    if (tableConfig.isDimTable()) {
+      // If the table is a dimension table then fetch the tenant config and get the number of server belonging
+      // to the tenant
+      TenantConfig tenantConfig = tableConfig.getTenantConfig();
+      Set<String> serverInstances = getAllInstancesForServerTenant(tenantConfig.getServer());
+      return serverInstances.size();
+    }
+
+    if (ReplicationUtils.useReplicasPerPartition(tableConfig)) {
+      return Integer.parseInt(tableConfig.getValidationConfig().getReplicasPerPartition());
+    }
+
+    return tableConfig.getValidationConfig().getReplicationNumber();
+  }
+
   /*
    * Uncomment and use for testing on a real cluster
   public static void main(String[] args) throws Exception {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableSizeReader.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableSizeReader.java
index 6c2556c..4cbc703 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableSizeReader.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableSizeReader.java
@@ -32,12 +32,13 @@ import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import org.apache.commons.httpclient.HttpConnectionManager;
 import org.apache.pinot.common.exception.InvalidConfigException;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metrics.ControllerGauge;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.restlet.resources.SegmentSizeInfo;
 import org.apache.pinot.controller.api.resources.ServerTableSizeReader;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
-import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -78,36 +79,40 @@ public class TableSizeReader {
     Preconditions.checkNotNull(tableName, "Table name should not be null");
     Preconditions.checkArgument(timeoutMsec > 0, "Timeout value must be greater than 0");
 
-    boolean hasRealtimeTable = false;
-    boolean hasOfflineTable = false;
-    TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableName);
+    TableConfig offlineTableConfig =
+        ZKMetadataProvider.getOfflineTableConfig(_helixResourceManager.getPropertyStore(), tableName);
+    TableConfig realtimeTableConfig =
+        ZKMetadataProvider.getRealtimeTableConfig(_helixResourceManager.getPropertyStore(), tableName);
 
-    if (tableType != null) {
-      hasRealtimeTable = tableType == TableType.REALTIME;
-      hasOfflineTable = tableType == TableType.OFFLINE;
-    } else {
-      hasRealtimeTable = _helixResourceManager.hasRealtimeTable(tableName);
-      hasOfflineTable = _helixResourceManager.hasOfflineTable(tableName);
-    }
-
-    if (!hasOfflineTable && !hasRealtimeTable) {
+    if (offlineTableConfig == null && realtimeTableConfig == null) {
       return null;
     }
-
     TableSizeDetails tableSizeDetails = new TableSizeDetails(tableName);
-
-    if (hasRealtimeTable) {
+    if (realtimeTableConfig != null) {
       String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName);
       tableSizeDetails._realtimeSegments = getTableSubtypeSize(realtimeTableName, timeoutMsec);
       tableSizeDetails._reportedSizeInBytes += tableSizeDetails._realtimeSegments._reportedSizeInBytes;
       tableSizeDetails._estimatedSizeInBytes += tableSizeDetails._realtimeSegments._estimatedSizeInBytes;
+
+      _controllerMetrics.setValueOfTableGauge(realtimeTableName, ControllerGauge.TABLE_TOTAL_SIZE_ON_SERVER,
+          tableSizeDetails._realtimeSegments._estimatedSizeInBytes);
+      _controllerMetrics.setValueOfTableGauge(realtimeTableName, ControllerGauge.TABLE_SIZE_PER_REPLICA_ON_SERVER,
+          tableSizeDetails._realtimeSegments._estimatedSizeInBytes / _helixResourceManager.getNumReplicas(
+              realtimeTableConfig));
     }
-    if (hasOfflineTable) {
+    if (offlineTableConfig != null) {
       String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
       tableSizeDetails._offlineSegments = getTableSubtypeSize(offlineTableName, timeoutMsec);
       tableSizeDetails._reportedSizeInBytes += tableSizeDetails._offlineSegments._reportedSizeInBytes;
       tableSizeDetails._estimatedSizeInBytes += tableSizeDetails._offlineSegments._estimatedSizeInBytes;
+
+      _controllerMetrics.setValueOfTableGauge(offlineTableName, ControllerGauge.TABLE_TOTAL_SIZE_ON_SERVER,
+          tableSizeDetails._offlineSegments._estimatedSizeInBytes);
+      _controllerMetrics.setValueOfTableGauge(offlineTableName, ControllerGauge.TABLE_SIZE_PER_REPLICA_ON_SERVER,
+          tableSizeDetails._offlineSegments._estimatedSizeInBytes / _helixResourceManager.getNumReplicas(
+              offlineTableConfig));
     }
+
     return tableSizeDetails;
   }
 
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java
index ee732b0..4920cf5 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java
@@ -19,7 +19,6 @@
 package org.apache.pinot.controller.validation;
 
 import com.google.common.base.Preconditions;
-import java.util.Set;
 import org.apache.pinot.common.exception.InvalidConfigException;
 import org.apache.pinot.common.metrics.ControllerGauge;
 import org.apache.pinot.common.metrics.ControllerMetrics;
@@ -27,7 +26,6 @@ import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.util.TableSizeReader;
 import org.apache.pinot.spi.config.table.QuotaConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TenantConfig;
 import org.apache.pinot.spi.utils.DataSizeUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -86,17 +84,7 @@ public class StorageQuotaChecker {
     // 3. update predicted segment sizes
     // 4. is the updated size within quota
     QuotaConfig quotaConfig = _tableConfig.getQuotaConfig();
-    int numReplicas;
-
-    if (_tableConfig.isDimTable()) {
-      // If the table is a dimension table then fetch the tenant config and get the number of server belonging
-      // to the tenant
-      TenantConfig tenantConfig = _tableConfig.getTenantConfig();
-      Set<String> serverInstances = _pinotHelixResourceManager.getAllInstancesForServerTenant(tenantConfig.getServer());
-      numReplicas = serverInstances.size();
-    } else {
-      numReplicas = _tableConfig.getValidationConfig().getReplicationNumber();
-    }
+    int numReplicas = _pinotHelixResourceManager.getNumReplicas(_tableConfig);
 
     final String tableNameWithType = _tableConfig.getTableName();
 
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableSizeReaderTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableSizeReaderTest.java
index f83fbe6..04dbb51 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableSizeReaderTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableSizeReaderTest.java
@@ -33,16 +33,22 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import org.apache.commons.httpclient.HttpConnectionManager;
 import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
+import org.apache.helix.AccessOption;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.pinot.common.exception.InvalidConfigException;
 import org.apache.pinot.common.metrics.ControllerGauge;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.metrics.PinotMetricUtils;
 import org.apache.pinot.common.restlet.resources.SegmentSizeInfo;
 import org.apache.pinot.common.restlet.resources.TableSizeInfo;
+import org.apache.pinot.common.utils.config.TableConfigUtils;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.util.TableSizeReader;
 import org.apache.pinot.controller.utils.FakeHttpServer;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.mockito.ArgumentMatchers;
 import org.mockito.invocation.InvocationOnMock;
@@ -64,6 +70,7 @@ public class TableSizeReaderTest {
   private static final String URI_PATH = "/table/";
   private static final int TIMEOUT_MSEC = 10000;
   private static final int EXTENDED_TIMEOUT_FACTOR = 100;
+  private static final int NUM_REPLICAS = 2;
 
   private final Executor _executor = Executors.newFixedThreadPool(1);
   private final HttpConnectionManager _connectionManager = new MultiThreadedHttpConnectionManager();
@@ -76,23 +83,25 @@ public class TableSizeReaderTest {
   public void setUp()
       throws IOException {
     _helix = mock(PinotHelixResourceManager.class);
-    when(_helix.hasOfflineTable(anyString())).thenAnswer(new Answer() {
-      @Override
-      public Object answer(InvocationOnMock invocationOnMock)
-          throws Throwable {
-        String table = (String) invocationOnMock.getArguments()[0];
-        return table.indexOf("offline") >= 0;
-      }
-    });
 
-    when(_helix.hasRealtimeTable(anyString())).thenAnswer(new Answer() {
-      @Override
-      public Object answer(InvocationOnMock invocationOnMock)
-          throws Throwable {
-        String table = (String) invocationOnMock.getArguments()[0];
-        return table.indexOf("realtime") >= 0;
-      }
-    });
+    TableConfig tableConfig =
+        new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").setNumReplicas(NUM_REPLICAS).build();
+    ZkHelixPropertyStore mockPropertyStore = mock(ZkHelixPropertyStore.class);
+
+    when(mockPropertyStore.get(ArgumentMatchers.anyString(), ArgumentMatchers.eq(null),
+        ArgumentMatchers.eq(AccessOption.PERSISTENT))).thenAnswer((Answer) invocationOnMock -> {
+          String path = (String) invocationOnMock.getArguments()[0];
+          if (path.contains("realtime_REALTIME")) {
+            return TableConfigUtils.toZNRecord(tableConfig);
+          }
+          if (path.contains("offline_OFFLINE")) {
+            return TableConfigUtils.toZNRecord(tableConfig);
+          }
+          return null;
+        });
+
+    when(_helix.getPropertyStore()).thenReturn(mockPropertyStore);
+    when(_helix.getNumReplicas(ArgumentMatchers.eq(tableConfig))).thenReturn(NUM_REPLICAS);
 
     int counter = 0;
     // server0
@@ -313,6 +322,10 @@ public class TableSizeReaderTest {
     String tableNameWithType = TableNameBuilder.OFFLINE.tableNameWithType(table);
     Assert.assertEquals(_controllerMetrics
         .getValueOfTableGauge(tableNameWithType, ControllerGauge.TABLE_STORAGE_EST_MISSING_SEGMENT_PERCENT), 0);
+    Assert.assertEquals(_controllerMetrics.getValueOfTableGauge(tableNameWithType,
+        ControllerGauge.TABLE_SIZE_PER_REPLICA_ON_SERVER), offlineSizes._estimatedSizeInBytes / NUM_REPLICAS);
+    Assert.assertEquals(_controllerMetrics.getValueOfTableGauge(tableNameWithType,
+        ControllerGauge.TABLE_TOTAL_SIZE_ON_SERVER), offlineSizes._estimatedSizeInBytes);
   }
 
   @Test
@@ -330,6 +343,10 @@ public class TableSizeReaderTest {
     String tableNameWithType = TableNameBuilder.OFFLINE.tableNameWithType(table);
     Assert.assertEquals(_controllerMetrics
         .getValueOfTableGauge(tableNameWithType, ControllerGauge.TABLE_STORAGE_EST_MISSING_SEGMENT_PERCENT), 100);
+    Assert.assertEquals(_controllerMetrics.getValueOfTableGauge(tableNameWithType,
+        ControllerGauge.TABLE_SIZE_PER_REPLICA_ON_SERVER), offlineSizes._estimatedSizeInBytes / NUM_REPLICAS);
+    Assert.assertEquals(_controllerMetrics.getValueOfTableGauge(tableNameWithType,
+        ControllerGauge.TABLE_TOTAL_SIZE_ON_SERVER), offlineSizes._estimatedSizeInBytes);
   }
 
   @Test
@@ -345,19 +362,32 @@ public class TableSizeReaderTest {
     validateTableSubTypeSize(servers, offlineSizes);
     Assert.assertNull(tableSizeDetails._realtimeSegments);
     String tableNameWithType = TableNameBuilder.OFFLINE.tableNameWithType(table);
-    Assert.assertEquals(_controllerMetrics
-        .getValueOfTableGauge(tableNameWithType, ControllerGauge.TABLE_STORAGE_EST_MISSING_SEGMENT_PERCENT), 20);
+    Assert.assertEquals(_controllerMetrics.getValueOfTableGauge(tableNameWithType,
+        ControllerGauge.TABLE_STORAGE_EST_MISSING_SEGMENT_PERCENT), 20);
+    Assert.assertEquals(
+        _controllerMetrics.getValueOfTableGauge(tableNameWithType, ControllerGauge.TABLE_SIZE_PER_REPLICA_ON_SERVER),
+        offlineSizes._estimatedSizeInBytes / NUM_REPLICAS);
+    Assert.assertEquals(
+        _controllerMetrics.getValueOfTableGauge(tableNameWithType, ControllerGauge.TABLE_TOTAL_SIZE_ON_SERVER),
+        offlineSizes._estimatedSizeInBytes);
   }
 
   @Test
   public void getTableSizeDetailsRealtimeOnly()
       throws InvalidConfigException {
     final String[] servers = {"server3", "server4"};
-    TableSizeReader.TableSizeDetails tableSizeDetails = testRunner(servers, "realtime");
+    String table = "realtime";
+    TableSizeReader.TableSizeDetails tableSizeDetails = testRunner(servers, table);
     Assert.assertNull(tableSizeDetails._offlineSegments);
     TableSizeReader.TableSubTypeSizeDetails realtimeSegments = tableSizeDetails._realtimeSegments;
     Assert.assertEquals(realtimeSegments._segments.size(), 2);
     Assert.assertTrue(realtimeSegments._reportedSizeInBytes == realtimeSegments._estimatedSizeInBytes);
     validateTableSubTypeSize(servers, realtimeSegments);
+    String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(table);
+    Assert.assertEquals(
+        _controllerMetrics.getValueOfTableGauge(tableNameWithType, ControllerGauge.TABLE_SIZE_PER_REPLICA_ON_SERVER),
+        realtimeSegments._estimatedSizeInBytes / NUM_REPLICAS);
+    Assert.assertEquals(_controllerMetrics.getValueOfTableGauge(tableNameWithType,
+        ControllerGauge.TABLE_TOTAL_SIZE_ON_SERVER), realtimeSegments._estimatedSizeInBytes);
   }
 }
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java
index b701258..c9a119b 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java
@@ -78,7 +78,7 @@ public class ZKOperatorTest {
       zkOperator
           .completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, finalSegmentLocationURI,
               currentSegmentLocation, true, httpHeaders, "downloadUrl",
-              true, "crypter", true);
+              true, "crypter", true, 10);
       fail();
     } catch (Exception e) {
       // Expected
@@ -91,10 +91,9 @@ public class ZKOperatorTest {
       return segmentZKMetadata == null;
     }, 30_000L, "Failed to delete segmentZkMetadata.");
 
-
     zkOperator
         .completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, null, null, true, httpHeaders, "downloadUrl",
-            false, "crypter", true);
+            false, "crypter", true, 10);
 
     SegmentZKMetadata segmentZKMetadata =
         ControllerTestUtils.getHelixResourceManager().getSegmentZKMetadata(OFFLINE_TABLE_NAME, SEGMENT_NAME);
@@ -107,11 +106,12 @@ public class ZKOperatorTest {
     assertEquals(segmentZKMetadata.getDownloadUrl(), "downloadUrl");
     assertEquals(segmentZKMetadata.getCrypterName(), "crypter");
     assertEquals(segmentZKMetadata.getSegmentUploadStartTime(), -1);
+    assertEquals(segmentZKMetadata.getSizeInBytes(), 10);
 
     // Upload the same segment with allowRefresh = false. Validate that an exception is thrown.
     try {
       zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, null, null, false, httpHeaders,
-          "otherDownloadUrl", false, "otherCrypter", false);
+          "otherDownloadUrl", false, "otherCrypter", false, 10);
       fail();
     } catch (Exception e) {
       // Expected
@@ -121,7 +121,7 @@ public class ZKOperatorTest {
     when(httpHeaders.getHeaderString(HttpHeaders.IF_MATCH)).thenReturn("123");
     try {
       zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, null, null, true, httpHeaders,
-          "otherDownloadUrl", false, null, true);
+          "otherDownloadUrl", false, null, true, 10);
       fail();
     } catch (Exception e) {
       // Expected
@@ -132,7 +132,7 @@ public class ZKOperatorTest {
     when(httpHeaders.getHeaderString(HttpHeaders.IF_MATCH)).thenReturn("12345");
     when(segmentMetadata.getIndexCreationTime()).thenReturn(456L);
     zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, null, null, true, httpHeaders,
-        "otherDownloadUrl", false, "otherCrypter", true);
+        "otherDownloadUrl", false, "otherCrypter", true, 10);
     segmentZKMetadata =
         ControllerTestUtils.getHelixResourceManager().getSegmentZKMetadata(OFFLINE_TABLE_NAME, SEGMENT_NAME);
     assertEquals(segmentZKMetadata.getCrc(), 12345L);
@@ -146,6 +146,7 @@ public class ZKOperatorTest {
     assertEquals(segmentZKMetadata.getDownloadUrl(), "downloadUrl");
     assertEquals(segmentZKMetadata.getCrypterName(), "crypter");
     assertEquals(segmentZKMetadata.getSegmentUploadStartTime(), -1);
+    assertEquals(segmentZKMetadata.getSizeInBytes(), 10);
 
     // Refresh the segment with a different segment (different CRC)
     when(segmentMetadata.getCrc()).thenReturn("23456");
@@ -155,7 +156,7 @@ public class ZKOperatorTest {
     // not found!" exception from being thrown sporadically.
     Thread.sleep(1000L);
     zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, null, null, true, httpHeaders,
-        "otherDownloadUrl", false, "otherCrypter", true);
+        "otherDownloadUrl", false, "otherCrypter", true, 10);
     segmentZKMetadata =
         ControllerTestUtils.getHelixResourceManager().getSegmentZKMetadata(OFFLINE_TABLE_NAME, SEGMENT_NAME);
     assertEquals(segmentZKMetadata.getCrc(), 23456L);
@@ -166,6 +167,7 @@ public class ZKOperatorTest {
     assertTrue(segmentZKMetadata.getRefreshTime() > refreshTime);
     assertEquals(segmentZKMetadata.getDownloadUrl(), "otherDownloadUrl");
     assertEquals(segmentZKMetadata.getCrypterName(), "otherCrypter");
+    assertEquals(segmentZKMetadata.getSizeInBytes(), 10);
   }
 
   @AfterClass
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java
index 2ff7cec..dbcc051 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java
@@ -22,6 +22,8 @@ import com.google.common.collect.Lists;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import org.apache.helix.AccessOption;
 import org.apache.helix.ZNRecord;
@@ -40,6 +42,7 @@ import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.util.TableSizeReader;
 import org.apache.pinot.spi.metrics.PinotMetricsRegistry;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -47,6 +50,7 @@ import org.testng.Assert;
 import org.testng.annotations.Test;
 
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
@@ -61,6 +65,8 @@ public class SegmentStatusCheckerTest {
   private PinotMetricsRegistry _metricsRegistry;
   private ControllerMetrics _controllerMetrics;
   private ControllerConf _config;
+  private TableSizeReader _tableSizeReader;
+  private ExecutorService _executorService = Executors.newFixedThreadPool(1);
 
   @Test
   public void offlineBasicTest()
@@ -126,10 +132,16 @@ public class SegmentStatusCheckerTest {
       _leadControllerManager = mock(LeadControllerManager.class);
       when(_leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
     }
+    {
+      _tableSizeReader = mock(TableSizeReader.class);
+      when(_tableSizeReader.getTableSizeDetails(anyString(), anyInt())).thenReturn(null);
+    }
     _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry();
     _controllerMetrics = new ControllerMetrics(_metricsRegistry);
     _segmentStatusChecker =
-        new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics);
+        new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics,
+            _executorService);
+    _segmentStatusChecker.setTableSizeReader(_tableSizeReader);
     _segmentStatusChecker.start();
     _segmentStatusChecker.run();
     Assert
@@ -148,6 +160,8 @@ public class SegmentStatusCheckerTest {
             66);
     Assert.assertEquals(
         _controllerMetrics.getValueOfTableGauge(externalView.getId(), ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100);
+    Assert.assertEquals(
+        _controllerMetrics.getValueOfTableGauge(externalView.getId(), ControllerGauge.TABLE_COMPRESSED_SIZE), 0);
   }
 
   @Test
@@ -201,10 +215,16 @@ public class SegmentStatusCheckerTest {
       _leadControllerManager = mock(LeadControllerManager.class);
       when(_leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
     }
+    {
+      _tableSizeReader = mock(TableSizeReader.class);
+      when(_tableSizeReader.getTableSizeDetails(anyString(), anyInt())).thenReturn(null);
+    }
     _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry();
     _controllerMetrics = new ControllerMetrics(_metricsRegistry);
     _segmentStatusChecker =
-        new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics);
+        new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics,
+            _executorService);
+    _segmentStatusChecker.setTableSizeReader(_tableSizeReader);
     _segmentStatusChecker.start();
     _segmentStatusChecker.run();
     Assert.assertEquals(
@@ -221,7 +241,8 @@ public class SegmentStatusCheckerTest {
   }
 
   @Test
-  public void missingEVPartitionTest() {
+  public void missingEVPartitionTest()
+      throws Exception {
     String offlineTableName = "myTable_OFFLINE";
     List<String> allTableNames = new ArrayList<String>();
     allTableNames.add(offlineTableName);
@@ -254,6 +275,7 @@ public class SegmentStatusCheckerTest {
     znrecord.setSimpleField(CommonConstants.Segment.DOWNLOAD_URL, "http://localhost:8000/myTable_0");
     znrecord.setLongField(CommonConstants.Segment.PUSH_TIME, System.currentTimeMillis());
     znrecord.setLongField(CommonConstants.Segment.REFRESH_TIME, System.currentTimeMillis());
+    znrecord.setLongField(CommonConstants.Segment.SIZE_IN_BYTES, 1111);
 
     ZkHelixPropertyStore<ZNRecord> propertyStore;
     {
@@ -281,10 +303,16 @@ public class SegmentStatusCheckerTest {
       _leadControllerManager = mock(LeadControllerManager.class);
       when(_leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
     }
+    {
+      _tableSizeReader = mock(TableSizeReader.class);
+      when(_tableSizeReader.getTableSizeDetails(anyString(), anyInt())).thenReturn(null);
+    }
     _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry();
     _controllerMetrics = new ControllerMetrics(_metricsRegistry);
     _segmentStatusChecker =
-        new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics);
+        new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics,
+            _executorService);
+    _segmentStatusChecker.setTableSizeReader(_tableSizeReader);
     _segmentStatusChecker.start();
     _segmentStatusChecker.run();
     Assert.assertEquals(
@@ -294,6 +322,8 @@ public class SegmentStatusCheckerTest {
             0);
     Assert.assertEquals(
         _controllerMetrics.getValueOfTableGauge(externalView.getId(), ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 75);
+    Assert.assertEquals(
+        _controllerMetrics.getValueOfTableGauge(externalView.getId(), ControllerGauge.TABLE_COMPRESSED_SIZE), 1111);
   }
 
   @Test
@@ -330,14 +360,22 @@ public class SegmentStatusCheckerTest {
       _leadControllerManager = mock(LeadControllerManager.class);
       when(_leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
     }
+    {
+      _tableSizeReader = mock(TableSizeReader.class);
+      when(_tableSizeReader.getTableSizeDetails(anyString(), anyInt())).thenReturn(null);
+    }
     _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry();
     _controllerMetrics = new ControllerMetrics(_metricsRegistry);
     _segmentStatusChecker =
-        new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics);
+        new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics,
+            _executorService);
+    _segmentStatusChecker.setTableSizeReader(_tableSizeReader);
     _segmentStatusChecker.start();
     _segmentStatusChecker.run();
     Assert.assertEquals(_controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0);
     Assert.assertEquals(_controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.NUMBER_OF_REPLICAS), 0);
+    Assert.assertEquals(
+        _controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.TABLE_COMPRESSED_SIZE), 0);
   }
 
   @Test
@@ -362,10 +400,16 @@ public class SegmentStatusCheckerTest {
       _leadControllerManager = mock(LeadControllerManager.class);
       when(_leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
     }
+    {
+      _tableSizeReader = mock(TableSizeReader.class);
+      when(_tableSizeReader.getTableSizeDetails(anyString(), anyInt())).thenReturn(null);
+    }
     _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry();
     _controllerMetrics = new ControllerMetrics(_metricsRegistry);
     _segmentStatusChecker =
-        new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics);
+        new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics,
+            _executorService);
+    _segmentStatusChecker.setTableSizeReader(_tableSizeReader);
     _segmentStatusChecker.start();
     _segmentStatusChecker.run();
     Assert.assertEquals(_controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.SEGMENTS_IN_ERROR_STATE),
@@ -374,10 +418,13 @@ public class SegmentStatusCheckerTest {
         Long.MIN_VALUE);
     Assert.assertEquals(_controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.PERCENT_OF_REPLICAS),
         Long.MIN_VALUE);
+    Assert.assertEquals(
+        _controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.TABLE_COMPRESSED_SIZE), 0);
   }
 
   @Test
-  public void missingEVPartitionPushTest() {
+  public void missingEVPartitionPushTest()
+      throws Exception {
     String offlineTableName = "myTable_OFFLINE";
     List<String> allTableNames = new ArrayList<String>();
     allTableNames.add(offlineTableName);
@@ -408,6 +455,7 @@ public class SegmentStatusCheckerTest {
     znrecord.setSimpleField(CommonConstants.Segment.DOWNLOAD_URL, "http://localhost:8000/myTable_0");
     znrecord.setLongField(CommonConstants.Segment.PUSH_TIME, System.currentTimeMillis());
     znrecord.setLongField(CommonConstants.Segment.REFRESH_TIME, System.currentTimeMillis());
+    znrecord.setLongField(CommonConstants.Segment.SIZE_IN_BYTES, 1111);
 
     ZNRecord znrecord2 = new ZNRecord("myTable_2");
     znrecord2.setSimpleField(CommonConstants.Segment.INDEX_VERSION, "v1");
@@ -420,6 +468,7 @@ public class SegmentStatusCheckerTest {
     znrecord2.setSimpleField(CommonConstants.Segment.DOWNLOAD_URL, "http://localhost:8000/myTable_2");
     znrecord2.setLongField(CommonConstants.Segment.PUSH_TIME, System.currentTimeMillis());
     znrecord2.setLongField(CommonConstants.Segment.REFRESH_TIME, System.currentTimeMillis());
+    znrecord.setLongField(CommonConstants.Segment.SIZE_IN_BYTES, 1111);
 
     {
       _helixResourceManager = mock(PinotHelixResourceManager.class);
@@ -442,10 +491,16 @@ public class SegmentStatusCheckerTest {
       _leadControllerManager = mock(LeadControllerManager.class);
       when(_leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
     }
+    {
+      _tableSizeReader = mock(TableSizeReader.class);
+      when(_tableSizeReader.getTableSizeDetails(anyString(), anyInt())).thenReturn(null);
+    }
     _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry();
     _controllerMetrics = new ControllerMetrics(_metricsRegistry);
     _segmentStatusChecker =
-        new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics);
+        new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics,
+            _executorService);
+    _segmentStatusChecker.setTableSizeReader(_tableSizeReader);
     _segmentStatusChecker.start();
     _segmentStatusChecker.run();
     Assert.assertEquals(
@@ -459,6 +514,8 @@ public class SegmentStatusCheckerTest {
             100);
     Assert.assertEquals(
         _controllerMetrics.getValueOfTableGauge(externalView.getId(), ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100);
+    Assert.assertEquals(
+        _controllerMetrics.getValueOfTableGauge(externalView.getId(), ControllerGauge.TABLE_COMPRESSED_SIZE), 0);
   }
 
   @Test
@@ -491,10 +548,16 @@ public class SegmentStatusCheckerTest {
       _leadControllerManager = mock(LeadControllerManager.class);
       when(_leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
     }
+    {
+      _tableSizeReader = mock(TableSizeReader.class);
+      when(_tableSizeReader.getTableSizeDetails(anyString(), anyInt())).thenReturn(null);
+    }
     _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry();
     _controllerMetrics = new ControllerMetrics(_metricsRegistry);
     _segmentStatusChecker =
-        new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics);
+        new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics,
+            _executorService);
+    _segmentStatusChecker.setTableSizeReader(_tableSizeReader);
     _segmentStatusChecker.start();
     _segmentStatusChecker.run();
     Assert.assertEquals(_controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0);
@@ -538,7 +601,8 @@ public class SegmentStatusCheckerTest {
     _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry();
     _controllerMetrics = new ControllerMetrics(_metricsRegistry);
     _segmentStatusChecker =
-        new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics);
+        new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics,
+            _executorService);
     // verify state before test
     Assert.assertEquals(_controllerMetrics.getValueOfGlobalGauge(ControllerGauge.DISABLED_TABLE_COUNT), 0);
     // update metrics
@@ -577,7 +641,8 @@ public class SegmentStatusCheckerTest {
     _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry();
     _controllerMetrics = new ControllerMetrics(_metricsRegistry);
     _segmentStatusChecker =
-        new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics);
+        new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics,
+            _executorService);
     // verify state before test
     Assert.assertEquals(_controllerMetrics.getValueOfGlobalGauge(ControllerGauge.DISABLED_TABLE_COUNT), 0);
     // update metrics
@@ -624,10 +689,16 @@ public class SegmentStatusCheckerTest {
       _leadControllerManager = mock(LeadControllerManager.class);
       when(_leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
     }
+    {
+      _tableSizeReader = mock(TableSizeReader.class);
+      when(_tableSizeReader.getTableSizeDetails(anyString(), anyInt())).thenReturn(null);
+    }
     _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry();
     _controllerMetrics = new ControllerMetrics(_metricsRegistry);
     _segmentStatusChecker =
-        new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics);
+        new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics,
+            _executorService);
+    _segmentStatusChecker.setTableSizeReader(_tableSizeReader);
     _segmentStatusChecker.start();
     _segmentStatusChecker.run();
     Assert.assertEquals(_controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.SEGMENTS_IN_ERROR_STATE),
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/StorageQuotaCheckerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/StorageQuotaCheckerTest.java
index 2d351d7..2d42bf7 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/StorageQuotaCheckerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/StorageQuotaCheckerTest.java
@@ -31,6 +31,7 @@ import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.testng.annotations.Test;
 
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
@@ -94,8 +95,10 @@ public class StorageQuotaCheckerTest {
         new TableConfigBuilder(TableType.OFFLINE).setTableName(OFFLINE_TABLE_NAME).setNumReplicas(NUM_REPLICAS).build();
     _tableSizeReader = mock(TableSizeReader.class);
     ControllerMetrics controllerMetrics = new ControllerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
-    _storageQuotaChecker = new StorageQuotaChecker(tableConfig, _tableSizeReader, controllerMetrics, true,
-        mock(PinotHelixResourceManager.class));
+    PinotHelixResourceManager pinotHelixResourceManager = mock(PinotHelixResourceManager.class);
+    when(pinotHelixResourceManager.getNumReplicas(eq(tableConfig))).thenReturn(NUM_REPLICAS);
+    _storageQuotaChecker =
+        new StorageQuotaChecker(tableConfig, _tableSizeReader, controllerMetrics, true, pinotHelixResourceManager);
     tableConfig.setQuotaConfig(new QuotaConfig("2.8K", null));
 
     // No response from server, should pass without updating metrics
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java
index 335d8f4..fcac9a0 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java
@@ -89,7 +89,8 @@ public class ValidationManagerTest {
     }, 30_000L, "Failed to find the segment in the ExternalView");
     Mockito.when(segmentMetadata.getCrc()).thenReturn(Long.toString(System.nanoTime()));
     ControllerTestUtils.getHelixResourceManager()
-        .refreshSegment(offlineTableName, segmentMetadata, segmentZKMetadata, EXPECTED_VERSION, "downloadUrl", null);
+        .refreshSegment(offlineTableName, segmentMetadata, segmentZKMetadata, EXPECTED_VERSION, "downloadUrl", null,
+            -1);
 
     segmentZKMetadata =
         ControllerTestUtils.getHelixResourceManager().getSegmentZKMetadata(OFFLINE_TEST_TABLE_NAME, TEST_SEGMENT_NAME);
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 7e5c956..7cda305 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -550,6 +550,7 @@ public class CommonConstants {
     public static final String CRYPTER_NAME = "segment.crypter";
     public static final String PARTITION_METADATA = "segment.partition.metadata";
     public static final String CUSTOM_MAP = "custom.map";
+    public static final String SIZE_IN_BYTES = "segment.size.in.bytes";
 
     /**
      * This field is used for parallel push protection to lock the segment globally.

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org