You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2024/02/01 16:59:43 UTC

(pinot) branch master updated: making nonLeaderForTables exhaustive (#12345)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 76379fb86a making nonLeaderForTables exhaustive (#12345)
76379fb86a is described below

commit 76379fb86a2810301a5e79113d75437127bb6ccf
Author: Gonzalo Ortiz Jaureguizar <go...@users.noreply.github.com>
AuthorDate: Thu Feb 1 17:59:36 2024 +0100

    making nonLeaderForTables exhaustive (#12345)
---
 .../controller/helix/SegmentStatusChecker.java     |  9 +---
 .../core/periodictask/ControllerPeriodicTask.java  | 59 ++++++++++++----------
 2 files changed, 33 insertions(+), 35 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
index d0af31044f..f4121506a1 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
@@ -77,8 +77,6 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh
 
   private TableSizeReader _tableSizeReader;
 
-  private Set<String> _cachedTableNamesWithType = new HashSet<>();
-
   /**
    * Constructs the segment status checker.
    * @param pinotHelixResourceManager The resource checker used to interact with Helix
@@ -152,12 +150,6 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh
         _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.TABLE_DISABLED, 0);
       }
     });
-
-    // Remove metrics for tables that are no longer in the cluster
-    _cachedTableNamesWithType.removeAll(context._processedTables);
-    _cachedTableNamesWithType.forEach(this::removeMetricsForTable);
-    _cachedTableNamesWithType.clear();
-    _cachedTableNamesWithType.addAll(context._processedTables);
   }
 
   /**
@@ -350,6 +342,7 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh
   }
 
   private void removeMetricsForTable(String tableNameWithType) {
+    LOGGER.info("Removing metrics from {} given it is not a table known by Helix", tableNameWithType);
     _controllerMetrics.removeTableGauge(tableNameWithType, ControllerGauge.NUMBER_OF_REPLICAS);
     _controllerMetrics.removeTableGauge(tableNameWithType, ControllerGauge.PERCENT_OF_REPLICAS);
     _controllerMetrics.removeTableGauge(tableNameWithType, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
index 3067de8268..6761efde96 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
@@ -18,9 +18,14 @@
  */
 package org.apache.pinot.controller.helix.core.periodictask;
 
+import com.google.common.collect.Sets;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.pinot.common.metrics.ControllerGauge;
 import org.apache.pinot.common.metrics.ControllerMeter;
@@ -46,6 +51,7 @@ public abstract class ControllerPeriodicTask<C> extends BasePeriodicTask {
   protected final PinotHelixResourceManager _pinotHelixResourceManager;
   protected final LeadControllerManager _leadControllerManager;
   protected final ControllerMetrics _controllerMetrics;
+  protected Set<String> _prevLeaderOfTables = new HashSet<>();
 
   public ControllerPeriodicTask(String taskName, long runFrequencyInSeconds, long initialDelayInSeconds,
       PinotHelixResourceManager pinotHelixResourceManager, LeadControllerManager leadControllerManager,
@@ -63,30 +69,23 @@ public abstract class ControllerPeriodicTask<C> extends BasePeriodicTask {
       // Check if we have a specific table against which this task needs to be run.
       String propTableNameWithType = (String) periodicTaskProperties.get(PeriodicTask.PROPERTY_KEY_TABLE_NAME);
       // Process the tables that are managed by this controller
-      List<String> tablesToProcess = new ArrayList<>();
-      List<String> nonLeaderForTables = new ArrayList<>();
-      if (propTableNameWithType == null) {
-        // Table name is not available, so task should run on all tables for which this controller is the lead.
-        for (String tableNameWithType : _pinotHelixResourceManager.getAllTables()) {
-          if (_leadControllerManager.isLeaderForTable(tableNameWithType)) {
-            tablesToProcess.add(tableNameWithType);
-          } else {
-            nonLeaderForTables.add(tableNameWithType);
-          }
-        }
-      } else {
-        // Table name is available, so task should run only on the specified table.
-        if (_leadControllerManager.isLeaderForTable(propTableNameWithType)) {
-          tablesToProcess.add(propTableNameWithType);
-        }
-      }
+      List<String> allTables = propTableNameWithType == null
+          ? _pinotHelixResourceManager.getAllTables()
+          : Collections.singletonList(propTableNameWithType);
+
+      Set<String> currentLeaderOfTables = allTables.stream()
+          .filter(_leadControllerManager::isLeaderForTable)
+          .collect(Collectors.toSet());
 
-      if (!tablesToProcess.isEmpty()) {
-        processTables(tablesToProcess, periodicTaskProperties);
+      if (!currentLeaderOfTables.isEmpty()) {
+        processTables(new ArrayList<>(currentLeaderOfTables), periodicTaskProperties);
       }
+
+      Set<String> nonLeaderForTables = Sets.difference(_prevLeaderOfTables, currentLeaderOfTables);
       if (!nonLeaderForTables.isEmpty()) {
-        nonLeaderCleanup(nonLeaderForTables);
+        nonLeaderCleanup(new ArrayList<>(nonLeaderForTables));
       }
+      _prevLeaderOfTables = currentLeaderOfTables;
     } catch (Exception e) {
       LOGGER.error("Caught exception while running task: {}", _taskName, e);
       _controllerMetrics.addMeteredTableValue(_taskName, ControllerMeter.CONTROLLER_PERIODIC_TASK_ERROR, 1L);
@@ -98,9 +97,12 @@ public abstract class ControllerPeriodicTask<C> extends BasePeriodicTask {
   }
 
   /**
-   * Processes the given list of tables, and returns the number of tables processed.
+   * Processes the given list of tables led by the current controller.
    * <p>
    * Override one of this method, {@link #processTable(String)} or {@link #processTable(String, C)}.
+   * <p/>
+   * Note: This method is called on a task execution <b>if and only if</b> the current controller leads at least
+   * one table, so it is not necessarily called every time the task is executed.
    */
   protected void processTables(List<String> tableNamesWithType, Properties periodicTaskProperties) {
     int numTables = tableNamesWithType.size();
@@ -128,14 +130,14 @@ public abstract class ControllerPeriodicTask<C> extends BasePeriodicTask {
   }
 
   /**
-   * Can be overridden to provide context before processing the tables.
+   * Can be overridden to provide context before processing the tables led by the current controller.
    */
   protected C preprocess(Properties periodicTaskProperties) {
     return null;
   }
 
   /**
-   * Processes the given table.
+   * Processes the given table led by the current controller.
    * <p>
    * Override one of this method, {@link #processTable(String)} or {@link #processTables(List, Properties)}.
    */
@@ -144,7 +146,7 @@ public abstract class ControllerPeriodicTask<C> extends BasePeriodicTask {
   }
 
   /**
-   * Processes the given table.
+   * Processes the given table led by the current controller.
    * <p>
    * Override one of this method, {@link #processTable(String, C)} or {@link #processTables(List, Properties)}.
    */
@@ -152,20 +154,23 @@ public abstract class ControllerPeriodicTask<C> extends BasePeriodicTask {
   }
 
   /**
-   * Can be overridden to perform cleanups after processing the tables.
+   * Can be overridden to perform cleanups after processing the tables led by the current controller.
    */
   protected void postprocess(C context) {
     postprocess();
   }
 
   /**
-   * Can be overridden to perform cleanups after processing the tables.
+   * Can be overridden to perform cleanups after processing the tables led by the current controller.
    */
   protected void postprocess() {
   }
 
   /**
-   * Can be overridden to perform cleanups for tables that the current controller isn't the leader.
+   * Can be overridden to perform cleanups for tables for which the current controller has lost leadership.
+   * <p/>
+   * Note: This method is only called when the given list contains at least one table, so it may not be called
+   * every time the task is executed.
    *
    * @param tableNamesWithType the table names that the current controller isn't the leader for
    */


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org