You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2022/03/25 20:59:31 UTC
[pinot] branch master updated: New Pinot byte metrics for compressed tar.gz and table size w/o replicas (#8358)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new e02bdda New Pinot byte metrics for compressed tar.gz and table size w/o replicas (#8358)
e02bdda is described below
commit e02bdda1501abb82e2efbc27674a74a6fb5519b5
Author: Tim Santos <ti...@cortexdata.io>
AuthorDate: Fri Mar 25 21:59:04 2022 +0100
New Pinot byte metrics for compressed tar.gz and table size w/o replicas (#8358)
---
.../etc/jmx_prometheus_javaagent/configs/pinot.yml | 15 ++++
.../broker/broker/HelixBrokerStarterTest.java | 2 +-
.../common/metadata/segment/SegmentZKMetadata.java | 8 ++
.../pinot/common/metrics/ControllerGauge.java | 10 +++
.../pinot/controller/BaseControllerStarter.java | 3 +-
.../apache/pinot/controller/ControllerConf.java | 2 +-
.../PinotSegmentUploadDownloadRestletResource.java | 21 +++--
.../pinot/controller/api/upload/ZKOperator.java | 20 +++--
.../controller/helix/SegmentStatusChecker.java | 35 +++++++-
.../helix/core/PinotHelixResourceManager.java | 30 ++++++-
.../pinot/controller/util/TableSizeReader.java | 39 +++++----
.../controller/validation/StorageQuotaChecker.java | 14 +---
.../pinot/controller/api/TableSizeReaderTest.java | 68 +++++++++++-----
.../controller/api/upload/ZKOperatorTest.java | 16 ++--
.../controller/helix/SegmentStatusCheckerTest.java | 95 +++++++++++++++++++---
.../validation/StorageQuotaCheckerTest.java | 7 +-
.../validation/ValidationManagerTest.java | 3 +-
.../apache/pinot/spi/utils/CommonConstants.java | 1 +
18 files changed, 297 insertions(+), 92 deletions(-)
diff --git a/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/pinot.yml b/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/pinot.yml
index dcaff91..e784180 100644
--- a/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/pinot.yml
+++ b/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/pinot.yml
@@ -106,6 +106,21 @@ rules:
cache: true
labels:
table: "$1"
+- pattern: "\"org.apache.pinot.common.metrics\"<type=\"ControllerMetrics\", name=\"pinot.controller.tableTotalSizeOnServer.(\\w+)_(\\w+)\"><>(\\w+)"
+ name: "pinot_controller_tableTotalSizeOnServer_$3"
+ labels:
+ table: "$1"
+ tableType: "$2"
+- pattern: "\"org.apache.pinot.common.metrics\"<type=\"ControllerMetrics\", name=\"pinot.controller.tableSizePerReplicaOnServer.(\\w+)_(\\w+)\"><>(\\w+)"
+ name: "pinot_controller_tableSizePerReplicaOnServer_$3"
+ labels:
+ table: "$1"
+ tableType: "$2"
+- pattern: "\"org.apache.pinot.common.metrics\"<type=\"ControllerMetrics\", name=\"pinot.controller.tableCompressedSize.(\\w+)_(\\w+)\"><>(\\w+)"
+ name: "pinot_controller_tableCompressedSize_$3"
+ labels:
+ table: "$1"
+ tableType: "$2"
- pattern: "\"org.apache.pinot.common.metrics\"<type=\"ControllerMetrics\", name=\"pinot.controller.tableQuota.(\\w+)_(\\w+)\"><>(\\w+)"
name: "pinot_controller_tableQuota_$3"
cache: true
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
index 3f7d59b..7f29c98 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
@@ -228,7 +228,7 @@ public class HelixBrokerStarterTest extends ControllerTest {
_helixResourceManager.getSegmentZKMetadata(OFFLINE_TABLE_NAME, segmentToRefresh);
_helixResourceManager.refreshSegment(OFFLINE_TABLE_NAME,
SegmentMetadataMockUtils.mockSegmentMetadataWithEndTimeInfo(RAW_TABLE_NAME, segmentToRefresh, newEndTime),
- segmentZKMetadata, EXPECTED_VERSION, "downloadUrl", null);
+ segmentZKMetadata, EXPECTED_VERSION, "downloadUrl", null, -1);
TestUtils.waitForCondition(aVoid -> routingManager.getTimeBoundaryInfo(OFFLINE_TABLE_NAME).getTimeValue()
.equals(Integer.toString(newEndTime - 1)), 30_000L, "Failed to update the time boundary for refreshed segment");
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadata.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadata.java
index 13264a1..0b73af4 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadata.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadata.java
@@ -125,6 +125,14 @@ public class SegmentZKMetadata implements ZKMetadata {
setNonNegativeValue(Segment.TOTAL_DOCS, totalDocs);
}
+ public void setSizeInBytes(long sizeInBytes) {
+ setNonNegativeValue(Segment.SIZE_IN_BYTES, sizeInBytes);
+ }
+
+ public long getSizeInBytes() {
+ return _znRecord.getLongField(Segment.SIZE_IN_BYTES, -1);
+ }
+
public long getCrc() {
return _znRecord.getLongField(Segment.CRC, -1);
}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
index ae33acc..7e908ff 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
@@ -67,8 +67,18 @@ public enum ControllerGauge implements AbstractMetrics.Gauge {
CONTROLLER_LEADER_PARTITION_COUNT("ControllerLeaderPartitionCount", true),
// Estimated size of offline table
+ @Deprecated // Instead use TABLE_TOTAL_SIZE_ON_SERVER
OFFLINE_TABLE_ESTIMATED_SIZE("OfflineTableEstimatedSize", false),
+ // Total size of table across replicas on servers
+ TABLE_TOTAL_SIZE_ON_SERVER("TableTotalSizeOnServer", false),
+
+ // Size of table per replica on servers
+ TABLE_SIZE_PER_REPLICA_ON_SERVER("TableSizePerReplicaOnServer", false),
+
+ // Total size of compressed segments per table
+ TABLE_COMPRESSED_SIZE("TableCompressedSize", false),
+
// Table quota based on setting in table config
TABLE_QUOTA("TableQuotaBasedOnTableConfig", false),
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index 6a3e9a5..94e68c7 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -646,7 +646,8 @@ public abstract class BaseControllerStarter implements ServiceStartable {
new BrokerResourceValidationManager(_config, _helixResourceManager, _leadControllerManager, _controllerMetrics);
periodicTasks.add(_brokerResourceValidationManager);
_segmentStatusChecker =
- new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics);
+ new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics,
+ _executorService);
periodicTasks.add(_segmentStatusChecker);
_segmentRelocator = new SegmentRelocator(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics,
_executorService);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index cbda36a..25514ab 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -251,7 +251,7 @@ public class ControllerConf extends PinotConfiguration {
private static final String DEFAULT_DIM_TABLE_MAX_SIZE = "200M";
private static final String DEFAULT_PINOT_FS_FACTORY_CLASS_LOCAL = LocalPinotFS.class.getName();
- public static final String DISABLE_GROOVY = "controller.disable.ingestion.groovy";
+ private static final String DISABLE_GROOVY = "controller.disable.ingestion.groovy";
public ControllerConf() {
super(new HashMap<>());
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
index 253a3fb..dfd3b54 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
@@ -214,17 +214,28 @@ public class PinotSegmentUploadDownloadRestletResource {
boolean uploadedSegmentIsEncrypted = !Strings.isNullOrEmpty(crypterClassNameInHeader);
FileUploadDownloadClient.FileUploadType uploadType = getUploadType(uploadTypeStr);
File dstFile = uploadedSegmentIsEncrypted ? tempEncryptedFile : tempDecryptedFile;
+ long segmentSizeInBytes;
switch (uploadType) {
case URI:
downloadSegmentFileFromURI(downloadUri, dstFile, tableName);
+ segmentSizeInBytes = dstFile.length();
break;
case SEGMENT:
createSegmentFileFromMultipart(multiPart, dstFile);
+ segmentSizeInBytes = dstFile.length();
break;
case METADATA:
moveSegmentToFinalLocation = false;
Preconditions.checkState(downloadUri != null, "Download URI is required in segment metadata upload mode");
createSegmentFileFromMultipart(multiPart, dstFile);
+ try {
+ URI segmentURI = new URI(downloadUri);
+ PinotFS pinotFS = PinotFSFactory.create(segmentURI.getScheme());
+ segmentSizeInBytes = pinotFS.length(segmentURI);
+ } catch (Exception e) {
+ segmentSizeInBytes = -1;
+ LOGGER.warn("Could not fetch segment size for metadata push", e);
+ }
break;
default:
throw new UnsupportedOperationException("Unsupported upload type: " + uploadType);
@@ -302,7 +313,7 @@ public class PinotSegmentUploadDownloadRestletResource {
// Zk operations
completeZkOperations(enableParallelPushProtection, headers, finalSegmentFile, tableNameWithType, segmentMetadata,
- segmentName, zkDownloadUri, moveSegmentToFinalLocation, crypterClassName, allowRefresh);
+ segmentName, zkDownloadUri, moveSegmentToFinalLocation, crypterClassName, allowRefresh, segmentSizeInBytes);
return new SuccessResponse("Successfully uploaded segment: " + segmentName + " of table: " + tableNameWithType);
} catch (WebApplicationException e) {
@@ -397,15 +408,15 @@ public class PinotSegmentUploadDownloadRestletResource {
private void completeZkOperations(boolean enableParallelPushProtection, HttpHeaders headers, File uploadedSegmentFile,
String tableNameWithType, SegmentMetadata segmentMetadata, String segmentName, String zkDownloadURI,
- boolean moveSegmentToFinalLocation, String crypter, boolean allowRefresh)
+ boolean moveSegmentToFinalLocation, String crypter, boolean allowRefresh, long segmentSizeInBytes)
throws Exception {
String basePath = ControllerFilePathProvider.getInstance().getDataDirURI().toString();
String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
URI finalSegmentLocationURI = URIUtils.getUri(basePath, rawTableName, URIUtils.encode(segmentName));
ZKOperator zkOperator = new ZKOperator(_pinotHelixResourceManager, _controllerConf, _controllerMetrics);
- zkOperator
- .completeSegmentOperations(tableNameWithType, segmentMetadata, finalSegmentLocationURI, uploadedSegmentFile,
- enableParallelPushProtection, headers, zkDownloadURI, moveSegmentToFinalLocation, crypter, allowRefresh);
+ zkOperator.completeSegmentOperations(tableNameWithType, segmentMetadata, finalSegmentLocationURI,
+ uploadedSegmentFile, enableParallelPushProtection, headers, zkDownloadURI, moveSegmentToFinalLocation, crypter,
+ allowRefresh, segmentSizeInBytes);
}
private void decryptFile(String crypterClassName, File tempEncryptedFile, File tempDecryptedFile) {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
index 693d42f..28f1753 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
@@ -61,7 +61,7 @@ public class ZKOperator {
public void completeSegmentOperations(String tableNameWithType, SegmentMetadata segmentMetadata,
URI finalSegmentLocationURI, File currentSegmentLocation, boolean enableParallelPushProtection,
HttpHeaders headers, String zkDownloadURI, boolean moveSegmentToFinalLocation, String crypter,
- boolean allowRefresh)
+ boolean allowRefresh, long segmentSizeInBytes)
throws Exception {
String segmentName = segmentMetadata.getName();
ZNRecord segmentMetadataZNRecord =
@@ -76,7 +76,8 @@ public class ZKOperator {
}
LOGGER.info("Adding new segment {} from table {}", segmentName, tableNameWithType);
processNewSegment(segmentMetadata, finalSegmentLocationURI, currentSegmentLocation, zkDownloadURI, headers,
- crypter, tableNameWithType, segmentName, moveSegmentToFinalLocation, enableParallelPushProtection);
+ crypter, tableNameWithType, segmentName, moveSegmentToFinalLocation, enableParallelPushProtection,
+ segmentSizeInBytes);
return;
}
@@ -101,13 +102,13 @@ public class ZKOperator {
LOGGER.info("Segment {} from table {} already exists, refreshing if necessary", segmentName, tableNameWithType);
processExistingSegment(segmentMetadata, finalSegmentLocationURI, currentSegmentLocation,
enableParallelPushProtection, headers, zkDownloadURI, crypter, tableNameWithType, segmentName,
- segmentMetadataZNRecord, moveSegmentToFinalLocation);
+ segmentMetadataZNRecord, moveSegmentToFinalLocation, segmentSizeInBytes);
}
private void processExistingSegment(SegmentMetadata segmentMetadata, URI finalSegmentLocationURI,
File currentSegmentLocation, boolean enableParallelPushProtection, HttpHeaders headers, String zkDownloadURI,
String crypter, String tableNameWithType, String segmentName, ZNRecord znRecord,
- boolean moveSegmentToFinalLocation)
+ boolean moveSegmentToFinalLocation, long segmentSizeInBytes)
throws Exception {
SegmentZKMetadata existingSegmentZKMetadata = new SegmentZKMetadata(znRecord);
@@ -202,7 +203,7 @@ public class ZKOperator {
_pinotHelixResourceManager
.refreshSegment(tableNameWithType, segmentMetadata, existingSegmentZKMetadata, expectedVersion,
- zkDownloadURI, crypter);
+ zkDownloadURI, crypter, segmentSizeInBytes);
}
} catch (Exception e) {
if (!_pinotHelixResourceManager
@@ -234,10 +235,12 @@ public class ZKOperator {
private void processNewSegment(SegmentMetadata segmentMetadata, URI finalSegmentLocationURI,
File currentSegmentLocation, String zkDownloadURI, HttpHeaders headers, String crypter, String tableNameWithType,
- String segmentName, boolean moveSegmentToFinalLocation, boolean enableParallelPushProtection)
+ String segmentName, boolean moveSegmentToFinalLocation, boolean enableParallelPushProtection,
+ long segmentSizeInBytes)
throws Exception {
- SegmentZKMetadata newSegmentZKMetadata = _pinotHelixResourceManager
- .constructZkMetadataForNewSegment(tableNameWithType, segmentMetadata, zkDownloadURI, crypter);
+ SegmentZKMetadata newSegmentZKMetadata =
+ _pinotHelixResourceManager.constructZkMetadataForNewSegment(tableNameWithType, segmentMetadata, zkDownloadURI,
+ crypter, segmentSizeInBytes);
// Lock if enableParallelPushProtection is true.
if (enableParallelPushProtection) {
@@ -253,7 +256,6 @@ public class ZKOperator {
newSegmentZKMetadata
.setCustomMap(segmentZKMetadataCustomMapModifier.modifyMap(newSegmentZKMetadata.getCustomMap()));
}
-
if (!_pinotHelixResourceManager.createSegmentZkMetadata(tableNameWithType, newSegmentZKMetadata)) {
throw new RuntimeException(
"Failed to create ZK metadata for segment: " + segmentName + " of table: " + tableNameWithType);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
index 3f2e378..0e69829 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
@@ -18,14 +18,18 @@
*/
package org.apache.pinot.controller.helix;
+import com.google.common.annotations.VisibleForTesting;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.httpclient.SimpleHttpConnectionManager;
import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
+import org.apache.pinot.common.exception.InvalidConfigException;
import org.apache.pinot.common.lineage.SegmentLineage;
import org.apache.pinot.common.lineage.SegmentLineageAccessHelper;
import org.apache.pinot.common.lineage.SegmentLineageUtils;
@@ -36,6 +40,7 @@ import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
+import org.apache.pinot.controller.util.TableSizeReader;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
@@ -57,21 +62,28 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh
private static final long DISABLED_TABLE_LOG_INTERVAL_MS = TimeUnit.DAYS.toMillis(1);
private static final ZNRecordSerializer RECORD_SERIALIZER = new ZNRecordSerializer();
+ private static final int TABLE_CHECKER_TIMEOUT_MS = 30_000;
+
private final int _waitForPushTimeSeconds;
private long _lastDisabledTableLogTimestamp = 0;
+ private TableSizeReader _tableSizeReader;
+
/**
* Constructs the segment status checker.
* @param pinotHelixResourceManager The resource checker used to interact with Helix
* @param config The controller configuration object
*/
public SegmentStatusChecker(PinotHelixResourceManager pinotHelixResourceManager,
- LeadControllerManager leadControllerManager, ControllerConf config, ControllerMetrics controllerMetrics) {
+ LeadControllerManager leadControllerManager, ControllerConf config, ControllerMetrics controllerMetrics,
+ ExecutorService executorService) {
super("SegmentStatusChecker", config.getStatusCheckerFrequencyInSeconds(),
config.getStatusCheckerInitialDelayInSeconds(), pinotHelixResourceManager, leadControllerManager,
controllerMetrics);
_waitForPushTimeSeconds = config.getStatusCheckerWaitForPushTimeInSeconds();
+ _tableSizeReader = new TableSizeReader(executorService, new SimpleHttpConnectionManager(true), _controllerMetrics,
+ _pinotHelixResourceManager);
}
@Override
@@ -96,6 +108,7 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh
protected void processTable(String tableNameWithType, Context context) {
try {
updateSegmentMetrics(tableNameWithType, context);
+ updateTableSizeMetrics(tableNameWithType);
} catch (Exception e) {
LOGGER.error("Caught exception while updating segment status for table {}", tableNameWithType, e);
// Remove the metric for this table
@@ -110,6 +123,11 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh
_controllerMetrics.setValueOfGlobalGauge(ControllerGauge.DISABLED_TABLE_COUNT, context._disabledTableCount);
}
+ private void updateTableSizeMetrics(String tableNameWithType)
+ throws InvalidConfigException {
+ _tableSizeReader.getTableSizeDetails(tableNameWithType, TABLE_CHECKER_TIMEOUT_MS);
+ }
+
/**
* Runs a segment status pass over the given table.
* TODO: revisit the logic and reduce the ZK access
@@ -173,6 +191,7 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh
int nErrors = 0; // Keeps track of number of segments in error state
int nOffline = 0; // Keeps track of number segments with no online replicas
int nSegments = 0; // Counts number of segments
+ long tableCompressedSize = 0; // Tracks the total compressed segment size in deep store per table
for (String partitionName : segmentsExcludeReplaced) {
int nReplicas = 0;
int nIdeal = 0;
@@ -198,6 +217,12 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh
// Push is not finished yet, skip the segment
continue;
}
+ if (segmentZKMetadata != null) {
+ long sizeInBytes = segmentZKMetadata.getSizeInBytes();
+ if (sizeInBytes > 0) {
+ tableCompressedSize += sizeInBytes;
+ }
+ }
nReplicasIdealMax = (idealState.getInstanceStateMap(partitionName).size() > nReplicasIdealMax) ? idealState
.getInstanceStateMap(partitionName).size() : nReplicasIdealMax;
if ((externalView == null) || (externalView.getStateMap(partitionName) == null)) {
@@ -240,6 +265,9 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh
_controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENTS_IN_ERROR_STATE, nErrors);
_controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE,
(nSegments > 0) ? (100 - (nOffline * 100 / nSegments)) : 100);
+ _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.TABLE_COMPRESSED_SIZE,
+ tableCompressedSize);
+
if (nOffline > 0) {
LOGGER.warn("Table {} has {} segments with no online replicas", tableNameWithType, nOffline);
}
@@ -287,6 +315,11 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh
setStatusToDefault();
}
+ @VisibleForTesting
+ void setTableSizeReader(TableSizeReader tableSizeReader) {
+ _tableSizeReader = tableSizeReader;
+ }
+
public static final class Context {
private boolean _logDisabledTables;
private int _realTimeTableCount;
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index cfc83d4..ecbd3b7 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -116,6 +116,7 @@ import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer;
import org.apache.pinot.controller.helix.core.util.ZKMetadataUtils;
import org.apache.pinot.controller.helix.starter.HelixConfig;
import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.segment.local.utils.ReplicationUtils;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.spi.config.ConfigUtils;
import org.apache.pinot.spi.config.instance.Instance;
@@ -1910,7 +1911,7 @@ public class PinotHelixResourceManager {
// NOTE: must first set the segment ZK metadata before assigning segment to instances because segment assignment
// might need them to determine the partition of the segment, and server will need them to download the segment
SegmentZKMetadata segmentZkmetadata =
- constructZkMetadataForNewSegment(tableNameWithType, segmentMetadata, downloadUrl, crypter);
+ constructZkMetadataForNewSegment(tableNameWithType, segmentMetadata, downloadUrl, crypter, -1);
ZNRecord znRecord = segmentZkmetadata.toZNRecord();
String segmentName = segmentMetadata.getName();
@@ -1930,16 +1931,18 @@ public class PinotHelixResourceManager {
* @param segmentMetadata Segment metadata
* @param downloadUrl Download URL
* @param crypter Crypter
+ * @param segmentSizeInBytes Size of segment in bytes.
* @return SegmentZkMetadata of the input segment
*/
public SegmentZKMetadata constructZkMetadataForNewSegment(String tableNameWithType, SegmentMetadata segmentMetadata,
- String downloadUrl, @Nullable String crypter) {
+ String downloadUrl, @Nullable String crypter, long segmentSizeInBytes) {
// Construct segment zk metadata with common fields for offline and realtime.
String segmentName = segmentMetadata.getName();
SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(segmentName);
ZKMetadataUtils.updateSegmentMetadata(segmentZKMetadata, segmentMetadata);
segmentZKMetadata.setDownloadUrl(downloadUrl);
segmentZKMetadata.setCrypterName(crypter);
+ segmentZKMetadata.setSizeInBytes(segmentSizeInBytes);
if (TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
Preconditions.checkState(isUpsertTable(tableNameWithType),
@@ -2051,7 +2054,8 @@ public class PinotHelixResourceManager {
}
public void refreshSegment(String tableNameWithType, SegmentMetadata segmentMetadata,
- SegmentZKMetadata segmentZKMetadata, int expectedVersion, String downloadUrl, @Nullable String crypter) {
+ SegmentZKMetadata segmentZKMetadata, int expectedVersion, String downloadUrl, @Nullable String crypter,
+ long segmentSizeInBytes) {
String segmentName = segmentMetadata.getName();
// NOTE: Must first set the segment ZK metadata before trying to refresh because servers and brokers rely on segment
@@ -2062,6 +2066,7 @@ public class PinotHelixResourceManager {
segmentZKMetadata.setRefreshTime(System.currentTimeMillis());
segmentZKMetadata.setDownloadUrl(downloadUrl);
segmentZKMetadata.setCrypterName(crypter);
+ segmentZKMetadata.setSizeInBytes(segmentSizeInBytes);
if (!ZKMetadataProvider
.setSegmentZKMetadata(_propertyStore, tableNameWithType, segmentZKMetadata, expectedVersion)) {
throw new RuntimeException(
@@ -3363,6 +3368,25 @@ public class PinotHelixResourceManager {
return hosts;
}
+ /**
+ * Returns the number of replicas for a given table config
+ */
+ public int getNumReplicas(TableConfig tableConfig) {
+ if (tableConfig.isDimTable()) {
+ // If the table is a dimension table then fetch the tenant config and get the number of servers belonging
+ // to the tenant
+ TenantConfig tenantConfig = tableConfig.getTenantConfig();
+ Set<String> serverInstances = getAllInstancesForServerTenant(tenantConfig.getServer());
+ return serverInstances.size();
+ }
+
+ if (ReplicationUtils.useReplicasPerPartition(tableConfig)) {
+ return Integer.parseInt(tableConfig.getValidationConfig().getReplicasPerPartition());
+ }
+
+ return tableConfig.getValidationConfig().getReplicationNumber();
+ }
+
/*
* Uncomment and use for testing on a real cluster
public static void main(String[] args) throws Exception {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableSizeReader.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableSizeReader.java
index 6c2556c..4cbc703 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableSizeReader.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableSizeReader.java
@@ -32,12 +32,13 @@ import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.commons.httpclient.HttpConnectionManager;
import org.apache.pinot.common.exception.InvalidConfigException;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.restlet.resources.SegmentSizeInfo;
import org.apache.pinot.controller.api.resources.ServerTableSizeReader;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
-import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -78,36 +79,40 @@ public class TableSizeReader {
Preconditions.checkNotNull(tableName, "Table name should not be null");
Preconditions.checkArgument(timeoutMsec > 0, "Timeout value must be greater than 0");
- boolean hasRealtimeTable = false;
- boolean hasOfflineTable = false;
- TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableName);
+ TableConfig offlineTableConfig =
+ ZKMetadataProvider.getOfflineTableConfig(_helixResourceManager.getPropertyStore(), tableName);
+ TableConfig realtimeTableConfig =
+ ZKMetadataProvider.getRealtimeTableConfig(_helixResourceManager.getPropertyStore(), tableName);
- if (tableType != null) {
- hasRealtimeTable = tableType == TableType.REALTIME;
- hasOfflineTable = tableType == TableType.OFFLINE;
- } else {
- hasRealtimeTable = _helixResourceManager.hasRealtimeTable(tableName);
- hasOfflineTable = _helixResourceManager.hasOfflineTable(tableName);
- }
-
- if (!hasOfflineTable && !hasRealtimeTable) {
+ if (offlineTableConfig == null && realtimeTableConfig == null) {
return null;
}
-
TableSizeDetails tableSizeDetails = new TableSizeDetails(tableName);
-
- if (hasRealtimeTable) {
+ if (realtimeTableConfig != null) {
String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName);
tableSizeDetails._realtimeSegments = getTableSubtypeSize(realtimeTableName, timeoutMsec);
tableSizeDetails._reportedSizeInBytes += tableSizeDetails._realtimeSegments._reportedSizeInBytes;
tableSizeDetails._estimatedSizeInBytes += tableSizeDetails._realtimeSegments._estimatedSizeInBytes;
+
+ _controllerMetrics.setValueOfTableGauge(realtimeTableName, ControllerGauge.TABLE_TOTAL_SIZE_ON_SERVER,
+ tableSizeDetails._realtimeSegments._estimatedSizeInBytes);
+ _controllerMetrics.setValueOfTableGauge(realtimeTableName, ControllerGauge.TABLE_SIZE_PER_REPLICA_ON_SERVER,
+ tableSizeDetails._realtimeSegments._estimatedSizeInBytes / _helixResourceManager.getNumReplicas(
+ realtimeTableConfig));
}
- if (hasOfflineTable) {
+ if (offlineTableConfig != null) {
String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
tableSizeDetails._offlineSegments = getTableSubtypeSize(offlineTableName, timeoutMsec);
tableSizeDetails._reportedSizeInBytes += tableSizeDetails._offlineSegments._reportedSizeInBytes;
tableSizeDetails._estimatedSizeInBytes += tableSizeDetails._offlineSegments._estimatedSizeInBytes;
+
+ _controllerMetrics.setValueOfTableGauge(offlineTableName, ControllerGauge.TABLE_TOTAL_SIZE_ON_SERVER,
+ tableSizeDetails._offlineSegments._estimatedSizeInBytes);
+ _controllerMetrics.setValueOfTableGauge(offlineTableName, ControllerGauge.TABLE_SIZE_PER_REPLICA_ON_SERVER,
+ tableSizeDetails._offlineSegments._estimatedSizeInBytes / _helixResourceManager.getNumReplicas(
+ offlineTableConfig));
}
+
return tableSizeDetails;
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java
index ee732b0..4920cf5 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java
@@ -19,7 +19,6 @@
package org.apache.pinot.controller.validation;
import com.google.common.base.Preconditions;
-import java.util.Set;
import org.apache.pinot.common.exception.InvalidConfigException;
import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.metrics.ControllerMetrics;
@@ -27,7 +26,6 @@ import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.util.TableSizeReader;
import org.apache.pinot.spi.config.table.QuotaConfig;
import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TenantConfig;
import org.apache.pinot.spi.utils.DataSizeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -86,17 +84,7 @@ public class StorageQuotaChecker {
// 3. update predicted segment sizes
// 4. is the updated size within quota
QuotaConfig quotaConfig = _tableConfig.getQuotaConfig();
- int numReplicas;
-
- if (_tableConfig.isDimTable()) {
- // If the table is a dimension table then fetch the tenant config and get the number of server belonging
- // to the tenant
- TenantConfig tenantConfig = _tableConfig.getTenantConfig();
- Set<String> serverInstances = _pinotHelixResourceManager.getAllInstancesForServerTenant(tenantConfig.getServer());
- numReplicas = serverInstances.size();
- } else {
- numReplicas = _tableConfig.getValidationConfig().getReplicationNumber();
- }
+ int numReplicas = _pinotHelixResourceManager.getNumReplicas(_tableConfig);
final String tableNameWithType = _tableConfig.getTableName();
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableSizeReaderTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableSizeReaderTest.java
index f83fbe6..04dbb51 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableSizeReaderTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableSizeReaderTest.java
@@ -33,16 +33,22 @@ import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import org.apache.commons.httpclient.HttpConnectionManager;
import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
+import org.apache.helix.AccessOption;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.common.exception.InvalidConfigException;
import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.metrics.PinotMetricUtils;
import org.apache.pinot.common.restlet.resources.SegmentSizeInfo;
import org.apache.pinot.common.restlet.resources.TableSizeInfo;
+import org.apache.pinot.common.utils.config.TableConfigUtils;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.util.TableSizeReader;
import org.apache.pinot.controller.utils.FakeHttpServer;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.mockito.ArgumentMatchers;
import org.mockito.invocation.InvocationOnMock;
@@ -64,6 +70,7 @@ public class TableSizeReaderTest {
private static final String URI_PATH = "/table/";
private static final int TIMEOUT_MSEC = 10000;
private static final int EXTENDED_TIMEOUT_FACTOR = 100;
+ private static final int NUM_REPLICAS = 2;
private final Executor _executor = Executors.newFixedThreadPool(1);
private final HttpConnectionManager _connectionManager = new MultiThreadedHttpConnectionManager();
@@ -76,23 +83,25 @@ public class TableSizeReaderTest {
public void setUp()
throws IOException {
_helix = mock(PinotHelixResourceManager.class);
- when(_helix.hasOfflineTable(anyString())).thenAnswer(new Answer() {
- @Override
- public Object answer(InvocationOnMock invocationOnMock)
- throws Throwable {
- String table = (String) invocationOnMock.getArguments()[0];
- return table.indexOf("offline") >= 0;
- }
- });
- when(_helix.hasRealtimeTable(anyString())).thenAnswer(new Answer() {
- @Override
- public Object answer(InvocationOnMock invocationOnMock)
- throws Throwable {
- String table = (String) invocationOnMock.getArguments()[0];
- return table.indexOf("realtime") >= 0;
- }
- });
+ TableConfig tableConfig =
+ new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").setNumReplicas(NUM_REPLICAS).build();
+ ZkHelixPropertyStore mockPropertyStore = mock(ZkHelixPropertyStore.class);
+
+ when(mockPropertyStore.get(ArgumentMatchers.anyString(), ArgumentMatchers.eq(null),
+ ArgumentMatchers.eq(AccessOption.PERSISTENT))).thenAnswer((Answer) invocationOnMock -> {
+ String path = (String) invocationOnMock.getArguments()[0];
+ if (path.contains("realtime_REALTIME")) {
+ return TableConfigUtils.toZNRecord(tableConfig);
+ }
+ if (path.contains("offline_OFFLINE")) {
+ return TableConfigUtils.toZNRecord(tableConfig);
+ }
+ return null;
+ });
+
+ when(_helix.getPropertyStore()).thenReturn(mockPropertyStore);
+ when(_helix.getNumReplicas(ArgumentMatchers.eq(tableConfig))).thenReturn(NUM_REPLICAS);
int counter = 0;
// server0
@@ -313,6 +322,10 @@ public class TableSizeReaderTest {
String tableNameWithType = TableNameBuilder.OFFLINE.tableNameWithType(table);
Assert.assertEquals(_controllerMetrics
.getValueOfTableGauge(tableNameWithType, ControllerGauge.TABLE_STORAGE_EST_MISSING_SEGMENT_PERCENT), 0);
+ Assert.assertEquals(_controllerMetrics.getValueOfTableGauge(tableNameWithType,
+ ControllerGauge.TABLE_SIZE_PER_REPLICA_ON_SERVER), offlineSizes._estimatedSizeInBytes / NUM_REPLICAS);
+ Assert.assertEquals(_controllerMetrics.getValueOfTableGauge(tableNameWithType,
+ ControllerGauge.TABLE_TOTAL_SIZE_ON_SERVER), offlineSizes._estimatedSizeInBytes);
}
@Test
@@ -330,6 +343,10 @@ public class TableSizeReaderTest {
String tableNameWithType = TableNameBuilder.OFFLINE.tableNameWithType(table);
Assert.assertEquals(_controllerMetrics
.getValueOfTableGauge(tableNameWithType, ControllerGauge.TABLE_STORAGE_EST_MISSING_SEGMENT_PERCENT), 100);
+ Assert.assertEquals(_controllerMetrics.getValueOfTableGauge(tableNameWithType,
+ ControllerGauge.TABLE_SIZE_PER_REPLICA_ON_SERVER), offlineSizes._estimatedSizeInBytes / NUM_REPLICAS);
+ Assert.assertEquals(_controllerMetrics.getValueOfTableGauge(tableNameWithType,
+ ControllerGauge.TABLE_TOTAL_SIZE_ON_SERVER), offlineSizes._estimatedSizeInBytes);
}
@Test
@@ -345,19 +362,32 @@ public class TableSizeReaderTest {
validateTableSubTypeSize(servers, offlineSizes);
Assert.assertNull(tableSizeDetails._realtimeSegments);
String tableNameWithType = TableNameBuilder.OFFLINE.tableNameWithType(table);
- Assert.assertEquals(_controllerMetrics
- .getValueOfTableGauge(tableNameWithType, ControllerGauge.TABLE_STORAGE_EST_MISSING_SEGMENT_PERCENT), 20);
+ Assert.assertEquals(_controllerMetrics.getValueOfTableGauge(tableNameWithType,
+ ControllerGauge.TABLE_STORAGE_EST_MISSING_SEGMENT_PERCENT), 20);
+ Assert.assertEquals(
+ _controllerMetrics.getValueOfTableGauge(tableNameWithType, ControllerGauge.TABLE_SIZE_PER_REPLICA_ON_SERVER),
+ offlineSizes._estimatedSizeInBytes / NUM_REPLICAS);
+ Assert.assertEquals(
+ _controllerMetrics.getValueOfTableGauge(tableNameWithType, ControllerGauge.TABLE_TOTAL_SIZE_ON_SERVER),
+ offlineSizes._estimatedSizeInBytes);
}
@Test
public void getTableSizeDetailsRealtimeOnly()
throws InvalidConfigException {
final String[] servers = {"server3", "server4"};
- TableSizeReader.TableSizeDetails tableSizeDetails = testRunner(servers, "realtime");
+ String table = "realtime";
+ TableSizeReader.TableSizeDetails tableSizeDetails = testRunner(servers, table);
Assert.assertNull(tableSizeDetails._offlineSegments);
TableSizeReader.TableSubTypeSizeDetails realtimeSegments = tableSizeDetails._realtimeSegments;
Assert.assertEquals(realtimeSegments._segments.size(), 2);
Assert.assertTrue(realtimeSegments._reportedSizeInBytes == realtimeSegments._estimatedSizeInBytes);
validateTableSubTypeSize(servers, realtimeSegments);
+ String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(table);
+ Assert.assertEquals(
+ _controllerMetrics.getValueOfTableGauge(tableNameWithType, ControllerGauge.TABLE_SIZE_PER_REPLICA_ON_SERVER),
+ realtimeSegments._estimatedSizeInBytes / NUM_REPLICAS);
+ Assert.assertEquals(_controllerMetrics.getValueOfTableGauge(tableNameWithType,
+ ControllerGauge.TABLE_TOTAL_SIZE_ON_SERVER), realtimeSegments._estimatedSizeInBytes);
}
}
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java
index b701258..c9a119b 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java
@@ -78,7 +78,7 @@ public class ZKOperatorTest {
zkOperator
.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, finalSegmentLocationURI,
currentSegmentLocation, true, httpHeaders, "downloadUrl",
- true, "crypter", true);
+ true, "crypter", true, 10);
fail();
} catch (Exception e) {
// Expected
@@ -91,10 +91,9 @@ public class ZKOperatorTest {
return segmentZKMetadata == null;
}, 30_000L, "Failed to delete segmentZkMetadata.");
-
zkOperator
.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, null, null, true, httpHeaders, "downloadUrl",
- false, "crypter", true);
+ false, "crypter", true, 10);
SegmentZKMetadata segmentZKMetadata =
ControllerTestUtils.getHelixResourceManager().getSegmentZKMetadata(OFFLINE_TABLE_NAME, SEGMENT_NAME);
@@ -107,11 +106,12 @@ public class ZKOperatorTest {
assertEquals(segmentZKMetadata.getDownloadUrl(), "downloadUrl");
assertEquals(segmentZKMetadata.getCrypterName(), "crypter");
assertEquals(segmentZKMetadata.getSegmentUploadStartTime(), -1);
+ assertEquals(segmentZKMetadata.getSizeInBytes(), 10);
// Upload the same segment with allowRefresh = false. Validate that an exception is thrown.
try {
zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, null, null, false, httpHeaders,
- "otherDownloadUrl", false, "otherCrypter", false);
+ "otherDownloadUrl", false, "otherCrypter", false, 10);
fail();
} catch (Exception e) {
// Expected
@@ -121,7 +121,7 @@ public class ZKOperatorTest {
when(httpHeaders.getHeaderString(HttpHeaders.IF_MATCH)).thenReturn("123");
try {
zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, null, null, true, httpHeaders,
- "otherDownloadUrl", false, null, true);
+ "otherDownloadUrl", false, null, true, 10);
fail();
} catch (Exception e) {
// Expected
@@ -132,7 +132,7 @@ public class ZKOperatorTest {
when(httpHeaders.getHeaderString(HttpHeaders.IF_MATCH)).thenReturn("12345");
when(segmentMetadata.getIndexCreationTime()).thenReturn(456L);
zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, null, null, true, httpHeaders,
- "otherDownloadUrl", false, "otherCrypter", true);
+ "otherDownloadUrl", false, "otherCrypter", true, 10);
segmentZKMetadata =
ControllerTestUtils.getHelixResourceManager().getSegmentZKMetadata(OFFLINE_TABLE_NAME, SEGMENT_NAME);
assertEquals(segmentZKMetadata.getCrc(), 12345L);
@@ -146,6 +146,7 @@ public class ZKOperatorTest {
assertEquals(segmentZKMetadata.getDownloadUrl(), "downloadUrl");
assertEquals(segmentZKMetadata.getCrypterName(), "crypter");
assertEquals(segmentZKMetadata.getSegmentUploadStartTime(), -1);
+ assertEquals(segmentZKMetadata.getSizeInBytes(), 10);
// Refresh the segment with a different segment (different CRC)
when(segmentMetadata.getCrc()).thenReturn("23456");
@@ -155,7 +156,7 @@ public class ZKOperatorTest {
// not found!" exception from being thrown sporadically.
Thread.sleep(1000L);
zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, null, null, true, httpHeaders,
- "otherDownloadUrl", false, "otherCrypter", true);
+ "otherDownloadUrl", false, "otherCrypter", true, 10);
segmentZKMetadata =
ControllerTestUtils.getHelixResourceManager().getSegmentZKMetadata(OFFLINE_TABLE_NAME, SEGMENT_NAME);
assertEquals(segmentZKMetadata.getCrc(), 23456L);
@@ -166,6 +167,7 @@ public class ZKOperatorTest {
assertTrue(segmentZKMetadata.getRefreshTime() > refreshTime);
assertEquals(segmentZKMetadata.getDownloadUrl(), "otherDownloadUrl");
assertEquals(segmentZKMetadata.getCrypterName(), "otherCrypter");
+ assertEquals(segmentZKMetadata.getSizeInBytes(), 10);
}
@AfterClass
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java
index 2ff7cec..dbcc051 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java
@@ -22,6 +22,8 @@ import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.helix.AccessOption;
import org.apache.helix.ZNRecord;
@@ -40,6 +42,7 @@ import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.util.TableSizeReader;
import org.apache.pinot.spi.metrics.PinotMetricsRegistry;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -47,6 +50,7 @@ import org.testng.Assert;
import org.testng.annotations.Test;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
@@ -61,6 +65,8 @@ public class SegmentStatusCheckerTest {
private PinotMetricsRegistry _metricsRegistry;
private ControllerMetrics _controllerMetrics;
private ControllerConf _config;
+ private TableSizeReader _tableSizeReader;
+ private ExecutorService _executorService = Executors.newFixedThreadPool(1);
@Test
public void offlineBasicTest()
@@ -126,10 +132,16 @@ public class SegmentStatusCheckerTest {
_leadControllerManager = mock(LeadControllerManager.class);
when(_leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
}
+ {
+ _tableSizeReader = mock(TableSizeReader.class);
+ when(_tableSizeReader.getTableSizeDetails(anyString(), anyInt())).thenReturn(null);
+ }
_metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry();
_controllerMetrics = new ControllerMetrics(_metricsRegistry);
_segmentStatusChecker =
- new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics);
+ new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics,
+ _executorService);
+ _segmentStatusChecker.setTableSizeReader(_tableSizeReader);
_segmentStatusChecker.start();
_segmentStatusChecker.run();
Assert
@@ -148,6 +160,8 @@ public class SegmentStatusCheckerTest {
66);
Assert.assertEquals(
_controllerMetrics.getValueOfTableGauge(externalView.getId(), ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100);
+ Assert.assertEquals(
+ _controllerMetrics.getValueOfTableGauge(externalView.getId(), ControllerGauge.TABLE_COMPRESSED_SIZE), 0);
}
@Test
@@ -201,10 +215,16 @@ public class SegmentStatusCheckerTest {
_leadControllerManager = mock(LeadControllerManager.class);
when(_leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
}
+ {
+ _tableSizeReader = mock(TableSizeReader.class);
+ when(_tableSizeReader.getTableSizeDetails(anyString(), anyInt())).thenReturn(null);
+ }
_metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry();
_controllerMetrics = new ControllerMetrics(_metricsRegistry);
_segmentStatusChecker =
- new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics);
+ new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics,
+ _executorService);
+ _segmentStatusChecker.setTableSizeReader(_tableSizeReader);
_segmentStatusChecker.start();
_segmentStatusChecker.run();
Assert.assertEquals(
@@ -221,7 +241,8 @@ public class SegmentStatusCheckerTest {
}
@Test
- public void missingEVPartitionTest() {
+ public void missingEVPartitionTest()
+ throws Exception {
String offlineTableName = "myTable_OFFLINE";
List<String> allTableNames = new ArrayList<String>();
allTableNames.add(offlineTableName);
@@ -254,6 +275,7 @@ public class SegmentStatusCheckerTest {
znrecord.setSimpleField(CommonConstants.Segment.DOWNLOAD_URL, "http://localhost:8000/myTable_0");
znrecord.setLongField(CommonConstants.Segment.PUSH_TIME, System.currentTimeMillis());
znrecord.setLongField(CommonConstants.Segment.REFRESH_TIME, System.currentTimeMillis());
+ znrecord.setLongField(CommonConstants.Segment.SIZE_IN_BYTES, 1111);
ZkHelixPropertyStore<ZNRecord> propertyStore;
{
@@ -281,10 +303,16 @@ public class SegmentStatusCheckerTest {
_leadControllerManager = mock(LeadControllerManager.class);
when(_leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
}
+ {
+ _tableSizeReader = mock(TableSizeReader.class);
+ when(_tableSizeReader.getTableSizeDetails(anyString(), anyInt())).thenReturn(null);
+ }
_metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry();
_controllerMetrics = new ControllerMetrics(_metricsRegistry);
_segmentStatusChecker =
- new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics);
+ new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics,
+ _executorService);
+ _segmentStatusChecker.setTableSizeReader(_tableSizeReader);
_segmentStatusChecker.start();
_segmentStatusChecker.run();
Assert.assertEquals(
@@ -294,6 +322,8 @@ public class SegmentStatusCheckerTest {
0);
Assert.assertEquals(
_controllerMetrics.getValueOfTableGauge(externalView.getId(), ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 75);
+ Assert.assertEquals(
+ _controllerMetrics.getValueOfTableGauge(externalView.getId(), ControllerGauge.TABLE_COMPRESSED_SIZE), 1111);
}
@Test
@@ -330,14 +360,22 @@ public class SegmentStatusCheckerTest {
_leadControllerManager = mock(LeadControllerManager.class);
when(_leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
}
+ {
+ _tableSizeReader = mock(TableSizeReader.class);
+ when(_tableSizeReader.getTableSizeDetails(anyString(), anyInt())).thenReturn(null);
+ }
_metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry();
_controllerMetrics = new ControllerMetrics(_metricsRegistry);
_segmentStatusChecker =
- new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics);
+ new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics,
+ _executorService);
+ _segmentStatusChecker.setTableSizeReader(_tableSizeReader);
_segmentStatusChecker.start();
_segmentStatusChecker.run();
Assert.assertEquals(_controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0);
Assert.assertEquals(_controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.NUMBER_OF_REPLICAS), 0);
+ Assert.assertEquals(
+ _controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.TABLE_COMPRESSED_SIZE), 0);
}
@Test
@@ -362,10 +400,16 @@ public class SegmentStatusCheckerTest {
_leadControllerManager = mock(LeadControllerManager.class);
when(_leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
}
+ {
+ _tableSizeReader = mock(TableSizeReader.class);
+ when(_tableSizeReader.getTableSizeDetails(anyString(), anyInt())).thenReturn(null);
+ }
_metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry();
_controllerMetrics = new ControllerMetrics(_metricsRegistry);
_segmentStatusChecker =
- new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics);
+ new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics,
+ _executorService);
+ _segmentStatusChecker.setTableSizeReader(_tableSizeReader);
_segmentStatusChecker.start();
_segmentStatusChecker.run();
Assert.assertEquals(_controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.SEGMENTS_IN_ERROR_STATE),
@@ -374,10 +418,13 @@ public class SegmentStatusCheckerTest {
Long.MIN_VALUE);
Assert.assertEquals(_controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.PERCENT_OF_REPLICAS),
Long.MIN_VALUE);
+ Assert.assertEquals(
+ _controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.TABLE_COMPRESSED_SIZE), 0);
}
@Test
- public void missingEVPartitionPushTest() {
+ public void missingEVPartitionPushTest()
+ throws Exception {
String offlineTableName = "myTable_OFFLINE";
List<String> allTableNames = new ArrayList<String>();
allTableNames.add(offlineTableName);
@@ -408,6 +455,7 @@ public class SegmentStatusCheckerTest {
znrecord.setSimpleField(CommonConstants.Segment.DOWNLOAD_URL, "http://localhost:8000/myTable_0");
znrecord.setLongField(CommonConstants.Segment.PUSH_TIME, System.currentTimeMillis());
znrecord.setLongField(CommonConstants.Segment.REFRESH_TIME, System.currentTimeMillis());
+ znrecord.setLongField(CommonConstants.Segment.SIZE_IN_BYTES, 1111);
ZNRecord znrecord2 = new ZNRecord("myTable_2");
znrecord2.setSimpleField(CommonConstants.Segment.INDEX_VERSION, "v1");
@@ -420,6 +468,7 @@ public class SegmentStatusCheckerTest {
znrecord2.setSimpleField(CommonConstants.Segment.DOWNLOAD_URL, "http://localhost:8000/myTable_2");
znrecord2.setLongField(CommonConstants.Segment.PUSH_TIME, System.currentTimeMillis());
znrecord2.setLongField(CommonConstants.Segment.REFRESH_TIME, System.currentTimeMillis());
+ znrecord2.setLongField(CommonConstants.Segment.SIZE_IN_BYTES, 1111);
{
_helixResourceManager = mock(PinotHelixResourceManager.class);
@@ -442,10 +491,16 @@ public class SegmentStatusCheckerTest {
_leadControllerManager = mock(LeadControllerManager.class);
when(_leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
}
+ {
+ _tableSizeReader = mock(TableSizeReader.class);
+ when(_tableSizeReader.getTableSizeDetails(anyString(), anyInt())).thenReturn(null);
+ }
_metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry();
_controllerMetrics = new ControllerMetrics(_metricsRegistry);
_segmentStatusChecker =
- new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics);
+ new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics,
+ _executorService);
+ _segmentStatusChecker.setTableSizeReader(_tableSizeReader);
_segmentStatusChecker.start();
_segmentStatusChecker.run();
Assert.assertEquals(
@@ -459,6 +514,8 @@ public class SegmentStatusCheckerTest {
100);
Assert.assertEquals(
_controllerMetrics.getValueOfTableGauge(externalView.getId(), ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100);
+ Assert.assertEquals(
+ _controllerMetrics.getValueOfTableGauge(externalView.getId(), ControllerGauge.TABLE_COMPRESSED_SIZE), 0);
}
@Test
@@ -491,10 +548,16 @@ public class SegmentStatusCheckerTest {
_leadControllerManager = mock(LeadControllerManager.class);
when(_leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
}
+ {
+ _tableSizeReader = mock(TableSizeReader.class);
+ when(_tableSizeReader.getTableSizeDetails(anyString(), anyInt())).thenReturn(null);
+ }
_metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry();
_controllerMetrics = new ControllerMetrics(_metricsRegistry);
_segmentStatusChecker =
- new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics);
+ new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics,
+ _executorService);
+ _segmentStatusChecker.setTableSizeReader(_tableSizeReader);
_segmentStatusChecker.start();
_segmentStatusChecker.run();
Assert.assertEquals(_controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0);
@@ -538,7 +601,8 @@ public class SegmentStatusCheckerTest {
_metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry();
_controllerMetrics = new ControllerMetrics(_metricsRegistry);
_segmentStatusChecker =
- new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics);
+ new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics,
+ _executorService);
// verify state before test
Assert.assertEquals(_controllerMetrics.getValueOfGlobalGauge(ControllerGauge.DISABLED_TABLE_COUNT), 0);
// update metrics
@@ -577,7 +641,8 @@ public class SegmentStatusCheckerTest {
_metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry();
_controllerMetrics = new ControllerMetrics(_metricsRegistry);
_segmentStatusChecker =
- new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics);
+ new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics,
+ _executorService);
// verify state before test
Assert.assertEquals(_controllerMetrics.getValueOfGlobalGauge(ControllerGauge.DISABLED_TABLE_COUNT), 0);
// update metrics
@@ -624,10 +689,16 @@ public class SegmentStatusCheckerTest {
_leadControllerManager = mock(LeadControllerManager.class);
when(_leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
}
+ {
+ _tableSizeReader = mock(TableSizeReader.class);
+ when(_tableSizeReader.getTableSizeDetails(anyString(), anyInt())).thenReturn(null);
+ }
_metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry();
_controllerMetrics = new ControllerMetrics(_metricsRegistry);
_segmentStatusChecker =
- new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics);
+ new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics,
+ _executorService);
+ _segmentStatusChecker.setTableSizeReader(_tableSizeReader);
_segmentStatusChecker.start();
_segmentStatusChecker.run();
Assert.assertEquals(_controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.SEGMENTS_IN_ERROR_STATE),
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/StorageQuotaCheckerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/StorageQuotaCheckerTest.java
index 2d351d7..2d42bf7 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/StorageQuotaCheckerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/StorageQuotaCheckerTest.java
@@ -31,6 +31,7 @@ import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.testng.annotations.Test;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
@@ -94,8 +95,10 @@ public class StorageQuotaCheckerTest {
new TableConfigBuilder(TableType.OFFLINE).setTableName(OFFLINE_TABLE_NAME).setNumReplicas(NUM_REPLICAS).build();
_tableSizeReader = mock(TableSizeReader.class);
ControllerMetrics controllerMetrics = new ControllerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
- _storageQuotaChecker = new StorageQuotaChecker(tableConfig, _tableSizeReader, controllerMetrics, true,
- mock(PinotHelixResourceManager.class));
+ PinotHelixResourceManager pinotHelixResourceManager = mock(PinotHelixResourceManager.class);
+ when(pinotHelixResourceManager.getNumReplicas(eq(tableConfig))).thenReturn(NUM_REPLICAS);
+ _storageQuotaChecker =
+ new StorageQuotaChecker(tableConfig, _tableSizeReader, controllerMetrics, true, pinotHelixResourceManager);
tableConfig.setQuotaConfig(new QuotaConfig("2.8K", null));
// No response from server, should pass without updating metrics
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java
index 335d8f4..fcac9a0 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java
@@ -89,7 +89,8 @@ public class ValidationManagerTest {
}, 30_000L, "Failed to find the segment in the ExternalView");
Mockito.when(segmentMetadata.getCrc()).thenReturn(Long.toString(System.nanoTime()));
ControllerTestUtils.getHelixResourceManager()
- .refreshSegment(offlineTableName, segmentMetadata, segmentZKMetadata, EXPECTED_VERSION, "downloadUrl", null);
+ .refreshSegment(offlineTableName, segmentMetadata, segmentZKMetadata, EXPECTED_VERSION, "downloadUrl", null,
+ -1);
segmentZKMetadata =
ControllerTestUtils.getHelixResourceManager().getSegmentZKMetadata(OFFLINE_TEST_TABLE_NAME, TEST_SEGMENT_NAME);
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 7e5c956..7cda305 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -550,6 +550,7 @@ public class CommonConstants {
public static final String CRYPTER_NAME = "segment.crypter";
public static final String PARTITION_METADATA = "segment.partition.metadata";
public static final String CUSTOM_MAP = "custom.map";
+ public static final String SIZE_IN_BYTES = "segment.size.in.bytes";
/**
* This field is used for parallel push protection to lock the segment globally.
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org