You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2018/12/20 01:49:44 UTC

[incubator-pinot] branch start_stop_periodic_tasks_on_leadership_changes created (now 037cbc0)

This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a change to branch start_stop_periodic_tasks_on_leadership_changes
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


      at 037cbc0  Start and stop ControllerPeriodicTasks based on leadership changes

This branch includes the following new commits:

     new 037cbc0  Start and stop ControllerPeriodicTasks based on leadership changes

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: Start and stop ControllerPeriodicTasks based on leadership changes

Posted by ne...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch start_stop_periodic_tasks_on_leadership_changes
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 037cbc08ab9c9b86d0233311da0312d30b3462da
Author: Neha Pawar <np...@linkedin.com>
AuthorDate: Wed Dec 19 17:49:25 2018 -0800

    Start and stop ControllerPeriodicTasks based on leadership changes
---
 .../pinot/controller/ControllerStarter.java        |  9 ++-
 .../controller/helix/SegmentStatusChecker.java     | 12 +--
 .../helix/core/minion/PinotTaskManager.java        | 25 +++---
 .../core/periodictask/ControllerPeriodicTask.java  | 94 +++++++++++-----------
 .../ControllerPeriodicTaskScheduler.java           | 36 +++++++++
 .../core/relocation/RealtimeSegmentRelocator.java  |  5 ++
 .../helix/core/retention/RetentionManager.java     |  9 ++-
 .../controller/validation/ValidationManager.java   | 12 +--
 .../controller/helix/SegmentStatusCheckerTest.java | 33 +++-----
 .../periodictask/ControllerPeriodicTaskTest.java   | 76 +++++++++--------
 .../helix/core/retention/RetentionManagerTest.java | 16 +---
 .../pinot/core/periodictask/PeriodicTask.java      |  5 ++
 .../core/periodictask/PeriodicTaskScheduler.java   | 23 ++++--
 .../periodictask/PeriodicTaskSchedulerTest.java    | 14 ++++
 14 files changed, 209 insertions(+), 160 deletions(-)

diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java
index 33fb5ce..1f3b316 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java
@@ -32,6 +32,7 @@ import com.linkedin.pinot.controller.helix.SegmentStatusChecker;
 import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager;
 import com.linkedin.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
 import com.linkedin.pinot.controller.helix.core.minion.PinotTaskManager;
+import com.linkedin.pinot.controller.helix.core.periodictask.ControllerPeriodicTaskScheduler;
 import com.linkedin.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
 import com.linkedin.pinot.controller.helix.core.realtime.PinotRealtimeSegmentManager;
 import com.linkedin.pinot.controller.helix.core.rebalance.RebalanceSegmentStrategyFactory;
@@ -40,7 +41,6 @@ import com.linkedin.pinot.controller.helix.core.retention.RetentionManager;
 import com.linkedin.pinot.controller.validation.ValidationManager;
 import com.linkedin.pinot.core.crypt.PinotCrypterFactory;
 import com.linkedin.pinot.core.periodictask.PeriodicTask;
-import com.linkedin.pinot.core.periodictask.PeriodicTaskScheduler;
 import com.linkedin.pinot.filesystem.PinotFSFactory;
 import com.yammer.metrics.core.MetricsRegistry;
 import java.io.File;
@@ -80,7 +80,7 @@ public class ControllerStarter {
   private final PinotRealtimeSegmentManager _realtimeSegmentsManager;
   private final SegmentStatusChecker _segmentStatusChecker;
   private final ExecutorService _executorService;
-  private final PeriodicTaskScheduler _periodicTaskScheduler;
+  private final ControllerPeriodicTaskScheduler _periodicTaskScheduler;
 
   // Can only be constructed after resource manager getting started
   private ValidationManager _validationManager;
@@ -101,7 +101,7 @@ public class ControllerStarter {
         Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("restapi-multiget-thread-%d").build());
     _segmentStatusChecker = new SegmentStatusChecker(_helixResourceManager, _config, _controllerMetrics);
     _realtimeSegmentRelocator = new RealtimeSegmentRelocator(_helixResourceManager, _config);
-    _periodicTaskScheduler = new PeriodicTaskScheduler();
+    _periodicTaskScheduler = new ControllerPeriodicTaskScheduler();
   }
 
   public PinotHelixResourceManager getHelixResourceManager() {
@@ -183,7 +183,8 @@ public class ControllerStarter {
     periodicTasks.add(_validationManager);
     periodicTasks.add(_segmentStatusChecker);
     periodicTasks.add(_realtimeSegmentRelocator);
-    _periodicTaskScheduler.start(periodicTasks);
+    _periodicTaskScheduler.init(periodicTasks);
+
 
     LOGGER.info("Creating rebalance segments factory");
     RebalanceSegmentStrategyFactory.createInstance(helixManager);
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/SegmentStatusChecker.java
index 8059d9e..a1550f8 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/SegmentStatusChecker.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/SegmentStatusChecker.java
@@ -84,12 +84,6 @@ public class SegmentStatusChecker extends ControllerPeriodicTask {
   }
 
   @Override
-  public void onBecomeNotLeader() {
-    LOGGER.info("Resetting table metrics for all the tables.");
-    setStatusToDefault();
-  }
-
-  @Override
   protected void preprocess() {
     _realTimeTableCount = 0;
     _offlineTableCount = 0;
@@ -267,4 +261,10 @@ public class SegmentStatusChecker extends ControllerPeriodicTask {
     _metricsRegistry.setValueOfTableGauge(tableName, ControllerGauge.SEGMENTS_IN_ERROR_STATE, Long.MIN_VALUE);
     _metricsRegistry.setValueOfTableGauge(tableName, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE, Long.MIN_VALUE);
   }
+
+  @Override
+  public void cleanup() {
+    LOGGER.info("Resetting table metrics for all the tables.");
+    setStatusToDefault();
+  }
 }
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/PinotTaskManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/PinotTaskManager.java
index 729c75c..76c030f 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/PinotTaskManager.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/PinotTaskManager.java
@@ -93,19 +93,6 @@ public class PinotTaskManager extends ControllerPeriodicTask {
     return getTasksScheduled();
   }
 
-  /**
-   * Performs necessary cleanups (e.g. remove metrics) when the controller leadership changes.
-   */
-  @Override
-  public void onBecomeNotLeader() {
-    LOGGER.info("Perform task cleanups.");
-    // Performs necessary cleanups for each task type.
-    for (String taskType : _taskGeneratorRegistry.getAllTaskTypes()) {
-      _taskGeneratorRegistry.getTaskGenerator(taskType).nonLeaderCleanUp();
-    }
-  }
-
-
   @Override
   protected void preprocess() {
     _controllerMetrics.addMeteredGlobalValue(ControllerMeter.NUMBER_TIMES_SCHEDULE_TASKS_CALLED, 1L);
@@ -163,4 +150,16 @@ public class PinotTaskManager extends ControllerPeriodicTask {
   private Map<String, String> getTasksScheduled() {
     return _tasksScheduled;
   }
+
+  /**
+   * Performs necessary cleanups (e.g. remove metrics) when the controller leadership changes.
+   */
+  @Override
+  public void cleanup() {
+    LOGGER.info("Perform task cleanups.");
+    // Performs necessary cleanups for each task type.
+    for (String taskType : _taskGeneratorRegistry.getAllTaskTypes()) {
+      _taskGeneratorRegistry.getTaskGenerator(taskType).nonLeaderCleanUp();
+    }
+  }
 }
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
index c416d0e..3a9d045 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
@@ -16,7 +16,6 @@
 package com.linkedin.pinot.controller.helix.core.periodictask;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.linkedin.pinot.controller.ControllerLeadershipManager;
 import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager;
 import com.linkedin.pinot.core.periodictask.BasePeriodicTask;
 import java.util.List;
@@ -36,9 +35,12 @@ public abstract class ControllerPeriodicTask extends BasePeriodicTask {
   public static final int MIN_INITIAL_DELAY_IN_SECONDS = 120;
   public static final int MAX_INITIAL_DELAY_IN_SECONDS = 300;
 
+  private static final long MAX_CONTROLLER_PERIODIC_TASK_STOP_TIME_MILLIS = 30_000L;
+
   protected final PinotHelixResourceManager _pinotHelixResourceManager;
 
-  private boolean _isLeader = false;
+  private volatile boolean _stopPeriodicTask = false;
+  private volatile boolean _periodicTaskInProgress = false;
 
   public ControllerPeriodicTask(String taskName, long runFrequencyInSeconds, long initialDelayInSeconds,
       PinotHelixResourceManager pinotHelixResourceManager) {
@@ -61,61 +63,61 @@ public abstract class ControllerPeriodicTask extends BasePeriodicTask {
 
   @Override
   public void run() {
-    if (!isLeader()) {
-      skipLeaderTask();
-    } else {
-      List<String> allTableNames = _pinotHelixResourceManager.getAllTables();
-      processLeaderTask(allTableNames);
-    }
-  }
-
-  private void skipLeaderTask() {
-    if (_isLeader) {
-      LOGGER.info("Current pinot controller lost leadership.");
-      _isLeader = false;
-      onBecomeNotLeader();
-    }
-    LOGGER.info("Skip running periodic task: {} on non-leader controller", _taskName);
-  }
-
-  private void processLeaderTask(List<String> tables) {
-    if (!_isLeader) {
-      LOGGER.info("Current pinot controller became leader. Starting {} with running frequency of {} seconds.",
-          _taskName, _intervalInSeconds);
-      _isLeader = true;
-      onBecomeLeader();
-    }
+    _periodicTaskInProgress = true;
+    List<String> tableNamesWithType = _pinotHelixResourceManager.getAllTables();
     long startTime = System.currentTimeMillis();
-    int numTables = tables.size();
+    int numTables = tableNamesWithType.size();
     LOGGER.info("Start processing {} tables in periodic task: {}", numTables, _taskName);
-    process(tables);
+    process(tableNamesWithType);
     LOGGER.info("Finish processing {} tables in periodic task: {} in {}ms", numTables, _taskName,
         (System.currentTimeMillis() - startTime));
+    _periodicTaskInProgress = false;
   }
 
-  /**
-   * Does the following logic when losing the leadership. This should be done only once during leadership transition.
-   */
-  public void onBecomeNotLeader() {
-  }
 
-  /**
-   * Does the following logic when becoming lead controller. This should be done only once during leadership transition.
-   */
-  public void onBecomeLeader() {
+  @Override
+  public void stop() {
+    _stopPeriodicTask = true;
+
+    LOGGER.info("Waiting for periodic task {} to finish, maxWaitTimeMillis = {}", _taskName,
+        MAX_CONTROLLER_PERIODIC_TASK_STOP_TIME_MILLIS);
+    long millisToWait = MAX_CONTROLLER_PERIODIC_TASK_STOP_TIME_MILLIS;
+    while (_periodicTaskInProgress && millisToWait > 0) {
+      try {
+        long thisWait = 1000;
+        if (millisToWait < thisWait) {
+          thisWait = millisToWait;
+        }
+        Thread.sleep(thisWait);
+        millisToWait -= thisWait;
+      } catch (InterruptedException e) {
+        LOGGER.info("Interrupted: Remaining wait time {} (out of {})", millisToWait,
+            MAX_CONTROLLER_PERIODIC_TASK_STOP_TIME_MILLIS);
+        break;
+      }
+    }
+    LOGGER.info("Wait completed. _periodicTaskInProgress = {}", _periodicTaskInProgress);
+
+    cleanup();
   }
 
+
   /**
    * Processes the task on the given tables.
    *
-   * @param tables List of table names
+   * @param tableNamesWithType List of table names
    */
-  protected void process(List<String> tables) {
-    preprocess();
-    for (String table : tables) {
-      processTable(table);
+  protected void process(List<String> tableNamesWithType) {
+    if (!isStopPeriodicTask()) {
+      preprocess();
+      for (String table : tableNamesWithType) {
+        if (isStopPeriodicTask()) {
+          break;
+        }
+        processTable(table);
+      }
+      postprocess();
     }
-    postprocess();
   }
 
   /**
@@ -135,7 +137,9 @@ public abstract class ControllerPeriodicTask extends BasePeriodicTask {
   protected abstract void postprocess();
 
   @VisibleForTesting
-  protected boolean isLeader() {
-    return ControllerLeadershipManager.getInstance().isLeader();
+  protected boolean isStopPeriodicTask() {
+    return _stopPeriodicTask;
   }
+
+  protected abstract void cleanup();
 }
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskScheduler.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskScheduler.java
new file mode 100644
index 0000000..15b18c0
--- /dev/null
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskScheduler.java
@@ -0,0 +1,36 @@
+package com.linkedin.pinot.controller.helix.core.periodictask;
+
+import com.linkedin.pinot.controller.ControllerLeadershipManager;
+import com.linkedin.pinot.controller.LeadershipChangeSubscriber;
+import com.linkedin.pinot.core.periodictask.PeriodicTask;
+import com.linkedin.pinot.core.periodictask.PeriodicTaskScheduler;
+import java.util.List;
+
+
+/**
+ * A {@link PeriodicTaskScheduler} for scheduling {@link ControllerPeriodicTask} which are created on controller startup
+ * and started/stopped on controller leadership changes
+ */
+public class ControllerPeriodicTaskScheduler extends PeriodicTaskScheduler implements LeadershipChangeSubscriber {
+
+  private List<PeriodicTask> _controllerPeriodicTasks;
+
+  /**
+   * Initialize the {@link ControllerPeriodicTaskScheduler} with the {@link ControllerPeriodicTask} created at startup
+   * @param controllerPeriodicTasks the periodic tasks to start when this controller becomes the leader
+   */
+  public void init(List<PeriodicTask> controllerPeriodicTasks) {
+    _controllerPeriodicTasks = controllerPeriodicTasks;
+    ControllerLeadershipManager.getInstance().subscribe(ControllerPeriodicTaskScheduler.class.getName(), this);
+  }
+
+  @Override
+  public void onBecomingLeader() {
+    start(_controllerPeriodicTasks);
+  }
+
+  @Override
+  public void onBecomingNonLeader() {
+    stop();
+  }
+}
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
index e324c11..90c9435 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
@@ -269,4 +269,9 @@ public class RealtimeSegmentRelocator extends ControllerPeriodicTask {
     }
     return seconds;
   }
+
+  @Override
+  public void cleanup() {
+
+  }
 }
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java
index 80894fc..5127835 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java
@@ -53,10 +53,7 @@ public class RetentionManager extends ControllerPeriodicTask {
       int deletedSegmentsRetentionInDays) {
     super("RetentionManager", runFrequencyInSeconds, pinotHelixResourceManager);
     _deletedSegmentsRetentionInDays = deletedSegmentsRetentionInDays;
-  }
 
-  @Override
-  public void onBecomeLeader() {
     LOGGER.info("Starting RetentionManager with runFrequencyInSeconds: {}, deletedSegmentsRetentionInDays: {}",
         getIntervalInSeconds(), _deletedSegmentsRetentionInDays);
   }
@@ -185,4 +182,10 @@ public class RetentionManager extends ControllerPeriodicTask {
           CommonConstants.Helix.StateModel.SegmentOnlineOfflineStateModel.OFFLINE);
     }
   }
+
+
+  @Override
+  public void cleanup() {
+
+  }
 }
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java
index 18025eb..79a9d1b 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java
@@ -70,12 +70,6 @@ public class ValidationManager extends ControllerPeriodicTask {
   }
 
   @Override
-  public void onBecomeNotLeader() {
-    LOGGER.info("Unregister all the validation metrics.");
-    _validationMetrics.unregisterAllMetrics();
-  }
-
-  @Override
   protected void preprocess() {
     // Run segment level validation using a separate interval
     _runSegmentLevelValidation = false;
@@ -312,4 +306,10 @@ public class ValidationManager extends ControllerPeriodicTask {
 
     return numTotalDocs;
   }
+
+  @Override
+  public void cleanup() {
+    LOGGER.info("Unregister all the validation metrics.");
+    _validationMetrics.unregisterAllMetrics();
+  }
 }
diff --git a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/SegmentStatusCheckerTest.java b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/SegmentStatusCheckerTest.java
index 4d4c324..1246991 100644
--- a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/SegmentStatusCheckerTest.java
+++ b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/SegmentStatusCheckerTest.java
@@ -86,7 +86,7 @@ public class SegmentStatusCheckerTest {
     }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
     segmentStatusChecker.init();
     segmentStatusChecker.run();
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(externalView.getId(),
@@ -151,7 +151,7 @@ public class SegmentStatusCheckerTest {
     }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
     segmentStatusChecker.init();
     segmentStatusChecker.run();
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(externalView.getId(),
@@ -228,7 +228,7 @@ public class SegmentStatusCheckerTest {
     }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
     segmentStatusChecker.init();
     segmentStatusChecker.run();
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(externalView.getId(),
@@ -274,7 +274,7 @@ public class SegmentStatusCheckerTest {
     }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
     segmentStatusChecker.init();
     segmentStatusChecker.run();
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
@@ -308,7 +308,7 @@ public class SegmentStatusCheckerTest {
     }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
     segmentStatusChecker.init();
     segmentStatusChecker.run();
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
@@ -376,7 +376,7 @@ public class SegmentStatusCheckerTest {
     }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
     segmentStatusChecker.init();
     segmentStatusChecker.run();
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(externalView.getId(),
@@ -420,7 +420,7 @@ public class SegmentStatusCheckerTest {
     }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
     segmentStatusChecker.init();
     segmentStatusChecker.run();
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
@@ -458,7 +458,7 @@ public class SegmentStatusCheckerTest {
     }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
     segmentStatusChecker.init();
     segmentStatusChecker.run();
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
@@ -504,7 +504,7 @@ public class SegmentStatusCheckerTest {
     }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
     // verify state before test
     Assert.assertEquals(controllerMetrics.getValueOfGlobalGauge(
         ControllerGauge.DISABLED_TABLE_COUNT), 0);
@@ -555,7 +555,7 @@ public class SegmentStatusCheckerTest {
     }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
     segmentStatusChecker.init();
     segmentStatusChecker.run();
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
@@ -567,17 +567,4 @@ public class SegmentStatusCheckerTest {
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
         ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100);
   }
-
-  private class MockSegmentStatusChecker extends SegmentStatusChecker {
-
-    public MockSegmentStatusChecker(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf config,
-        ControllerMetrics metricsRegistry) {
-      super(pinotHelixResourceManager, config, metricsRegistry);
-    }
-
-    @Override
-    protected boolean isLeader() {
-      return true;
-    }
-  }
 }
diff --git a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
index 94c9f52..c4903f7 100644
--- a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
+++ b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
@@ -28,33 +28,32 @@ public class ControllerPeriodicTaskTest {
   private static final long RUN_FREQUENCY_IN_SECONDS = 30;
 
   private final PinotHelixResourceManager _resourceManager = mock(PinotHelixResourceManager.class);
-  private final AtomicBoolean _onBecomeLeaderCalled = new AtomicBoolean();
-  private final AtomicBoolean _onBecomeNonLeaderCalled = new AtomicBoolean();
+  private final AtomicBoolean _cleanupCalled = new AtomicBoolean();
   private final AtomicBoolean _processCalled = new AtomicBoolean();
+  private final AtomicBoolean _processTableCalled = new AtomicBoolean();
 
   private final MockControllerPeriodicTask _task =
       new MockControllerPeriodicTask("TestTask", RUN_FREQUENCY_IN_SECONDS, _resourceManager) {
-        @Override
-        public void onBecomeLeader() {
-          _onBecomeLeaderCalled.set(true);
-        }
 
         @Override
-        public void onBecomeNotLeader() {
-          _onBecomeNonLeaderCalled.set(true);
+        public void cleanup() {
+          _cleanupCalled.set(true);
         }
 
         @Override
-        public void process(List<String> tables) {
+        public void process(List<String> tableNamesWithType) {
           _processCalled.set(true);
         }
 
+        @Override
+        public void processTable(String tableNameWithType) { _processTableCalled.set(true);}
+
       };
 
   private void resetState() {
-    _onBecomeLeaderCalled.set(false);
-    _onBecomeNonLeaderCalled.set(false);
+    _cleanupCalled.set(false);
     _processCalled.set(false);
+    _processTableCalled.set(false);
   }
 
   @Test
@@ -66,56 +65,48 @@ public class ControllerPeriodicTaskTest {
   }
 
   @Test
-  public void testChangeLeadership() {
+  public void testControllerPeriodicTaskCalls() {
     // Initial state
     resetState();
-    _task.setLeader(false);
     _task.init();
-    assertFalse(_onBecomeLeaderCalled.get());
-    assertFalse(_onBecomeNonLeaderCalled.get());
-    assertFalse(_processCalled.get());
-
-    // From non-leader to non-leader
-    resetState();
-    _task.run();
-    assertFalse(_onBecomeLeaderCalled.get());
-    assertFalse(_onBecomeNonLeaderCalled.get());
+    assertFalse(_cleanupCalled.get());
     assertFalse(_processCalled.get());
+    assertFalse(_processTableCalled.get());
 
-    // From non-leader to leader
+    // run task
     resetState();
-    _task.setLeader(true);
     _task.run();
-    assertTrue(_onBecomeLeaderCalled.get());
-    assertFalse(_onBecomeNonLeaderCalled.get());
+    assertFalse(_cleanupCalled.get());
     assertTrue(_processCalled.get());
+    assertFalse(_processTableCalled.get());
 
-    // From leader to leader
+    // stop periodic task flag set: run() still calls process(), but no table is processed
     resetState();
+    _task.setStopPeriodicTask(true);
     _task.run();
-    assertFalse(_onBecomeLeaderCalled.get());
-    assertFalse(_onBecomeNonLeaderCalled.get());
+    assertFalse(_cleanupCalled.get());
     assertTrue(_processCalled.get());
+    assertFalse(_processTableCalled.get());
 
-    // From leader to non-leader
+    // stop periodic task
     resetState();
-    _task.setLeader(false);
-    _task.run();
-    assertFalse(_onBecomeLeaderCalled.get());
-    assertTrue(_onBecomeNonLeaderCalled.get());
+    _task.stop();
+    assertTrue(_cleanupCalled.get());
     assertFalse(_processCalled.get());
+    assertFalse(_processTableCalled.get());
+
   }
 
   private class MockControllerPeriodicTask extends ControllerPeriodicTask {
 
-    private boolean _isLeader = true;
+    private boolean _isStopPeriodicTask = false;
     public MockControllerPeriodicTask(String taskName, long runFrequencyInSeconds,
         PinotHelixResourceManager pinotHelixResourceManager) {
       super(taskName, runFrequencyInSeconds, pinotHelixResourceManager);
     }
 
     @Override
-    protected void process(List<String> tables) {
+    protected void process(List<String> tableNamesWithType) {
 
     }
 
@@ -135,12 +126,17 @@ public class ControllerPeriodicTaskTest {
     }
 
     @Override
-    protected boolean isLeader() {
-      return _isLeader;
+    protected boolean isStopPeriodicTask() {
+      return _isStopPeriodicTask;
     }
 
-    void setLeader(boolean isLeader) {
-      _isLeader = isLeader;
+    void setStopPeriodicTask(boolean isStopPeriodicTask) {
+      _isStopPeriodicTask = isStopPeriodicTask;
+    }
+
+    @Override
+    public void cleanup() {
+
     }
   }
 }
diff --git a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java
index 8e25505..c27b0ef 100644
--- a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java
+++ b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java
@@ -84,7 +84,7 @@ public class RetentionManagerTest {
     when(pinotHelixResourceManager.getTableConfig(OFFLINE_TABLE_NAME)).thenReturn(tableConfig);
     when(pinotHelixResourceManager.getOfflineSegmentMetadata(OFFLINE_TABLE_NAME)).thenReturn(metadataList);
 
-    RetentionManager retentionManager = new MockRetentionManager(pinotHelixResourceManager, 0, 0);
+    RetentionManager retentionManager = new RetentionManager(pinotHelixResourceManager, 0, 0);
     retentionManager.init();
     retentionManager.run();
 
@@ -201,7 +201,7 @@ public class RetentionManagerTest {
         setupSegmentMetadata(tableConfig, now, initialNumSegments, removedSegments);
     setupPinotHelixResourceManager(tableConfig, removedSegments, pinotHelixResourceManager);
 
-    RetentionManager retentionManager = new MockRetentionManager(pinotHelixResourceManager, 0, 0);
+    RetentionManager retentionManager = new RetentionManager(pinotHelixResourceManager, 0, 0);
     retentionManager.init();
     retentionManager.run();
 
@@ -306,16 +306,4 @@ public class RetentionManagerTest {
     return segmentMetadata;
   }
 
-  private class MockRetentionManager extends RetentionManager {
-
-    public MockRetentionManager(PinotHelixResourceManager pinotHelixResourceManager, int runFrequencyInSeconds,
-        int deletedSegmentsRetentionInDays) {
-      super(pinotHelixResourceManager, runFrequencyInSeconds, deletedSegmentsRetentionInDays);
-    }
-
-    @Override
-    protected boolean isLeader() {
-      return true;
-    }
-  }
 }
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/PeriodicTask.java b/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/PeriodicTask.java
index fac0750..f5b9c60 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/PeriodicTask.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/PeriodicTask.java
@@ -43,4 +43,9 @@ public interface PeriodicTask extends Runnable {
    * @return task name.
    */
   String getTaskName();
+
+  /**
+   * Stop the periodic task
+   */
+  void stop();
 }
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/PeriodicTaskScheduler.java b/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/PeriodicTaskScheduler.java
index cd07ea4..81c92a0 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/PeriodicTaskScheduler.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/PeriodicTaskScheduler.java
@@ -31,6 +31,7 @@ public class PeriodicTaskScheduler {
   private static final Logger LOGGER = LoggerFactory.getLogger(PeriodicTaskScheduler.class);
 
   private ScheduledExecutorService _executorService;
+  private List<PeriodicTask> _tasksWithValidInterval;
 
   /**
    * Start scheduling periodic tasks.
@@ -40,25 +41,27 @@ public class PeriodicTaskScheduler {
       LOGGER.warn("Periodic task scheduler already started");
     }
 
-    List<PeriodicTask> tasksWithValidInterval = new ArrayList<>();
+    _tasksWithValidInterval = new ArrayList<>();
     for (PeriodicTask periodicTask : periodicTasks) {
       if (periodicTask.getIntervalInSeconds() > 0) {
         LOGGER.info("Adding periodic task: {}", periodicTask);
-        tasksWithValidInterval.add(periodicTask);
+        _tasksWithValidInterval.add(periodicTask);
       } else {
         LOGGER.info("Skipping periodic task: {}", periodicTask);
       }
     }
 
-    if (tasksWithValidInterval.isEmpty()) {
+    if (_tasksWithValidInterval.isEmpty()) {
       LOGGER.warn("No periodic task scheduled");
     } else {
-      LOGGER.info("Starting periodic task scheduler with tasks: {}", tasksWithValidInterval);
-      _executorService = Executors.newScheduledThreadPool(tasksWithValidInterval.size());
-      for (PeriodicTask periodicTask : tasksWithValidInterval) {
+      LOGGER.info("Starting periodic task scheduler with tasks: {}", _tasksWithValidInterval);
+      _executorService = Executors.newScheduledThreadPool(_tasksWithValidInterval.size());
+      for (PeriodicTask periodicTask : _tasksWithValidInterval) {
         periodicTask.init();
         _executorService.scheduleWithFixedDelay(() -> {
           try {
+            LOGGER.info("Starting {} with running frequency of {} seconds.", periodicTask.getTaskName(),
+                periodicTask.getIntervalInSeconds());
             periodicTask.run();
           } catch (Throwable e) {
             // catch all errors to prevent subsequent executions from being silently suppressed
@@ -70,11 +73,19 @@ public class PeriodicTaskScheduler {
     }
   }
 
+  /**
+   * Shut down the executor service and stop the periodic tasks
+   */
   public void stop() {
     if (_executorService != null) {
       LOGGER.info("Stopping periodic task scheduler");
       _executorService.shutdown();
       _executorService = null;
     }
+
+    if (_tasksWithValidInterval != null) {
+      LOGGER.info("Stopping all periodic tasks: {}", _tasksWithValidInterval);
+      _tasksWithValidInterval.parallelStream().forEach(PeriodicTask::stop);
+    }
   }
 }
diff --git a/pinot-core/src/test/java/com/linkedin/pinot/core/periodictask/PeriodicTaskSchedulerTest.java b/pinot-core/src/test/java/com/linkedin/pinot/core/periodictask/PeriodicTaskSchedulerTest.java
index a105435..fbb3d92 100644
--- a/pinot-core/src/test/java/com/linkedin/pinot/core/periodictask/PeriodicTaskSchedulerTest.java
+++ b/pinot-core/src/test/java/com/linkedin/pinot/core/periodictask/PeriodicTaskSchedulerTest.java
@@ -31,6 +31,7 @@ public class PeriodicTaskSchedulerTest {
   public void testTaskWithInvalidInterval() throws Exception {
     AtomicBoolean initCalled = new AtomicBoolean();
     AtomicBoolean runCalled = new AtomicBoolean();
+    AtomicBoolean stopCalled = new AtomicBoolean();
 
     List<PeriodicTask> periodicTasks = Collections.singletonList(new BasePeriodicTask("TestTask", 0L/*Invalid*/, 0L) {
       @Override
@@ -39,6 +40,11 @@ public class PeriodicTaskSchedulerTest {
       }
 
       @Override
+      public void stop() {
+        stopCalled.set(true);
+      }
+
+      @Override
       public void run() {
         runCalled.set(true);
       }
@@ -51,6 +57,7 @@ public class PeriodicTaskSchedulerTest {
 
     assertFalse(initCalled.get());
     assertFalse(runCalled.get());
+    assertFalse(stopCalled.get());
   }
 
   @Test
@@ -58,6 +65,7 @@ public class PeriodicTaskSchedulerTest {
     int numTasks = 3;
     AtomicInteger numTimesInitCalled = new AtomicInteger();
     AtomicInteger numTimesRunCalled = new AtomicInteger();
+    AtomicInteger numTimesStopCalled = new AtomicInteger();
 
     List<PeriodicTask> periodicTasks = new ArrayList<>(numTasks);
     for (int i = 0; i < numTasks; i++) {
@@ -68,6 +76,11 @@ public class PeriodicTaskSchedulerTest {
         }
 
         @Override
+        public void stop() {
+          numTimesStopCalled.getAndIncrement();
+        }
+
+        @Override
         public void run() {
           numTimesRunCalled.getAndIncrement();
         }
@@ -81,5 +94,6 @@ public class PeriodicTaskSchedulerTest {
 
     assertEquals(numTimesInitCalled.get(), numTasks);
     assertEquals(numTimesRunCalled.get(), numTasks * 2);
+    assertEquals(numTimesStopCalled.get(), numTasks);
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org