You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2024/02/01 16:59:43 UTC
(pinot) branch master updated: making nonLeaderForTables exhaustive (#12345)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 76379fb86a making nonLeaderForTables exhaustive (#12345)
76379fb86a is described below
commit 76379fb86a2810301a5e79113d75437127bb6ccf
Author: Gonzalo Ortiz Jaureguizar <go...@users.noreply.github.com>
AuthorDate: Thu Feb 1 17:59:36 2024 +0100
making nonLeaderForTables exhaustive (#12345)
---
.../controller/helix/SegmentStatusChecker.java | 9 +---
.../core/periodictask/ControllerPeriodicTask.java | 59 ++++++++++++----------
2 files changed, 33 insertions(+), 35 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
index d0af31044f..f4121506a1 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
@@ -77,8 +77,6 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh
private TableSizeReader _tableSizeReader;
- private Set<String> _cachedTableNamesWithType = new HashSet<>();
-
/**
* Constructs the segment status checker.
* @param pinotHelixResourceManager The resource checker used to interact with Helix
@@ -152,12 +150,6 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh
_controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.TABLE_DISABLED, 0);
}
});
-
- // Remove metrics for tables that are no longer in the cluster
- _cachedTableNamesWithType.removeAll(context._processedTables);
- _cachedTableNamesWithType.forEach(this::removeMetricsForTable);
- _cachedTableNamesWithType.clear();
- _cachedTableNamesWithType.addAll(context._processedTables);
}
/**
@@ -350,6 +342,7 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh
}
private void removeMetricsForTable(String tableNameWithType) {
+ LOGGER.info("Removing metrics from {} given it is not a table known by Helix", tableNameWithType);
_controllerMetrics.removeTableGauge(tableNameWithType, ControllerGauge.NUMBER_OF_REPLICAS);
_controllerMetrics.removeTableGauge(tableNameWithType, ControllerGauge.PERCENT_OF_REPLICAS);
_controllerMetrics.removeTableGauge(tableNameWithType, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
index 3067de8268..6761efde96 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
@@ -18,9 +18,14 @@
*/
package org.apache.pinot.controller.helix.core.periodictask;
+import com.google.common.collect.Sets;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.metrics.ControllerMeter;
@@ -46,6 +51,7 @@ public abstract class ControllerPeriodicTask<C> extends BasePeriodicTask {
protected final PinotHelixResourceManager _pinotHelixResourceManager;
protected final LeadControllerManager _leadControllerManager;
protected final ControllerMetrics _controllerMetrics;
+ protected Set<String> _prevLeaderOfTables = new HashSet<>();
public ControllerPeriodicTask(String taskName, long runFrequencyInSeconds, long initialDelayInSeconds,
PinotHelixResourceManager pinotHelixResourceManager, LeadControllerManager leadControllerManager,
@@ -63,30 +69,23 @@ public abstract class ControllerPeriodicTask<C> extends BasePeriodicTask {
// Check if we have a specific table against which this task needs to be run.
String propTableNameWithType = (String) periodicTaskProperties.get(PeriodicTask.PROPERTY_KEY_TABLE_NAME);
// Process the tables that are managed by this controller
- List<String> tablesToProcess = new ArrayList<>();
- List<String> nonLeaderForTables = new ArrayList<>();
- if (propTableNameWithType == null) {
- // Table name is not available, so task should run on all tables for which this controller is the lead.
- for (String tableNameWithType : _pinotHelixResourceManager.getAllTables()) {
- if (_leadControllerManager.isLeaderForTable(tableNameWithType)) {
- tablesToProcess.add(tableNameWithType);
- } else {
- nonLeaderForTables.add(tableNameWithType);
- }
- }
- } else {
- // Table name is available, so task should run only on the specified table.
- if (_leadControllerManager.isLeaderForTable(propTableNameWithType)) {
- tablesToProcess.add(propTableNameWithType);
- }
- }
+ List<String> allTables = propTableNameWithType == null
+ ? _pinotHelixResourceManager.getAllTables()
+ : Collections.singletonList(propTableNameWithType);
+
+ Set<String> currentLeaderOfTables = allTables.stream()
+ .filter(_leadControllerManager::isLeaderForTable)
+ .collect(Collectors.toSet());
- if (!tablesToProcess.isEmpty()) {
- processTables(tablesToProcess, periodicTaskProperties);
+ if (!currentLeaderOfTables.isEmpty()) {
+ processTables(new ArrayList<>(currentLeaderOfTables), periodicTaskProperties);
}
+
+ Set<String> nonLeaderForTables = Sets.difference(_prevLeaderOfTables, currentLeaderOfTables);
if (!nonLeaderForTables.isEmpty()) {
- nonLeaderCleanup(nonLeaderForTables);
+ nonLeaderCleanup(new ArrayList<>(nonLeaderForTables));
}
+ _prevLeaderOfTables = currentLeaderOfTables;
} catch (Exception e) {
LOGGER.error("Caught exception while running task: {}", _taskName, e);
_controllerMetrics.addMeteredTableValue(_taskName, ControllerMeter.CONTROLLER_PERIODIC_TASK_ERROR, 1L);
@@ -98,9 +97,12 @@ public abstract class ControllerPeriodicTask<C> extends BasePeriodicTask {
}
/**
- * Processes the given list of tables, and returns the number of tables processed.
+ * Processes the given list of tables led by the current controller, and returns the number of tables processed.
* <p>
* Override one of this method, {@link #processTable(String)} or {@link #processTable(String, C)}.
+ * <p>
+ * Note: This method is called each time the task is executed <b>if and only if</b> the current controller is the
+ * leader of at least one table. As a consequence, it is not guaranteed to be called on every execution of the task.
*/
protected void processTables(List<String> tableNamesWithType, Properties periodicTaskProperties) {
int numTables = tableNamesWithType.size();
@@ -128,14 +130,14 @@ public abstract class ControllerPeriodicTask<C> extends BasePeriodicTask {
}
/**
- * Can be overridden to provide context before processing the tables.
+ * Can be overridden to provide context before processing the tables led by the current controller.
*/
protected C preprocess(Properties periodicTaskProperties) {
return null;
}
/**
- * Processes the given table.
+ * Processes the given table led by the current controller.
* <p>
* Override one of this method, {@link #processTable(String)} or {@link #processTables(List, Properties)}.
*/
@@ -144,7 +146,7 @@ public abstract class ControllerPeriodicTask<C> extends BasePeriodicTask {
}
/**
- * Processes the given table.
+ * Processes the given table led by the current controller.
* <p>
* Override one of this method, {@link #processTable(String, C)} or {@link #processTables(List, Properties)}.
*/
@@ -152,20 +154,23 @@ public abstract class ControllerPeriodicTask<C> extends BasePeriodicTask {
}
/**
- * Can be overridden to perform cleanups after processing the tables.
+ * Can be overridden to perform cleanups after processing the tables led by the current controller.
*/
protected void postprocess(C context) {
postprocess();
}
/**
- * Can be overridden to perform cleanups after processing the tables.
+ * Can be overridden to perform cleanups after processing the tables led by the current controller.
*/
protected void postprocess() {
}
/**
- * Can be overridden to perform cleanups for tables that the current controller isn't the leader.
+ * Can be overridden to perform cleanups for tables for which the current controller has lost the leadership.
+ * <p>
+ * Note: This method is only called when there is at least one table in the given list. As a consequence, it is not
+ * guaranteed to be called on every execution of the task.
*
* @param tableNamesWithType the table names that the current controller isn't the leader for
*/
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org