You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2022/10/05 16:22:45 UTC

[pinot] branch master updated: allow to config task expire time (#9530)

This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new c79d029384 allow to config task expire time (#9530)
c79d029384 is described below

commit c79d029384c25241a4ee114fd5472cd6b4cb6d47
Author: Xiaobing <61...@users.noreply.github.com>
AuthorDate: Wed Oct 5 09:20:31 2022 -0700

    allow to config task expire time (#9530)
    
    * allow configuring the task expiry time, i.e. how long ended tasks are kept in ZK before Helix purges them
    
    * comment the config for clarity
---
 .../org/apache/pinot/controller/BaseControllerStarter.java     |  3 ++-
 .../main/java/org/apache/pinot/controller/ControllerConf.java  |  8 ++++++++
 .../helix/core/minion/PinotHelixTaskResourceManager.java       | 10 +++++++++-
 3 files changed, 19 insertions(+), 2 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index 9ba042bbcb..697c8f2a06 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -402,7 +402,8 @@ public abstract class BaseControllerStarter implements ServiceStartable {
 
     LOGGER.info("Starting task resource manager");
     _helixTaskResourceManager =
-        new PinotHelixTaskResourceManager(_helixResourceManager, new TaskDriver(_helixParticipantManager));
+        new PinotHelixTaskResourceManager(_helixResourceManager, new TaskDriver(_helixParticipantManager),
+            _config.getPinotTaskExpireTimeInMs());
 
     // Helix resource manager must be started in order to create PinotLLCRealtimeSegmentManager
     LOGGER.info("Starting realtime segment manager");
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index a3ae9316eb..9b1081d6d4 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -148,6 +148,10 @@ public class ControllerConf extends PinotConfiguration {
         "controller.minion.task.metrics.emitter.frequencyPeriod";
 
     public static final String PINOT_TASK_MANAGER_SCHEDULER_ENABLED = "controller.task.scheduler.enabled";
+    // This is the expiry for the ended tasks. Helix cleans up the task info from ZK after the expiry time from the
+    // end of the task.
+    public static final String PINOT_TASK_EXPIRE_TIME_MS = "controller.task.expire.time.ms";
+
     @Deprecated
     // RealtimeSegmentRelocator has been rebranded as SegmentRelocator
     public static final String DEPRECATED_REALTIME_SEGMENT_RELOCATOR_FREQUENCY =
@@ -825,6 +829,10 @@ public class ControllerConf extends PinotConfiguration {
     return getProperty(ControllerPeriodicTasksConf.PINOT_TASK_MANAGER_SCHEDULER_ENABLED, false);
   }
 
+  public long getPinotTaskExpireTimeInMs() {
+    return getProperty(ControllerPeriodicTasksConf.PINOT_TASK_EXPIRE_TIME_MS, TimeUnit.HOURS.toMillis(24));
+  }
+
   /**
    * RealtimeSegmentRelocator has been rebranded to SegmentRelocator.
    * Check for SEGMENT_RELOCATOR_INITIAL_DELAY_IN_SECONDS property, if not found, return
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
index 85b913c471..4e5c7e5963 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
@@ -83,10 +83,17 @@ public class PinotHelixTaskResourceManager {
 
   private final TaskDriver _taskDriver;
   private final PinotHelixResourceManager _helixResourceManager;
+  private final long _taskExpireTimeMs;
 
   public PinotHelixTaskResourceManager(PinotHelixResourceManager helixResourceManager, TaskDriver taskDriver) {
+    this(helixResourceManager, taskDriver, TimeUnit.HOURS.toMillis(24));
+  }
+
+  public PinotHelixTaskResourceManager(PinotHelixResourceManager helixResourceManager, TaskDriver taskDriver,
+      long taskExpireTimeMs) {
     _helixResourceManager = helixResourceManager;
     _taskDriver = taskDriver;
+    _taskExpireTimeMs = taskExpireTimeMs;
   }
 
   /**
@@ -288,7 +295,8 @@ public class PinotHelixTaskResourceManager {
     JobConfig.Builder jobBuilder =
         new JobConfig.Builder().addTaskConfigs(helixTaskConfigs).setInstanceGroupTag(minionInstanceTag)
             .setTimeoutPerTask(taskTimeoutMs).setNumConcurrentTasksPerInstance(numConcurrentTasksPerInstance)
-            .setIgnoreDependentJobFailure(true).setMaxAttemptsPerTask(1).setFailureThreshold(Integer.MAX_VALUE);
+            .setIgnoreDependentJobFailure(true).setMaxAttemptsPerTask(1).setFailureThreshold(Integer.MAX_VALUE)
+            .setExpiry(_taskExpireTimeMs);
     _taskDriver.enqueueJob(getHelixJobQueueName(taskType), parentTaskName, jobBuilder);
 
     // Wait until task state is available


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org