You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sa...@apache.org on 2024/02/28 16:00:26 UTC
(pinot) branch master updated: Metric for upsert tables count (#12505)
This is an automated email from the ASF dual-hosted git repository.
saurabhd336 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new bc07b8dd2c Metric for upsert tables count (#12505)
bc07b8dd2c is described below
commit bc07b8dd2c953b8f88967d56c1c28af1873ab158
Author: Shounak kulkarni <sh...@gmail.com>
AuthorDate: Wed Feb 28 21:00:18 2024 +0500
Metric for upsert tables count (#12505)
* Metric for upsert table count
* move other table metric computation to updateTableConfigMetrics
---
.../pinot/common/metrics/ControllerGauge.java | 1 +
.../controller/helix/SegmentStatusChecker.java | 19 +++++++-----
.../ControllerPeriodicTasksIntegrationTest.java | 36 ++++++++++++++++++++--
.../tests/UpsertTableIntegrationTest.java | 8 +++--
4 files changed, 51 insertions(+), 13 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
index 82e86c55a9..938242ef78 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
@@ -54,6 +54,7 @@ public enum ControllerGauge implements AbstractMetrics.Gauge {
REALTIME_TABLE_COUNT("TableCount", true),
OFFLINE_TABLE_COUNT("TableCount", true),
DISABLED_TABLE_COUNT("TableCount", true),
+ UPSERT_TABLE_COUNT("TableCount", true),
PERIODIC_TASK_NUM_TABLES_PROCESSED("PeriodicTaskNumTablesProcessed", true),
TIME_MS_SINCE_LAST_MINION_TASK_METADATA_UPDATE("TimeMsSinceLastMinionTaskMetadataUpdate", false),
TIME_MS_SINCE_LAST_SUCCESSFUL_MINION_TASK_GENERATION("TimeMsSinceLastSuccessfulMinionTaskGeneration", false),
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
index dd598b2b21..1ac7446592 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
@@ -117,7 +117,7 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh
protected void processTable(String tableNameWithType, Context context) {
try {
TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
- updateTableConfigMetrics(tableNameWithType, tableConfig);
+ updateTableConfigMetrics(tableNameWithType, tableConfig, context);
updateSegmentMetrics(tableNameWithType, tableConfig, context);
updateTableSizeMetrics(tableNameWithType);
} catch (Exception e) {
@@ -132,6 +132,7 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh
protected void postprocess(Context context) {
_controllerMetrics.setValueOfGlobalGauge(ControllerGauge.REALTIME_TABLE_COUNT, context._realTimeTableCount);
_controllerMetrics.setValueOfGlobalGauge(ControllerGauge.OFFLINE_TABLE_COUNT, context._offlineTableCount);
+ _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.UPSERT_TABLE_COUNT, context._upsertTableCount);
_controllerMetrics.setValueOfGlobalGauge(ControllerGauge.DISABLED_TABLE_COUNT, context._disabledTables.size());
//emit a 0 for tables that are not paused/disabled. This makes alert expressions simpler as we don't have to deal
@@ -156,12 +157,20 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh
* Updates metrics related to the table config.
* If table config not found, resets the metrics
*/
- private void updateTableConfigMetrics(String tableNameWithType, TableConfig tableConfig) {
+ private void updateTableConfigMetrics(String tableNameWithType, TableConfig tableConfig, Context context) {
if (tableConfig == null) {
LOGGER.warn("Found null table config for table: {}. Resetting table config metrics.", tableNameWithType);
_controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.REPLICATION_FROM_CONFIG, 0);
return;
}
+ if (tableConfig.getTableType() == TableType.OFFLINE) {
+ context._offlineTableCount++;
+ } else {
+ context._realTimeTableCount++;
+ }
+ if (tableConfig.isUpsertEnabled()) {
+ context._upsertTableCount++;
+ }
int replication = tableConfig.getReplication();
_controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.REPLICATION_FROM_CONFIG, replication);
}
@@ -177,11 +186,6 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh
*/
private void updateSegmentMetrics(String tableNameWithType, TableConfig tableConfig, Context context) {
TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
- if (tableType == TableType.OFFLINE) {
- context._offlineTableCount++;
- } else {
- context._realTimeTableCount++;
- }
IdealState idealState = _pinotHelixResourceManager.getTableIdealState(tableNameWithType);
@@ -386,6 +390,7 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh
private boolean _logDisabledTables;
private int _realTimeTableCount;
private int _offlineTableCount;
+ private int _upsertTableCount;
private Set<String> _processedTables = new HashSet<>();
private Set<String> _disabledTables = new HashSet<>();
private Set<String> _pausedTables = new HashSet<>();
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java
index 6b558b93e7..03a2b6a000 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java
@@ -19,8 +19,10 @@
package org.apache.pinot.integration.tests;
import java.io.File;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -40,6 +42,7 @@ import org.apache.pinot.controller.validation.OfflineSegmentIntervalChecker;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TagOverrideConfig;
import org.apache.pinot.spi.config.table.TenantConfig;
+import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -75,11 +78,18 @@ public class ControllerPeriodicTasksIntegrationTest extends BaseClusterIntegrati
private String _currentTable = DEFAULT_TABLE_NAME;
+ private String _schemaFileName = DEFAULT_SCHEMA_FILE_NAME;
+
@Override
protected String getTableName() {
return _currentTable;
}
+ @Override
+ protected String getSchemaFileName() {
+ return _schemaFileName;
+ }
+
@Override
protected int getNumReplicas() {
return NUM_REPLICAS;
@@ -182,6 +192,7 @@ public class ControllerPeriodicTasksIntegrationTest extends BaseClusterIntegrati
String emptyTable = "emptyTable";
String disabledTable = "disabledTable";
String tableWithOfflineSegment = "tableWithOfflineSegment";
+ String upsertTable = "upsertTable";
Schema schema = createSchema();
_currentTable = emptyTable;
@@ -195,6 +206,11 @@ public class ControllerPeriodicTasksIntegrationTest extends BaseClusterIntegrati
addTableConfig(createOfflineTableConfig());
_helixAdmin.enableResource(getHelixClusterName(), TableNameBuilder.OFFLINE.tableNameWithType(disabledTable), false);
+ _currentTable = upsertTable;
+ _schemaFileName = UpsertTableIntegrationTest.UPSERT_SCHEMA_FILE_NAME;
+ setupUpsertTable();
+ _schemaFileName = DEFAULT_SCHEMA_FILE_NAME;
+
_currentTable = tableWithOfflineSegment;
schema.setSchemaName(_currentTable);
addSchema(schema);
@@ -211,7 +227,7 @@ public class ControllerPeriodicTasksIntegrationTest extends BaseClusterIntegrati
_currentTable = DEFAULT_TABLE_NAME;
- int numTables = 5;
+ int numTables = 6;
ControllerMetrics controllerMetrics = _controllerStarter.getControllerMetrics();
TestUtils.waitForCondition(aVoid -> {
if (MetricValueUtils.getGlobalGaugeValue(controllerMetrics, "SegmentStatusChecker",
@@ -246,8 +262,9 @@ public class ControllerPeriodicTasksIntegrationTest extends BaseClusterIntegrati
return false;
}
return MetricValueUtils.getGlobalGaugeValue(controllerMetrics, ControllerGauge.OFFLINE_TABLE_COUNT) == 4
- && MetricValueUtils.getGlobalGaugeValue(controllerMetrics, ControllerGauge.REALTIME_TABLE_COUNT) == 1
- && MetricValueUtils.getGlobalGaugeValue(controllerMetrics, ControllerGauge.DISABLED_TABLE_COUNT) == 1;
+ && MetricValueUtils.getGlobalGaugeValue(controllerMetrics, ControllerGauge.REALTIME_TABLE_COUNT) == 2
+ && MetricValueUtils.getGlobalGaugeValue(controllerMetrics, ControllerGauge.DISABLED_TABLE_COUNT) == 1
+ && MetricValueUtils.getGlobalGaugeValue(controllerMetrics, ControllerGauge.UPSERT_TABLE_COUNT) == 1;
}, 60_000, "Timed out waiting for SegmentStatusChecker");
dropOfflineTable(emptyTable);
@@ -258,6 +275,19 @@ public class ControllerPeriodicTasksIntegrationTest extends BaseClusterIntegrati
deleteSchema(tableWithOfflineSegment);
}
+ private void setupUpsertTable()
+ throws IOException {
+ Schema upsertSchema = createSchema();
+ upsertSchema.setSchemaName(getTableName());
+ upsertSchema.getDateTimeFieldSpecs().get(0).setName(UpsertTableIntegrationTest.TIME_COL_NAME);
+ addSchema(upsertSchema);
+ TableConfig tableConfig =
+ createCSVUpsertTableConfig(getTableName(), getKafkaTopic(), getNumKafkaPartitions(), new HashMap<>(),
+ new UpsertConfig(UpsertConfig.Mode.FULL), UpsertTableIntegrationTest.PRIMARY_KEY_COL);
+ tableConfig.getValidationConfig().setTimeColumnName(UpsertTableIntegrationTest.TIME_COL_NAME);
+ addTableConfig(tableConfig);
+ }
+
private boolean checkSegmentStatusCheckerMetrics(ControllerMetrics controllerMetrics, String tableNameWithType,
IdealState idealState, long expectedNumReplicas, long expectedPercentReplicas, long expectedSegmentsInErrorState,
long expectedPercentSegmentsAvailable) {
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java
index 342a8b01f3..238d515b54 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java
@@ -82,8 +82,10 @@ public class UpsertTableIntegrationTest extends BaseClusterIntegrationTestSet {
private static final String CSV_DELIMITER = ",";
private static final String TABLE_NAME = "gameScores";
private static final int NUM_SERVERS = 2;
- private static final String PRIMARY_KEY_COL = "playerId";
private static final String DELETE_COL = "deleted";
+ public static final String PRIMARY_KEY_COL = "playerId";
+ public static final String TIME_COL_NAME = "timestampInEpoch";
+ public static final String UPSERT_SCHEMA_FILE_NAME = "upsert_table_test.schema";
protected PinotTaskManager _taskManager;
protected PinotHelixTaskResourceManager _helixTaskResourceManager;
@@ -133,13 +135,13 @@ public class UpsertTableIntegrationTest extends BaseClusterIntegrationTestSet {
@Override
protected String getSchemaFileName() {
- return "upsert_table_test.schema";
+ return UPSERT_SCHEMA_FILE_NAME;
}
@Nullable
@Override
protected String getTimeColumnName() {
- return "timestampInEpoch";
+ return TIME_COL_NAME;
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org