You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sa...@apache.org on 2024/02/28 16:00:26 UTC

(pinot) branch master updated: Metric for upsert tables count (#12505)

This is an automated email from the ASF dual-hosted git repository.

saurabhd336 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new bc07b8dd2c Metric for upsert tables count (#12505)
bc07b8dd2c is described below

commit bc07b8dd2c953b8f88967d56c1c28af1873ab158
Author: Shounak kulkarni <sh...@gmail.com>
AuthorDate: Wed Feb 28 21:00:18 2024 +0500

    Metric for upsert tables count (#12505)
    
    * Metric for upsert table count
    
    * move other table metric computation to updateTableConfigMetrics
---
 .../pinot/common/metrics/ControllerGauge.java      |  1 +
 .../controller/helix/SegmentStatusChecker.java     | 19 +++++++-----
 .../ControllerPeriodicTasksIntegrationTest.java    | 36 ++++++++++++++++++++--
 .../tests/UpsertTableIntegrationTest.java          |  8 +++--
 4 files changed, 51 insertions(+), 13 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
index 82e86c55a9..938242ef78 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
@@ -54,6 +54,7 @@ public enum ControllerGauge implements AbstractMetrics.Gauge {
   REALTIME_TABLE_COUNT("TableCount", true),
   OFFLINE_TABLE_COUNT("TableCount", true),
   DISABLED_TABLE_COUNT("TableCount", true),
+  UPSERT_TABLE_COUNT("TableCount", true),
   PERIODIC_TASK_NUM_TABLES_PROCESSED("PeriodicTaskNumTablesProcessed", true),
   TIME_MS_SINCE_LAST_MINION_TASK_METADATA_UPDATE("TimeMsSinceLastMinionTaskMetadataUpdate", false),
   TIME_MS_SINCE_LAST_SUCCESSFUL_MINION_TASK_GENERATION("TimeMsSinceLastSuccessfulMinionTaskGeneration", false),
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
index dd598b2b21..1ac7446592 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
@@ -117,7 +117,7 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh
   protected void processTable(String tableNameWithType, Context context) {
     try {
       TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
-      updateTableConfigMetrics(tableNameWithType, tableConfig);
+      updateTableConfigMetrics(tableNameWithType, tableConfig, context);
       updateSegmentMetrics(tableNameWithType, tableConfig, context);
       updateTableSizeMetrics(tableNameWithType);
     } catch (Exception e) {
@@ -132,6 +132,7 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh
   protected void postprocess(Context context) {
     _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.REALTIME_TABLE_COUNT, context._realTimeTableCount);
     _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.OFFLINE_TABLE_COUNT, context._offlineTableCount);
+    _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.UPSERT_TABLE_COUNT, context._upsertTableCount);
     _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.DISABLED_TABLE_COUNT, context._disabledTables.size());
 
     //emit a 0 for tables that are not paused/disabled. This makes alert expressions simpler as we don't have to deal
@@ -156,12 +157,20 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh
    * Updates metrics related to the table config.
    * If table config not found, resets the metrics
    */
-  private void updateTableConfigMetrics(String tableNameWithType, TableConfig tableConfig) {
+  private void updateTableConfigMetrics(String tableNameWithType, TableConfig tableConfig, Context context) {
     if (tableConfig == null) {
       LOGGER.warn("Found null table config for table: {}. Resetting table config metrics.", tableNameWithType);
       _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.REPLICATION_FROM_CONFIG, 0);
       return;
     }
+    if (tableConfig.getTableType() == TableType.OFFLINE) {
+      context._offlineTableCount++;
+    } else {
+      context._realTimeTableCount++;
+    }
+    if (tableConfig.isUpsertEnabled()) {
+      context._upsertTableCount++;
+    }
     int replication = tableConfig.getReplication();
     _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.REPLICATION_FROM_CONFIG, replication);
   }
@@ -177,11 +186,6 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh
    */
   private void updateSegmentMetrics(String tableNameWithType, TableConfig tableConfig, Context context) {
     TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
-    if (tableType == TableType.OFFLINE) {
-      context._offlineTableCount++;
-    } else {
-      context._realTimeTableCount++;
-    }
 
     IdealState idealState = _pinotHelixResourceManager.getTableIdealState(tableNameWithType);
 
@@ -386,6 +390,7 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh
     private boolean _logDisabledTables;
     private int _realTimeTableCount;
     private int _offlineTableCount;
+    private int _upsertTableCount;
     private Set<String> _processedTables = new HashSet<>();
     private Set<String> _disabledTables = new HashSet<>();
     private Set<String> _pausedTables = new HashSet<>();
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java
index 6b558b93e7..03a2b6a000 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java
@@ -19,8 +19,10 @@
 package org.apache.pinot.integration.tests;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -40,6 +42,7 @@ import org.apache.pinot.controller.validation.OfflineSegmentIntervalChecker;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TagOverrideConfig;
 import org.apache.pinot.spi.config.table.TenantConfig;
+import org.apache.pinot.spi.config.table.UpsertConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -75,11 +78,18 @@ public class ControllerPeriodicTasksIntegrationTest extends BaseClusterIntegrati
 
   private String _currentTable = DEFAULT_TABLE_NAME;
 
+  private String _schemaFileName = DEFAULT_SCHEMA_FILE_NAME;
+
   @Override
   protected String getTableName() {
     return _currentTable;
   }
 
+  @Override
+  protected String getSchemaFileName() {
+    return _schemaFileName;
+  }
+
   @Override
   protected int getNumReplicas() {
     return NUM_REPLICAS;
@@ -182,6 +192,7 @@ public class ControllerPeriodicTasksIntegrationTest extends BaseClusterIntegrati
     String emptyTable = "emptyTable";
     String disabledTable = "disabledTable";
     String tableWithOfflineSegment = "tableWithOfflineSegment";
+    String upsertTable = "upsertTable";
 
     Schema schema = createSchema();
     _currentTable = emptyTable;
@@ -195,6 +206,11 @@ public class ControllerPeriodicTasksIntegrationTest extends BaseClusterIntegrati
     addTableConfig(createOfflineTableConfig());
     _helixAdmin.enableResource(getHelixClusterName(), TableNameBuilder.OFFLINE.tableNameWithType(disabledTable), false);
 
+    _currentTable = upsertTable;
+    _schemaFileName = UpsertTableIntegrationTest.UPSERT_SCHEMA_FILE_NAME;
+    setupUpsertTable();
+    _schemaFileName = DEFAULT_SCHEMA_FILE_NAME;
+
     _currentTable = tableWithOfflineSegment;
     schema.setSchemaName(_currentTable);
     addSchema(schema);
@@ -211,7 +227,7 @@ public class ControllerPeriodicTasksIntegrationTest extends BaseClusterIntegrati
 
     _currentTable = DEFAULT_TABLE_NAME;
 
-    int numTables = 5;
+    int numTables = 6;
     ControllerMetrics controllerMetrics = _controllerStarter.getControllerMetrics();
     TestUtils.waitForCondition(aVoid -> {
       if (MetricValueUtils.getGlobalGaugeValue(controllerMetrics, "SegmentStatusChecker",
@@ -246,8 +262,9 @@ public class ControllerPeriodicTasksIntegrationTest extends BaseClusterIntegrati
         return false;
       }
       return MetricValueUtils.getGlobalGaugeValue(controllerMetrics, ControllerGauge.OFFLINE_TABLE_COUNT) == 4
-          && MetricValueUtils.getGlobalGaugeValue(controllerMetrics, ControllerGauge.REALTIME_TABLE_COUNT) == 1
-          && MetricValueUtils.getGlobalGaugeValue(controllerMetrics, ControllerGauge.DISABLED_TABLE_COUNT) == 1;
+          && MetricValueUtils.getGlobalGaugeValue(controllerMetrics, ControllerGauge.REALTIME_TABLE_COUNT) == 2
+          && MetricValueUtils.getGlobalGaugeValue(controllerMetrics, ControllerGauge.DISABLED_TABLE_COUNT) == 1
+          && MetricValueUtils.getGlobalGaugeValue(controllerMetrics, ControllerGauge.UPSERT_TABLE_COUNT) == 1;
     }, 60_000, "Timed out waiting for SegmentStatusChecker");
 
     dropOfflineTable(emptyTable);
@@ -258,6 +275,19 @@ public class ControllerPeriodicTasksIntegrationTest extends BaseClusterIntegrati
     deleteSchema(tableWithOfflineSegment);
   }
 
+  private void setupUpsertTable()
+      throws IOException {
+    Schema upsertSchema = createSchema();
+    upsertSchema.setSchemaName(getTableName());
+    upsertSchema.getDateTimeFieldSpecs().get(0).setName(UpsertTableIntegrationTest.TIME_COL_NAME);
+    addSchema(upsertSchema);
+    TableConfig tableConfig =
+        createCSVUpsertTableConfig(getTableName(), getKafkaTopic(), getNumKafkaPartitions(), new HashMap<>(),
+            new UpsertConfig(UpsertConfig.Mode.FULL), UpsertTableIntegrationTest.PRIMARY_KEY_COL);
+    tableConfig.getValidationConfig().setTimeColumnName(UpsertTableIntegrationTest.TIME_COL_NAME);
+    addTableConfig(tableConfig);
+  }
+
   private boolean checkSegmentStatusCheckerMetrics(ControllerMetrics controllerMetrics, String tableNameWithType,
       IdealState idealState, long expectedNumReplicas, long expectedPercentReplicas, long expectedSegmentsInErrorState,
       long expectedPercentSegmentsAvailable) {
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java
index 342a8b01f3..238d515b54 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java
@@ -82,8 +82,10 @@ public class UpsertTableIntegrationTest extends BaseClusterIntegrationTestSet {
   private static final String CSV_DELIMITER = ",";
   private static final String TABLE_NAME = "gameScores";
   private static final int NUM_SERVERS = 2;
-  private static final String PRIMARY_KEY_COL = "playerId";
   private static final String DELETE_COL = "deleted";
+  public static final String PRIMARY_KEY_COL = "playerId";
+  public static final String TIME_COL_NAME = "timestampInEpoch";
+  public static final String UPSERT_SCHEMA_FILE_NAME = "upsert_table_test.schema";
 
   protected PinotTaskManager _taskManager;
   protected PinotHelixTaskResourceManager _helixTaskResourceManager;
@@ -133,13 +135,13 @@ public class UpsertTableIntegrationTest extends BaseClusterIntegrationTestSet {
 
   @Override
   protected String getSchemaFileName() {
-    return "upsert_table_test.schema";
+    return UPSERT_SCHEMA_FILE_NAME;
   }
 
   @Nullable
   @Override
   protected String getTimeColumnName() {
-    return "timestampInEpoch";
+    return TIME_COL_NAME;
   }
 
   @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org