You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2021/10/01 22:47:34 UTC

[GitHub] [pinot] Jackie-Jiang commented on a change in pull request #7368: Add mergeRollupTask delay metrics

Jackie-Jiang commented on a change in pull request #7368:
URL: https://github.com/apache/pinot/pull/7368#discussion_r720573246



##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
##########
@@ -276,9 +298,13 @@ public String getTaskType() {
           continue;
         }
 
-        Long prevWatermarkMs = mergeRollupTaskMetadata.getWatermarkMap().put(mergeLevel, windowStartMs);
+        Long prevWatermarkMs = mergeRollupTaskMetadata.getWatermarkMap().get(mergeLevel);
+        mergeRollupTaskMetadata.getWatermarkMap().put(mergeLevel, windowStartMs);

Review comment:
       Please keep the existing code: it gives the same result and is more efficient.

##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
##########
@@ -463,4 +493,145 @@ private long getWatermarkMs(long minStartTimeMs, long bucketMs, String mergeLeve
 
     return pinotTaskConfigs;
   }
+
+  /**
+   * Compute the merge/roll-up task delay expressed as a number of time buckets: the time elapsed between the watermark
+   * and (now - bufferTimeMs), divided by bucketTimeMs with Java integer division (truncates toward zero).
+   *
+   * NOTE(review): the result is negative when watermarkMs + bufferTimeMs is later than the current time, and the
+   * division throws ArithmeticException if bucketTimeMs is 0 — confirm callers always pass a positive bucket time.
+   *
+   * @param watermarkMs current watermark in epoch millis, or -1 when no watermark is tracked
+   * @param bufferTimeMs buffer time in millis
+   * @param bucketTimeMs bucket time in millis
+   * @return delay in number of time buckets; 0 when watermarkMs is -1
+   */
+  private long getMergeRollupTaskDelayInNumTimeBuckets(long watermarkMs, long bufferTimeMs, long bucketTimeMs) {
+    // -1 is the "no watermark" sentinel (the gauge callback reads watermarks with getOrDefault(k, -1L)).
+    if (watermarkMs == -1) {
+      return 0;
+    }
+    return (System.currentTimeMillis() - watermarkMs - bufferTimeMs) / bucketTimeMs;
+  }
+
+  /**
+   * Update the delay metrics for the given table and merge level. We create the new gauge metric if the metric is not
+   * available.
+   *
+   * The latest watermark is stored in _mergeRollupWatermarks. The gauge is a callback that recomputes the delay (in
+   * number of time buckets) from that stored watermark each time it is read.
+   * NOTE(review): bufferTimeMs and bucketTimeMs are captured when the gauge is first created, so later changes to the
+   * task config are not reflected by the gauge until it is reset and re-created.
+   *
+   * @param tableNameWithType table name with type
+   * @param mergeLevel merge level
+   * @param watermarkMs current watermark value
+   * @param bufferTimeMs buffer time
+   * @param bucketTimeMs bucket time
+   */
+  private void updateDelayMetrics(String tableNameWithType, String mergeLevel, long watermarkMs, long bufferTimeMs,
+      long bucketTimeMs) {
+    ControllerMetrics controllerMetrics = _clusterInfoAccessor.getControllerMetrics();
+    if (controllerMetrics == null) {
+      return;
+    }
+
+    // Update gauge value that indicates the delay in terms of the number of time buckets.
+    Map<String, Long> watermarkForTable =
+        _mergeRollupWatermarks.computeIfAbsent(tableNameWithType, k -> new ConcurrentHashMap<>());
+    watermarkForTable.compute(mergeLevel, (k, v) -> {
+      if (v == null) {
+        // First watermark for this (table, mergeLevel): register a callback gauge that reads the latest watermark.
+        // The placeholders are filled positionally, so the arguments must follow the order in the message.
+        LOGGER.info(
+            "Creating the gauge metric for tracking the merge/roll-up task delay for table: {} and mergeLevel: {}."
+                + "(watermarkMs={}, bufferTimeMs={}, bucketTimeMs={}, taskDelayInNumTimeBuckets={})", tableNameWithType,
+            mergeLevel, watermarkMs, bufferTimeMs, bucketTimeMs,
+            getMergeRollupTaskDelayInNumTimeBuckets(watermarkMs, bufferTimeMs, bucketTimeMs));
+        controllerMetrics.addCallbackGaugeIfNeeded(getMetricNameForTaskDelay(tableNameWithType, mergeLevel),
+            (() -> getMergeRollupTaskDelayInNumTimeBuckets(watermarkForTable.getOrDefault(k, -1L), bufferTimeMs,
+                bucketTimeMs)));
+      }
+      // Always store the newest watermark; the gauge callback picks it up on its next read.
+      return watermarkMs;
+    });
+  }
+
+  /**
+   * Reset the delay metrics for the given table name.
+   *
+   * Removes the table's entry from _mergeRollupWatermarks and removes the delay gauge of every merge level that had a
+   * tracked watermark. If no ControllerMetrics instance is available, this is a no-op and the watermark entry is kept.
+   *
+   * @param tableNameWithType a table name with type
+   */
+  private void resetDelayMetrics(String tableNameWithType) {
+    ControllerMetrics controllerMetrics = _clusterInfoAccessor.getControllerMetrics();
+    if (controllerMetrics == null) {
+      return;
+    }
+
+    // Delete all the watermarks associated with the given table name
+    Map<String, Long> watermarksForTable = _mergeRollupWatermarks.remove(tableNameWithType);
+    if (watermarksForTable != null) {
+      // Each tracked merge level has its own delay gauge; remove all of them.
+      for (String mergeLevel : watermarksForTable.keySet()) {
+        controllerMetrics.removeGauge(getMetricNameForTaskDelay(tableNameWithType, mergeLevel));
+      }
+    }
+  }
+
+  /**
+   * Reset the delay metrics for the given table name and merge level.
+   *
+   * Removes the watermark tracked for (tableNameWithType, mergeLevel) and, if one was tracked, its delay gauge. Other
+   * merge levels of the same table are left untouched. If no ControllerMetrics instance is available, this is a no-op.
+   * NOTE(review): the table's inner map stays in _mergeRollupWatermarks even if it becomes empty.
+   *
+   * @param tableNameWithType table name with type
+   * @param mergeLevel merge level
+   */
+  private void resetDelayMetrics(String tableNameWithType, String mergeLevel) {
+    ControllerMetrics controllerMetrics = _clusterInfoAccessor.getControllerMetrics();
+    if (controllerMetrics == null) {
+      return;
+    }
+
+    // Delete the watermark associated with the given table name and merge level.
+    Map<String, Long> watermarksForTable = _mergeRollupWatermarks.get(tableNameWithType);
+    if (watermarksForTable != null) {
+      // Only remove the gauge when a watermark was actually tracked for this merge level.
+      if (watermarksForTable.remove(mergeLevel) != null) {
+        controllerMetrics.removeGauge(getMetricNameForTaskDelay(tableNameWithType, mergeLevel));
+      }
+    }
+  }
+
+  /**
+   * Clean up the metrics that no longer need to be emitted.
+   *
+   * We clean up the metrics for the following cases:
+   *   1. Table got deleted.
+   *   2. The current controller is no longer the leader for a table.
+   *   3. Merge task config got deleted.
+   *   4. Merge task config got modified and some merge levels got deleted.
+   *
+   * TODO: Current code will remove all metrics in case we invoke the ad-hoc task scheduling on a single table.
+   * We will file the follow-up PR to address this issue. We need to separate out APIs for ad-hoc scheduling and
+   * periodic scheduling. We will only enable metrics for periodic case.
+   *
+   * @param tableConfigs list of tables
+   */
+  private void cleanUpDelayMetrics(List<TableConfig> tableConfigs) {
+    Map<String, TableConfig> tableConfigMap = new HashMap<>();
+    for (TableConfig tableConfig : tableConfigs) {
+      tableConfigMap.put(tableConfig.getTableName(), tableConfig);
+    }
+
+    for (String tableNameWithType : new ArrayList<>(_mergeRollupWatermarks.keySet())) {
+      TableConfig currentTableConfig = tableConfigMap.get(tableNameWithType);
+      // Table does not exist in the cluster
+      if (currentTableConfig == null) {
+        resetDelayMetrics(tableNameWithType);
+        continue;
+      }
+
+      // The current controller is no longer leader for this table
+      if (!_clusterInfoAccessor.getLeaderControllerManager().isLeaderForTable(tableNameWithType)) {
+        resetDelayMetrics(tableNameWithType);
+        continue;
+      }
+
+      // Task config is removed
+      Map<String, String> taskConfigs = currentTableConfig.getTaskConfig().getConfigsForTaskType(getTaskType());
+      if (taskConfigs == null) {
+        resetDelayMetrics(tableNameWithType);
+        continue;
+      }

Review comment:
       This is not possible, because only tables with this task enabled are passed into this class.

##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
##########
@@ -110,17 +171,23 @@ public String getTaskType() {
   public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
     String taskType = MergeRollupTask.TASK_TYPE;
     List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
-
+    Set<String> candidateMergeTables = new HashSet<>();
     for (TableConfig tableConfig : tableConfigs) {
       if (!validate(tableConfig, taskType)) {
         continue;
       }
       String offlineTableName = tableConfig.getTableName();
       LOGGER.info("Start generating task configs for table: {} for task: {}", offlineTableName, taskType);
+      candidateMergeTables.add(offlineTableName);
 
       // Get all segment metadata
       List<SegmentZKMetadata> allSegments = _clusterInfoAccessor.getSegmentsZKMetadata(offlineTableName);
 
+      // Reset the watermark time if no segment found
+      if (allSegments.isEmpty()) {

Review comment:
       Move `resetDelayMetrics` to line 142 of the old code (within the `if (preSelectedSegments.isEmpty())` block).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org