You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2022/10/05 16:22:45 UTC
[pinot] branch master updated: allow to config task expire time (#9530)
This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new c79d029384 allow to config task expire time (#9530)
c79d029384 is described below
commit c79d029384c25241a4ee114fd5472cd6b4cb6d47
Author: Xiaobing <61...@users.noreply.github.com>
AuthorDate: Wed Oct 5 09:20:31 2022 -0700
allow to config task expire time (#9530)
* allow configuring the task expiry time, i.e. how long ended tasks are kept in ZK before being purged
* add a comment explaining the config for clarity
---
.../org/apache/pinot/controller/BaseControllerStarter.java | 3 ++-
.../main/java/org/apache/pinot/controller/ControllerConf.java | 8 ++++++++
.../helix/core/minion/PinotHelixTaskResourceManager.java | 10 +++++++++-
3 files changed, 19 insertions(+), 2 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index 9ba042bbcb..697c8f2a06 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -402,7 +402,8 @@ public abstract class BaseControllerStarter implements ServiceStartable {
LOGGER.info("Starting task resource manager");
_helixTaskResourceManager =
- new PinotHelixTaskResourceManager(_helixResourceManager, new TaskDriver(_helixParticipantManager));
+ new PinotHelixTaskResourceManager(_helixResourceManager, new TaskDriver(_helixParticipantManager),
+ _config.getPinotTaskExpireTimeInMs());
// Helix resource manager must be started in order to create PinotLLCRealtimeSegmentManager
LOGGER.info("Starting realtime segment manager");
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index a3ae9316eb..9b1081d6d4 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -148,6 +148,10 @@ public class ControllerConf extends PinotConfiguration {
"controller.minion.task.metrics.emitter.frequencyPeriod";
public static final String PINOT_TASK_MANAGER_SCHEDULER_ENABLED = "controller.task.scheduler.enabled";
+ // Expiry time (in milliseconds) for ended tasks. Helix removes a task's info from ZK once this much time has
+ // elapsed since the task ended.
+ public static final String PINOT_TASK_EXPIRE_TIME_MS = "controller.task.expire.time.ms";
+
@Deprecated
// RealtimeSegmentRelocator has been rebranded as SegmentRelocator
public static final String DEPRECATED_REALTIME_SEGMENT_RELOCATOR_FREQUENCY =
@@ -825,6 +829,10 @@ public class ControllerConf extends PinotConfiguration {
return getProperty(ControllerPeriodicTasksConf.PINOT_TASK_MANAGER_SCHEDULER_ENABLED, false);
}
+ public long getPinotTaskExpireTimeInMs() {
+ return getProperty(ControllerPeriodicTasksConf.PINOT_TASK_EXPIRE_TIME_MS, TimeUnit.HOURS.toMillis(24));
+ }
+
/**
* RealtimeSegmentRelocator has been rebranded to SegmentRelocator.
* Check for SEGMENT_RELOCATOR_INITIAL_DELAY_IN_SECONDS property, if not found, return
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
index 85b913c471..4e5c7e5963 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
@@ -83,10 +83,17 @@ public class PinotHelixTaskResourceManager {
private final TaskDriver _taskDriver;
private final PinotHelixResourceManager _helixResourceManager;
+ private final long _taskExpireTimeMs;
public PinotHelixTaskResourceManager(PinotHelixResourceManager helixResourceManager, TaskDriver taskDriver) {
+ this(helixResourceManager, taskDriver, TimeUnit.HOURS.toMillis(24));
+ }
+
+ public PinotHelixTaskResourceManager(PinotHelixResourceManager helixResourceManager, TaskDriver taskDriver,
+ long taskExpireTimeMs) {
_helixResourceManager = helixResourceManager;
_taskDriver = taskDriver;
+ _taskExpireTimeMs = taskExpireTimeMs;
}
/**
@@ -288,7 +295,8 @@ public class PinotHelixTaskResourceManager {
JobConfig.Builder jobBuilder =
new JobConfig.Builder().addTaskConfigs(helixTaskConfigs).setInstanceGroupTag(minionInstanceTag)
.setTimeoutPerTask(taskTimeoutMs).setNumConcurrentTasksPerInstance(numConcurrentTasksPerInstance)
- .setIgnoreDependentJobFailure(true).setMaxAttemptsPerTask(1).setFailureThreshold(Integer.MAX_VALUE);
+ .setIgnoreDependentJobFailure(true).setMaxAttemptsPerTask(1).setFailureThreshold(Integer.MAX_VALUE)
+ .setExpiry(_taskExpireTimeMs);
_taskDriver.enqueueJob(getHelixJobQueueName(taskType), parentTaskName, jobBuilder);
// Wait until task state is available
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org