You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2021/10/01 23:42:44 UTC

[GitHub] [pinot] snleee commented on a change in pull request #7368: Add mergeRollupTask delay metrics

snleee commented on a change in pull request #7368:
URL: https://github.com/apache/pinot/pull/7368#discussion_r720589568



##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
##########
@@ -463,4 +493,145 @@ private long getWatermarkMs(long minStartTimeMs, long bucketMs, String mergeLeve
 
     return pinotTaskConfigs;
   }
+
+  /**
+   * Computes how many whole time buckets the given watermark lags behind "now minus the buffer time".
+   *
+   * @param watermarkMs watermark in milliseconds, or -1 when no watermark is tracked
+   * @param bufferTimeMs buffer time in milliseconds
+   * @param bucketTimeMs bucket size in milliseconds
+   * @return the lag in buckets (long division, truncated toward zero; negative if the watermark plus the buffer is
+   *         ahead of the current time), or 0 when the watermark is -1
+   */
+  private long getMergeRollupTaskDelayInNumTimeBuckets(long watermarkMs, long bufferTimeMs, long bucketTimeMs) {
+    if (watermarkMs != -1) {
+      long lagMs = System.currentTimeMillis() - watermarkMs - bufferTimeMs;
+      return lagMs / bucketTimeMs;
+    }
+    return 0;
+  }
+
+  /**
+   * Update the delay metrics for the given table and merge level. We create the new gauge metric if the metric is not
+   * available.
+   *
+   * <p>The gauge is registered as a callback that reads the watermark for the merge level from the per-table map
+   * stored in {@code _mergeRollupWatermarks}, so later calls only need to store the new watermark value.
+   *
+   * @param tableNameWithType table name with type
+   * @param mergeLevel merge level
+   * @param watermarkMs current watermark value
+   * @param bufferTimeMs buffer time
+   * @param bucketTimeMs bucket time
+   */
+  private void updateDelayMetrics(String tableNameWithType, String mergeLevel, long watermarkMs, long bufferTimeMs,
+      long bucketTimeMs) {
+    ControllerMetrics controllerMetrics = _clusterInfoAccessor.getControllerMetrics();
+    if (controllerMetrics == null) {
+      return;
+    }
+
+    // Update gauge value that indicates the delay in terms of the number of time buckets.
+    Map<String, Long> watermarkForTable =
+        _mergeRollupWatermarks.computeIfAbsent(tableNameWithType, k -> new ConcurrentHashMap<>());
+    watermarkForTable.compute(mergeLevel, (k, v) -> {
+      if (v == null) {
+        // First watermark seen for this (table, merge level): register the callback gauge. If the entry is later
+        // missing from the map, the callback passes -1 as the watermark.
+        LOGGER.info(
+            "Creating the gauge metric for tracking the merge/roll-up task delay for table: {} and mergeLevel: {}."
+                + "(watermarkMs={}, bufferTimeMs={}, bucketTimeMs={}, taskDelayInNumTimeBuckets={})", tableNameWithType,
+            mergeLevel, watermarkMs, bufferTimeMs, bucketTimeMs,
+            getMergeRollupTaskDelayInNumTimeBuckets(watermarkMs, bufferTimeMs, bucketTimeMs));
+        controllerMetrics.addCallbackGaugeIfNeeded(getMetricNameForTaskDelay(tableNameWithType, mergeLevel),
+            (() -> getMergeRollupTaskDelayInNumTimeBuckets(watermarkForTable.getOrDefault(k, -1L), bufferTimeMs,
+                bucketTimeMs)));
+      }
+      return watermarkMs;
+    });
+  }
+
+  /**
+   * Removes the cached watermarks of the given table and unregisters the delay gauge of every merge level that was
+   * tracked for it. Nothing happens when controller metrics are unavailable.
+   *
+   * @param tableNameWithType a table name with type
+   */
+  private void resetDelayMetrics(String tableNameWithType) {
+    ControllerMetrics metrics = _clusterInfoAccessor.getControllerMetrics();
+    if (metrics != null) {
+      Map<String, Long> removedWatermarks = _mergeRollupWatermarks.remove(tableNameWithType);
+      if (removedWatermarks != null) {
+        for (String level : removedWatermarks.keySet()) {
+          metrics.removeGauge(getMetricNameForTaskDelay(tableNameWithType, level));
+        }
+      }
+    }
+  }
+
+  /**
+   * Removes the cached watermark for one merge level of the given table. The matching delay gauge is unregistered
+   * only if a watermark was actually tracked for that merge level. Nothing happens when controller metrics are
+   * unavailable.
+   *
+   * @param tableNameWithType table name with type
+   * @param mergeLevel merge level
+   */
+  private void resetDelayMetrics(String tableNameWithType, String mergeLevel) {
+    ControllerMetrics metrics = _clusterInfoAccessor.getControllerMetrics();
+    if (metrics == null) {
+      return;
+    }
+
+    Map<String, Long> tableWatermarks = _mergeRollupWatermarks.get(tableNameWithType);
+    boolean wasTracked = tableWatermarks != null && tableWatermarks.remove(mergeLevel) != null;
+    if (wasTracked) {
+      metrics.removeGauge(getMetricNameForTaskDelay(tableNameWithType, mergeLevel));
+    }
+  }
+
+  /**
+   * Clean up the metrics that no longer need to be emitted.
+   *
+   * We clean up the metrics for the following cases:
+   *   1. Table got deleted.
+   *   2. The current controller is no longer the leader for a table.
+   *   3. Merge task config got deleted.
+   *   4. Merge task config got modified and some merge levels got deleted.
+   *
+   * TODO: Current code will remove all metrics in case we invoke the ad-hoc task scheduling on a single table.
+   * We will file the follow-up PR to address this issue. We need to separate out APIs for ad-hoc scheduling and
+   * periodic scheduling. We will only enable metrics for periodic case.
+   *
+   * @param tableConfigs list of tables
+   */
+  private void cleanUpDelayMetrics(List<TableConfig> tableConfigs) {
+    Map<String, TableConfig> tableConfigMap = new HashMap<>();
+    for (TableConfig tableConfig : tableConfigs) {
+      tableConfigMap.put(tableConfig.getTableName(), tableConfig);
+    }
+
+    for (String tableNameWithType : new ArrayList<>(_mergeRollupWatermarks.keySet())) {
+      TableConfig currentTableConfig = tableConfigMap.get(tableNameWithType);
+      // Table does not exist in the cluster
+      if (currentTableConfig == null) {
+        resetDelayMetrics(tableNameWithType);
+        continue;
+      }
+
+      // The current controller is no longer leader for this table
+      if (!_clusterInfoAccessor.getLeaderControllerManager().isLeaderForTable(tableNameWithType)) {
+        resetDelayMetrics(tableNameWithType);
+        continue;
+      }
+
+      // Task config is removed
+      Map<String, String> taskConfigs = currentTableConfig.getTaskConfig().getConfigsForTaskType(getTaskType());
+      if (taskConfigs == null) {
+        resetDelayMetrics(tableNameWithType);
+        continue;
+      }

Review comment:
       What if the task was enabled (so this controller registered the gauge) and then the task config was removed later? I'm trying to cover that case. We call `cleanUpDelayMetrics()` at the end of the scheduling round, so the stale gauge gets removed there.

##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
##########
@@ -463,4 +493,145 @@ private long getWatermarkMs(long minStartTimeMs, long bucketMs, String mergeLeve
 
     return pinotTaskConfigs;
   }
+
+  /**
+   * Computes how many whole time buckets the given watermark lags behind "now minus the buffer time".
+   *
+   * @param watermarkMs watermark in milliseconds, or -1 when no watermark is tracked
+   * @param bufferTimeMs buffer time in milliseconds
+   * @param bucketTimeMs bucket size in milliseconds
+   * @return the lag in buckets (long division, truncated toward zero; negative if the watermark plus the buffer is
+   *         ahead of the current time), or 0 when the watermark is -1
+   */
+  private long getMergeRollupTaskDelayInNumTimeBuckets(long watermarkMs, long bufferTimeMs, long bucketTimeMs) {
+    if (watermarkMs != -1) {
+      long lagMs = System.currentTimeMillis() - watermarkMs - bufferTimeMs;
+      return lagMs / bucketTimeMs;
+    }
+    return 0;
+  }
+
+  /**
+   * Update the delay metrics for the given table and merge level. We create the new gauge metric if the metric is not
+   * available.
+   *
+   * <p>The gauge is registered as a callback that reads the watermark for the merge level from the per-table map
+   * stored in {@code _mergeRollupWatermarks}, so later calls only need to store the new watermark value.
+   *
+   * @param tableNameWithType table name with type
+   * @param mergeLevel merge level
+   * @param watermarkMs current watermark value
+   * @param bufferTimeMs buffer time
+   * @param bucketTimeMs bucket time
+   */
+  private void updateDelayMetrics(String tableNameWithType, String mergeLevel, long watermarkMs, long bufferTimeMs,
+      long bucketTimeMs) {
+    ControllerMetrics controllerMetrics = _clusterInfoAccessor.getControllerMetrics();
+    if (controllerMetrics == null) {
+      return;
+    }
+
+    // Update gauge value that indicates the delay in terms of the number of time buckets.
+    Map<String, Long> watermarkForTable =
+        _mergeRollupWatermarks.computeIfAbsent(tableNameWithType, k -> new ConcurrentHashMap<>());
+    watermarkForTable.compute(mergeLevel, (k, v) -> {
+      if (v == null) {
+        // First watermark seen for this (table, merge level): register the callback gauge. If the entry is later
+        // missing from the map, the callback passes -1 as the watermark.
+        LOGGER.info(
+            "Creating the gauge metric for tracking the merge/roll-up task delay for table: {} and mergeLevel: {}."
+                + "(watermarkMs={}, bufferTimeMs={}, bucketTimeMs={}, taskDelayInNumTimeBuckets={})", tableNameWithType,
+            mergeLevel, watermarkMs, bufferTimeMs, bucketTimeMs,
+            getMergeRollupTaskDelayInNumTimeBuckets(watermarkMs, bufferTimeMs, bucketTimeMs));
+        controllerMetrics.addCallbackGaugeIfNeeded(getMetricNameForTaskDelay(tableNameWithType, mergeLevel),
+            (() -> getMergeRollupTaskDelayInNumTimeBuckets(watermarkForTable.getOrDefault(k, -1L), bufferTimeMs,
+                bucketTimeMs)));
+      }
+      return watermarkMs;
+    });
+  }
+
+  /**
+   * Removes the cached watermarks of the given table and unregisters the delay gauge of every merge level that was
+   * tracked for it. Nothing happens when controller metrics are unavailable.
+   *
+   * @param tableNameWithType a table name with type
+   */
+  private void resetDelayMetrics(String tableNameWithType) {
+    ControllerMetrics metrics = _clusterInfoAccessor.getControllerMetrics();
+    if (metrics != null) {
+      Map<String, Long> removedWatermarks = _mergeRollupWatermarks.remove(tableNameWithType);
+      if (removedWatermarks != null) {
+        for (String level : removedWatermarks.keySet()) {
+          metrics.removeGauge(getMetricNameForTaskDelay(tableNameWithType, level));
+        }
+      }
+    }
+  }
+
+  /**
+   * Removes the cached watermark for one merge level of the given table. The matching delay gauge is unregistered
+   * only if a watermark was actually tracked for that merge level. Nothing happens when controller metrics are
+   * unavailable.
+   *
+   * @param tableNameWithType table name with type
+   * @param mergeLevel merge level
+   */
+  private void resetDelayMetrics(String tableNameWithType, String mergeLevel) {
+    ControllerMetrics metrics = _clusterInfoAccessor.getControllerMetrics();
+    if (metrics == null) {
+      return;
+    }
+
+    Map<String, Long> tableWatermarks = _mergeRollupWatermarks.get(tableNameWithType);
+    boolean wasTracked = tableWatermarks != null && tableWatermarks.remove(mergeLevel) != null;
+    if (wasTracked) {
+      metrics.removeGauge(getMetricNameForTaskDelay(tableNameWithType, mergeLevel));
+    }
+  }
+
+  /**
+   * Clean up the metrics that no longer need to be emitted.
+   *
+   * We clean up the metrics for the following cases:
+   *   1. Table got deleted.
+   *   2. The current controller is no longer the leader for a table.
+   *   3. Merge task config got deleted.
+   *   4. Merge task config got modified and some merge levels got deleted.
+   *
+   * TODO: Current code will remove all metrics in case we invoke the ad-hoc task scheduling on a single table.
+   * We will file the follow-up PR to address this issue. We need to separate out APIs for ad-hoc scheduling and
+   * periodic scheduling. We will only enable metrics for periodic case.
+   *
+   * @param tableConfigs list of tables
+   */
+  private void cleanUpDelayMetrics(List<TableConfig> tableConfigs) {
+    Map<String, TableConfig> tableConfigMap = new HashMap<>();
+    for (TableConfig tableConfig : tableConfigs) {
+      tableConfigMap.put(tableConfig.getTableName(), tableConfig);
+    }
+
+    for (String tableNameWithType : new ArrayList<>(_mergeRollupWatermarks.keySet())) {
+      TableConfig currentTableConfig = tableConfigMap.get(tableNameWithType);
+      // Table does not exist in the cluster
+      if (currentTableConfig == null) {
+        resetDelayMetrics(tableNameWithType);
+        continue;
+      }
+
+      // The current controller is no longer leader for this table
+      if (!_clusterInfoAccessor.getLeaderControllerManager().isLeaderForTable(tableNameWithType)) {
+        resetDelayMetrics(tableNameWithType);
+        continue;
+      }
+
+      // Task config is removed
+      Map<String, String> taskConfigs = currentTableConfig.getTaskConfig().getConfigsForTaskType(getTaskType());
+      if (taskConfigs == null) {
+        resetDelayMetrics(tableNameWithType);
+        continue;
+      }

Review comment:
       What if the task was enabled (so this controller registered the gauge) and then the task config was removed later? I'm trying to cover that case. We call `cleanUpDelayMetrics()` at the end of every scheduling round, so the stale gauge gets removed there.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org