You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2018/12/20 01:49:45 UTC

[incubator-pinot] 01/01: Start and stop ControllerPeriodicTasks based on leadership changes

This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch start_stop_periodic_tasks_on_leadership_changes
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 037cbc08ab9c9b86d0233311da0312d30b3462da
Author: Neha Pawar <np...@linkedin.com>
AuthorDate: Wed Dec 19 17:49:25 2018 -0800

    Start and stop ControllerPeriodicTasks based on leadership changes
---
 .../pinot/controller/ControllerStarter.java        |  9 ++-
 .../controller/helix/SegmentStatusChecker.java     | 12 +--
 .../helix/core/minion/PinotTaskManager.java        | 25 +++---
 .../core/periodictask/ControllerPeriodicTask.java  | 94 +++++++++++-----------
 .../ControllerPeriodicTaskScheduler.java           | 36 +++++++++
 .../core/relocation/RealtimeSegmentRelocator.java  |  5 ++
 .../helix/core/retention/RetentionManager.java     |  9 ++-
 .../controller/validation/ValidationManager.java   | 12 +--
 .../controller/helix/SegmentStatusCheckerTest.java | 33 +++-----
 .../periodictask/ControllerPeriodicTaskTest.java   | 76 +++++++++--------
 .../helix/core/retention/RetentionManagerTest.java | 16 +---
 .../pinot/core/periodictask/PeriodicTask.java      |  5 ++
 .../core/periodictask/PeriodicTaskScheduler.java   | 23 ++++--
 .../periodictask/PeriodicTaskSchedulerTest.java    | 14 ++++
 14 files changed, 209 insertions(+), 160 deletions(-)

diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java
index 33fb5ce..1f3b316 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java
@@ -32,6 +32,7 @@ import com.linkedin.pinot.controller.helix.SegmentStatusChecker;
 import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager;
 import com.linkedin.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
 import com.linkedin.pinot.controller.helix.core.minion.PinotTaskManager;
+import com.linkedin.pinot.controller.helix.core.periodictask.ControllerPeriodicTaskScheduler;
 import com.linkedin.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
 import com.linkedin.pinot.controller.helix.core.realtime.PinotRealtimeSegmentManager;
 import com.linkedin.pinot.controller.helix.core.rebalance.RebalanceSegmentStrategyFactory;
@@ -40,7 +41,6 @@ import com.linkedin.pinot.controller.helix.core.retention.RetentionManager;
 import com.linkedin.pinot.controller.validation.ValidationManager;
 import com.linkedin.pinot.core.crypt.PinotCrypterFactory;
 import com.linkedin.pinot.core.periodictask.PeriodicTask;
-import com.linkedin.pinot.core.periodictask.PeriodicTaskScheduler;
 import com.linkedin.pinot.filesystem.PinotFSFactory;
 import com.yammer.metrics.core.MetricsRegistry;
 import java.io.File;
@@ -80,7 +80,7 @@ public class ControllerStarter {
   private final PinotRealtimeSegmentManager _realtimeSegmentsManager;
   private final SegmentStatusChecker _segmentStatusChecker;
   private final ExecutorService _executorService;
-  private final PeriodicTaskScheduler _periodicTaskScheduler;
+  private final ControllerPeriodicTaskScheduler _periodicTaskScheduler;
 
   // Can only be constructed after resource manager getting started
   private ValidationManager _validationManager;
@@ -101,7 +101,7 @@ public class ControllerStarter {
         Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("restapi-multiget-thread-%d").build());
     _segmentStatusChecker = new SegmentStatusChecker(_helixResourceManager, _config, _controllerMetrics);
     _realtimeSegmentRelocator = new RealtimeSegmentRelocator(_helixResourceManager, _config);
-    _periodicTaskScheduler = new PeriodicTaskScheduler();
+    _periodicTaskScheduler = new ControllerPeriodicTaskScheduler();
   }
 
   public PinotHelixResourceManager getHelixResourceManager() {
@@ -183,7 +183,8 @@ public class ControllerStarter {
     periodicTasks.add(_validationManager);
     periodicTasks.add(_segmentStatusChecker);
     periodicTasks.add(_realtimeSegmentRelocator);
-    _periodicTaskScheduler.start(periodicTasks);
+    _periodicTaskScheduler.init(periodicTasks);
+
 
     LOGGER.info("Creating rebalance segments factory");
     RebalanceSegmentStrategyFactory.createInstance(helixManager);
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/SegmentStatusChecker.java
index 8059d9e..a1550f8 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/SegmentStatusChecker.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/SegmentStatusChecker.java
@@ -84,12 +84,6 @@ public class SegmentStatusChecker extends ControllerPeriodicTask {
   }
 
   @Override
-  public void onBecomeNotLeader() {
-    LOGGER.info("Resetting table metrics for all the tables.");
-    setStatusToDefault();
-  }
-
-  @Override
   protected void preprocess() {
     _realTimeTableCount = 0;
     _offlineTableCount = 0;
@@ -267,4 +261,10 @@ public class SegmentStatusChecker extends ControllerPeriodicTask {
     _metricsRegistry.setValueOfTableGauge(tableName, ControllerGauge.SEGMENTS_IN_ERROR_STATE, Long.MIN_VALUE);
     _metricsRegistry.setValueOfTableGauge(tableName, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE, Long.MIN_VALUE);
   }
+
+  @Override
+  public void cleanup() {
+    LOGGER.info("Resetting table metrics for all the tables.");
+    setStatusToDefault();
+  }
 }
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/PinotTaskManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/PinotTaskManager.java
index 729c75c..76c030f 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/PinotTaskManager.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/PinotTaskManager.java
@@ -93,19 +93,6 @@ public class PinotTaskManager extends ControllerPeriodicTask {
     return getTasksScheduled();
   }
 
-  /**
-   * Performs necessary cleanups (e.g. remove metrics) when the controller leadership changes.
-   */
-  @Override
-  public void onBecomeNotLeader() {
-    LOGGER.info("Perform task cleanups.");
-    // Performs necessary cleanups for each task type.
-    for (String taskType : _taskGeneratorRegistry.getAllTaskTypes()) {
-      _taskGeneratorRegistry.getTaskGenerator(taskType).nonLeaderCleanUp();
-    }
-  }
-
-
   @Override
   protected void preprocess() {
     _controllerMetrics.addMeteredGlobalValue(ControllerMeter.NUMBER_TIMES_SCHEDULE_TASKS_CALLED, 1L);
@@ -163,4 +150,16 @@ public class PinotTaskManager extends ControllerPeriodicTask {
   private Map<String, String> getTasksScheduled() {
     return _tasksScheduled;
   }
+
+  /**
+   * Performs necessary cleanups (e.g. remove metrics) when this task is stopped on loss of controller leadership.
+   */
+  @Override
+  public void cleanup() {
+    LOGGER.info("Perform task cleanups.");
+    // Performs necessary cleanups for each task type.
+    for (String taskType : _taskGeneratorRegistry.getAllTaskTypes()) {
+      _taskGeneratorRegistry.getTaskGenerator(taskType).nonLeaderCleanUp();
+    }
+  }
 }
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
index c416d0e..3a9d045 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
@@ -16,7 +16,6 @@
 package com.linkedin.pinot.controller.helix.core.periodictask;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.linkedin.pinot.controller.ControllerLeadershipManager;
 import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager;
 import com.linkedin.pinot.core.periodictask.BasePeriodicTask;
 import java.util.List;
@@ -36,9 +35,12 @@ public abstract class ControllerPeriodicTask extends BasePeriodicTask {
   public static final int MIN_INITIAL_DELAY_IN_SECONDS = 120;
   public static final int MAX_INITIAL_DELAY_IN_SECONDS = 300;
 
+  private static final long MAX_CONTROLLER_PERIODIC_TASK_STOP_TIME_MILLIS = 30_000L;
+
   protected final PinotHelixResourceManager _pinotHelixResourceManager;
 
-  private boolean _isLeader = false;
+  private volatile boolean _stopPeriodicTask = false;
+  private volatile boolean _periodicTaskInProgress = false;
 
   public ControllerPeriodicTask(String taskName, long runFrequencyInSeconds, long initialDelayInSeconds,
       PinotHelixResourceManager pinotHelixResourceManager) {
@@ -61,61 +63,61 @@ public abstract class ControllerPeriodicTask extends BasePeriodicTask {
 
   @Override
   public void run() {
-    if (!isLeader()) {
-      skipLeaderTask();
-    } else {
-      List<String> allTableNames = _pinotHelixResourceManager.getAllTables();
-      processLeaderTask(allTableNames);
-    }
-  }
-
-  private void skipLeaderTask() {
-    if (_isLeader) {
-      LOGGER.info("Current pinot controller lost leadership.");
-      _isLeader = false;
-      onBecomeNotLeader();
-    }
-    LOGGER.info("Skip running periodic task: {} on non-leader controller", _taskName);
-  }
-
-  private void processLeaderTask(List<String> tables) {
-    if (!_isLeader) {
-      LOGGER.info("Current pinot controller became leader. Starting {} with running frequency of {} seconds.",
-          _taskName, _intervalInSeconds);
-      _isLeader = true;
-      onBecomeLeader();
-    }
+    _periodicTaskInProgress = true;
+    List<String> tableNamesWithType = _pinotHelixResourceManager.getAllTables();
     long startTime = System.currentTimeMillis();
-    int numTables = tables.size();
+    int numTables = tableNamesWithType.size();
     LOGGER.info("Start processing {} tables in periodic task: {}", numTables, _taskName);
-    process(tables);
+    process(tableNamesWithType);
     LOGGER.info("Finish processing {} tables in periodic task: {} in {}ms", numTables, _taskName,
         (System.currentTimeMillis() - startTime));
+    _periodicTaskInProgress = false;
   }
 
-  /**
-   * Does the following logic when losing the leadership. This should be done only once during leadership transition.
-   */
-  public void onBecomeNotLeader() {
-  }
 
-  /**
-   * Does the following logic when becoming lead controller. This should be done only once during leadership transition.
-   */
-  public void onBecomeLeader() {
+  @Override
+  public void stop() {
+    _stopPeriodicTask = true;
+
+    LOGGER.info("Waiting for periodic task {} to finish, maxWaitTimeMillis = {}", _taskName,
+        MAX_CONTROLLER_PERIODIC_TASK_STOP_TIME_MILLIS);
+    long millisToWait = MAX_CONTROLLER_PERIODIC_TASK_STOP_TIME_MILLIS;
+    while (_periodicTaskInProgress && millisToWait > 0) {
+      try {
+        long thisWait = 1000;
+        if (millisToWait < thisWait) {
+          thisWait = millisToWait;
+        }
+        Thread.sleep(thisWait);
+        millisToWait -= thisWait;
+      } catch (InterruptedException e) {
+        LOGGER.info("Interrupted: Remaining wait time {} (out of {})", millisToWait,
+            MAX_CONTROLLER_PERIODIC_TASK_STOP_TIME_MILLIS);
+        break;
+      }
+    }
+    LOGGER.info("Wait completed. _periodicTaskInProgress = {}", _periodicTaskInProgress);
+
+    cleanup();
   }
 
+
   /**
    * Processes the task on the given tables.
    *
-   * @param tables List of table names
+   * @param tableNamesWithType List of table names
    */
-  protected void process(List<String> tables) {
-    preprocess();
-    for (String table : tables) {
-      processTable(table);
+  protected void process(List<String> tableNamesWithType) {
+    if (!isStopPeriodicTask()) {
+      preprocess();
+      for (String table : tableNamesWithType) {
+        if (isStopPeriodicTask()) {
+          break;
+        }
+        processTable(table);
+      }
+      postprocess();
     }
-    postprocess();
   }
 
   /**
@@ -135,7 +137,9 @@ public abstract class ControllerPeriodicTask extends BasePeriodicTask {
   protected abstract void postprocess();
 
   @VisibleForTesting
-  protected boolean isLeader() {
-    return ControllerLeadershipManager.getInstance().isLeader();
+  protected boolean isStopPeriodicTask() {
+    return _stopPeriodicTask;
   }
+
+  protected abstract void cleanup();
 }
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskScheduler.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskScheduler.java
new file mode 100644
index 0000000..15b18c0
--- /dev/null
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskScheduler.java
@@ -0,0 +1,36 @@
+package com.linkedin.pinot.controller.helix.core.periodictask;
+
+import com.linkedin.pinot.controller.ControllerLeadershipManager;
+import com.linkedin.pinot.controller.LeadershipChangeSubscriber;
+import com.linkedin.pinot.core.periodictask.PeriodicTask;
+import com.linkedin.pinot.core.periodictask.PeriodicTaskScheduler;
+import java.util.List;
+
+
+/**
+ * A {@link PeriodicTaskScheduler} for {@link ControllerPeriodicTask}s, which are created on controller startup
+ * and started/stopped on controller leadership changes.
+ */
+public class ControllerPeriodicTaskScheduler extends PeriodicTaskScheduler implements LeadershipChangeSubscriber {
+
+  private List<PeriodicTask> _controllerPeriodicTasks;
+
+  /**
+   * Initialize the {@link ControllerPeriodicTaskScheduler} with the {@link ControllerPeriodicTask} created at startup
+   * @param controllerPeriodicTasks the periodic tasks to start whenever this controller becomes the leader
+   */
+  public void init(List<PeriodicTask> controllerPeriodicTasks) {
+    _controllerPeriodicTasks = controllerPeriodicTasks;
+    ControllerLeadershipManager.getInstance().subscribe(ControllerPeriodicTaskScheduler.class.getName(), this);
+  }
+
+  @Override
+  public void onBecomingLeader() {
+    start(_controllerPeriodicTasks);
+  }
+
+  @Override
+  public void onBecomingNonLeader() {
+    stop();
+  }
+}
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
index e324c11..90c9435 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
@@ -269,4 +269,9 @@ public class RealtimeSegmentRelocator extends ControllerPeriodicTask {
     }
     return seconds;
   }
+
+  @Override
+  public void cleanup() {
+
+  }
 }
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java
index 80894fc..5127835 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java
@@ -53,10 +53,7 @@ public class RetentionManager extends ControllerPeriodicTask {
       int deletedSegmentsRetentionInDays) {
     super("RetentionManager", runFrequencyInSeconds, pinotHelixResourceManager);
     _deletedSegmentsRetentionInDays = deletedSegmentsRetentionInDays;
-  }
 
-  @Override
-  public void onBecomeLeader() {
     LOGGER.info("Starting RetentionManager with runFrequencyInSeconds: {}, deletedSegmentsRetentionInDays: {}",
         getIntervalInSeconds(), _deletedSegmentsRetentionInDays);
   }
@@ -185,4 +182,10 @@ public class RetentionManager extends ControllerPeriodicTask {
           CommonConstants.Helix.StateModel.SegmentOnlineOfflineStateModel.OFFLINE);
     }
   }
+
+
+  @Override
+  public void cleanup() {
+
+  }
 }
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java
index 18025eb..79a9d1b 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java
@@ -70,12 +70,6 @@ public class ValidationManager extends ControllerPeriodicTask {
   }
 
   @Override
-  public void onBecomeNotLeader() {
-    LOGGER.info("Unregister all the validation metrics.");
-    _validationMetrics.unregisterAllMetrics();
-  }
-
-  @Override
   protected void preprocess() {
     // Run segment level validation using a separate interval
     _runSegmentLevelValidation = false;
@@ -312,4 +306,10 @@ public class ValidationManager extends ControllerPeriodicTask {
 
     return numTotalDocs;
   }
+
+  @Override
+  public void cleanup() {
+    LOGGER.info("Unregister all the validation metrics.");
+    _validationMetrics.unregisterAllMetrics();
+  }
 }
diff --git a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/SegmentStatusCheckerTest.java b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/SegmentStatusCheckerTest.java
index 4d4c324..1246991 100644
--- a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/SegmentStatusCheckerTest.java
+++ b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/SegmentStatusCheckerTest.java
@@ -86,7 +86,7 @@ public class SegmentStatusCheckerTest {
     }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
     segmentStatusChecker.init();
     segmentStatusChecker.run();
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(externalView.getId(),
@@ -151,7 +151,7 @@ public class SegmentStatusCheckerTest {
     }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
     segmentStatusChecker.init();
     segmentStatusChecker.run();
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(externalView.getId(),
@@ -228,7 +228,7 @@ public class SegmentStatusCheckerTest {
     }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
     segmentStatusChecker.init();
     segmentStatusChecker.run();
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(externalView.getId(),
@@ -274,7 +274,7 @@ public class SegmentStatusCheckerTest {
     }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
     segmentStatusChecker.init();
     segmentStatusChecker.run();
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
@@ -308,7 +308,7 @@ public class SegmentStatusCheckerTest {
     }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
     segmentStatusChecker.init();
     segmentStatusChecker.run();
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
@@ -376,7 +376,7 @@ public class SegmentStatusCheckerTest {
     }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
     segmentStatusChecker.init();
     segmentStatusChecker.run();
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(externalView.getId(),
@@ -420,7 +420,7 @@ public class SegmentStatusCheckerTest {
     }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
     segmentStatusChecker.init();
     segmentStatusChecker.run();
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
@@ -458,7 +458,7 @@ public class SegmentStatusCheckerTest {
     }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
     segmentStatusChecker.init();
     segmentStatusChecker.run();
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
@@ -504,7 +504,7 @@ public class SegmentStatusCheckerTest {
     }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
     // verify state before test
     Assert.assertEquals(controllerMetrics.getValueOfGlobalGauge(
         ControllerGauge.DISABLED_TABLE_COUNT), 0);
@@ -555,7 +555,7 @@ public class SegmentStatusCheckerTest {
     }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
     segmentStatusChecker.init();
     segmentStatusChecker.run();
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
@@ -567,17 +567,4 @@ public class SegmentStatusCheckerTest {
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
         ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100);
   }
-
-  private class MockSegmentStatusChecker extends SegmentStatusChecker {
-
-    public MockSegmentStatusChecker(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf config,
-        ControllerMetrics metricsRegistry) {
-      super(pinotHelixResourceManager, config, metricsRegistry);
-    }
-
-    @Override
-    protected boolean isLeader() {
-      return true;
-    }
-  }
 }
diff --git a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
index 94c9f52..c4903f7 100644
--- a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
+++ b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
@@ -28,33 +28,32 @@ public class ControllerPeriodicTaskTest {
   private static final long RUN_FREQUENCY_IN_SECONDS = 30;
 
   private final PinotHelixResourceManager _resourceManager = mock(PinotHelixResourceManager.class);
-  private final AtomicBoolean _onBecomeLeaderCalled = new AtomicBoolean();
-  private final AtomicBoolean _onBecomeNonLeaderCalled = new AtomicBoolean();
+  private final AtomicBoolean _cleanupCalled = new AtomicBoolean();
   private final AtomicBoolean _processCalled = new AtomicBoolean();
+  private final AtomicBoolean _processTableCalled = new AtomicBoolean();
 
   private final MockControllerPeriodicTask _task =
       new MockControllerPeriodicTask("TestTask", RUN_FREQUENCY_IN_SECONDS, _resourceManager) {
-        @Override
-        public void onBecomeLeader() {
-          _onBecomeLeaderCalled.set(true);
-        }
 
         @Override
-        public void onBecomeNotLeader() {
-          _onBecomeNonLeaderCalled.set(true);
+        public void cleanup() {
+          _cleanupCalled.set(true);
         }
 
         @Override
-        public void process(List<String> tables) {
+        public void process(List<String> tableNamesWithType) {
           _processCalled.set(true);
         }
 
+        @Override
+        public void processTable(String tableNameWithType) { _processTableCalled.set(true);}
+
       };
 
   private void resetState() {
-    _onBecomeLeaderCalled.set(false);
-    _onBecomeNonLeaderCalled.set(false);
+    _cleanupCalled.set(false);
     _processCalled.set(false);
+    _processTableCalled.set(false);
   }
 
   @Test
@@ -66,56 +65,48 @@ public class ControllerPeriodicTaskTest {
   }
 
   @Test
-  public void testChangeLeadership() {
+  public void testControllerPeriodicTaskCalls() {
     // Initial state
     resetState();
-    _task.setLeader(false);
     _task.init();
-    assertFalse(_onBecomeLeaderCalled.get());
-    assertFalse(_onBecomeNonLeaderCalled.get());
-    assertFalse(_processCalled.get());
-
-    // From non-leader to non-leader
-    resetState();
-    _task.run();
-    assertFalse(_onBecomeLeaderCalled.get());
-    assertFalse(_onBecomeNonLeaderCalled.get());
+    assertFalse(_cleanupCalled.get());
     assertFalse(_processCalled.get());
+    assertFalse(_processTableCalled.get());
 
-    // From non-leader to leader
+    // run task
     resetState();
-    _task.setLeader(true);
     _task.run();
-    assertTrue(_onBecomeLeaderCalled.get());
-    assertFalse(_onBecomeNonLeaderCalled.get());
+    assertFalse(_cleanupCalled.get());
     assertTrue(_processCalled.get());
+    assertFalse(_processTableCalled.get());
 
-    // From leader to leader
+    // stop periodic task flag set: run() still calls process(), but no table should be processed
     resetState();
+    _task.setStopPeriodicTask(true);
     _task.run();
-    assertFalse(_onBecomeLeaderCalled.get());
-    assertFalse(_onBecomeNonLeaderCalled.get());
+    assertFalse(_cleanupCalled.get());
     assertTrue(_processCalled.get());
+    assertFalse(_processTableCalled.get());
 
-    // From leader to non-leader
+    // stop periodic task
     resetState();
-    _task.setLeader(false);
-    _task.run();
-    assertFalse(_onBecomeLeaderCalled.get());
-    assertTrue(_onBecomeNonLeaderCalled.get());
+    _task.stop();
+    assertTrue(_cleanupCalled.get());
     assertFalse(_processCalled.get());
+    assertFalse(_processTableCalled.get());
+
   }
 
   private class MockControllerPeriodicTask extends ControllerPeriodicTask {
 
-    private boolean _isLeader = true;
+    private boolean _isStopPeriodicTask = false;
     public MockControllerPeriodicTask(String taskName, long runFrequencyInSeconds,
         PinotHelixResourceManager pinotHelixResourceManager) {
       super(taskName, runFrequencyInSeconds, pinotHelixResourceManager);
     }
 
     @Override
-    protected void process(List<String> tables) {
+    protected void process(List<String> tableNamesWithType) {
 
     }
 
@@ -135,12 +126,17 @@ public class ControllerPeriodicTaskTest {
     }
 
     @Override
-    protected boolean isLeader() {
-      return _isLeader;
+    protected boolean isStopPeriodicTask() {
+      return _isStopPeriodicTask;
     }
 
-    void setLeader(boolean isLeader) {
-      _isLeader = isLeader;
+    void setStopPeriodicTask(boolean isStopPeriodicTask) {
+      _isStopPeriodicTask = isStopPeriodicTask;
+    }
+
+    @Override
+    public void cleanup() {
+
     }
   }
 }
diff --git a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java
index 8e25505..c27b0ef 100644
--- a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java
+++ b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java
@@ -84,7 +84,7 @@ public class RetentionManagerTest {
     when(pinotHelixResourceManager.getTableConfig(OFFLINE_TABLE_NAME)).thenReturn(tableConfig);
     when(pinotHelixResourceManager.getOfflineSegmentMetadata(OFFLINE_TABLE_NAME)).thenReturn(metadataList);
 
-    RetentionManager retentionManager = new MockRetentionManager(pinotHelixResourceManager, 0, 0);
+    RetentionManager retentionManager = new RetentionManager(pinotHelixResourceManager, 0, 0);
     retentionManager.init();
     retentionManager.run();
 
@@ -201,7 +201,7 @@ public class RetentionManagerTest {
         setupSegmentMetadata(tableConfig, now, initialNumSegments, removedSegments);
     setupPinotHelixResourceManager(tableConfig, removedSegments, pinotHelixResourceManager);
 
-    RetentionManager retentionManager = new MockRetentionManager(pinotHelixResourceManager, 0, 0);
+    RetentionManager retentionManager = new RetentionManager(pinotHelixResourceManager, 0, 0);
     retentionManager.init();
     retentionManager.run();
 
@@ -306,16 +306,4 @@ public class RetentionManagerTest {
     return segmentMetadata;
   }
 
-  private class MockRetentionManager extends RetentionManager {
-
-    public MockRetentionManager(PinotHelixResourceManager pinotHelixResourceManager, int runFrequencyInSeconds,
-        int deletedSegmentsRetentionInDays) {
-      super(pinotHelixResourceManager, runFrequencyInSeconds, deletedSegmentsRetentionInDays);
-    }
-
-    @Override
-    protected boolean isLeader() {
-      return true;
-    }
-  }
 }
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/PeriodicTask.java b/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/PeriodicTask.java
index fac0750..f5b9c60 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/PeriodicTask.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/PeriodicTask.java
@@ -43,4 +43,9 @@ public interface PeriodicTask extends Runnable {
    * @return task name.
    */
   String getTaskName();
+
+  /**
+   * Stop the periodic task
+   */
+  void stop();
 }
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/PeriodicTaskScheduler.java b/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/PeriodicTaskScheduler.java
index cd07ea4..81c92a0 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/PeriodicTaskScheduler.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/PeriodicTaskScheduler.java
@@ -31,6 +31,7 @@ public class PeriodicTaskScheduler {
   private static final Logger LOGGER = LoggerFactory.getLogger(PeriodicTaskScheduler.class);
 
   private ScheduledExecutorService _executorService;
+  private List<PeriodicTask> _tasksWithValidInterval;
 
   /**
    * Start scheduling periodic tasks.
@@ -40,25 +41,27 @@ public class PeriodicTaskScheduler {
       LOGGER.warn("Periodic task scheduler already started");
     }
 
-    List<PeriodicTask> tasksWithValidInterval = new ArrayList<>();
+    _tasksWithValidInterval = new ArrayList<>();
     for (PeriodicTask periodicTask : periodicTasks) {
       if (periodicTask.getIntervalInSeconds() > 0) {
         LOGGER.info("Adding periodic task: {}", periodicTask);
-        tasksWithValidInterval.add(periodicTask);
+        _tasksWithValidInterval.add(periodicTask);
       } else {
         LOGGER.info("Skipping periodic task: {}", periodicTask);
       }
     }
 
-    if (tasksWithValidInterval.isEmpty()) {
+    if (_tasksWithValidInterval.isEmpty()) {
       LOGGER.warn("No periodic task scheduled");
     } else {
-      LOGGER.info("Starting periodic task scheduler with tasks: {}", tasksWithValidInterval);
-      _executorService = Executors.newScheduledThreadPool(tasksWithValidInterval.size());
-      for (PeriodicTask periodicTask : tasksWithValidInterval) {
+      LOGGER.info("Starting periodic task scheduler with tasks: {}", _tasksWithValidInterval);
+      _executorService = Executors.newScheduledThreadPool(_tasksWithValidInterval.size());
+      for (PeriodicTask periodicTask : _tasksWithValidInterval) {
         periodicTask.init();
         _executorService.scheduleWithFixedDelay(() -> {
           try {
+            LOGGER.info("Starting {} with running frequency of {} seconds.", periodicTask.getTaskName(),
+                periodicTask.getIntervalInSeconds());
             periodicTask.run();
           } catch (Throwable e) {
             // catch all errors to prevent subsequent executions from being silently suppressed
@@ -70,11 +73,19 @@ public class PeriodicTaskScheduler {
     }
   }
 
+  /**
+   * Shut down the executor service and stop the periodic tasks.
+   */
   public void stop() {
     if (_executorService != null) {
       LOGGER.info("Stopping periodic task scheduler");
       _executorService.shutdown();
       _executorService = null;
     }
+
+    if (_tasksWithValidInterval != null) {
+      LOGGER.info("Stopping all periodic tasks: {}", _tasksWithValidInterval);
+      _tasksWithValidInterval.parallelStream().forEach(PeriodicTask::stop);
+    }
   }
 }
diff --git a/pinot-core/src/test/java/com/linkedin/pinot/core/periodictask/PeriodicTaskSchedulerTest.java b/pinot-core/src/test/java/com/linkedin/pinot/core/periodictask/PeriodicTaskSchedulerTest.java
index a105435..fbb3d92 100644
--- a/pinot-core/src/test/java/com/linkedin/pinot/core/periodictask/PeriodicTaskSchedulerTest.java
+++ b/pinot-core/src/test/java/com/linkedin/pinot/core/periodictask/PeriodicTaskSchedulerTest.java
@@ -31,6 +31,7 @@ public class PeriodicTaskSchedulerTest {
   public void testTaskWithInvalidInterval() throws Exception {
     AtomicBoolean initCalled = new AtomicBoolean();
     AtomicBoolean runCalled = new AtomicBoolean();
+    AtomicBoolean stopCalled = new AtomicBoolean();
 
     List<PeriodicTask> periodicTasks = Collections.singletonList(new BasePeriodicTask("TestTask", 0L/*Invalid*/, 0L) {
       @Override
@@ -39,6 +40,11 @@ public class PeriodicTaskSchedulerTest {
       }
 
       @Override
+      public void stop() {
+        stopCalled.set(true);
+      }
+
+      @Override
       public void run() {
         runCalled.set(true);
       }
@@ -51,6 +57,7 @@ public class PeriodicTaskSchedulerTest {
 
     assertFalse(initCalled.get());
     assertFalse(runCalled.get());
+    assertFalse(stopCalled.get());
   }
 
   @Test
@@ -58,6 +65,7 @@ public class PeriodicTaskSchedulerTest {
     int numTasks = 3;
     AtomicInteger numTimesInitCalled = new AtomicInteger();
     AtomicInteger numTimesRunCalled = new AtomicInteger();
+    AtomicInteger numTimesStopCalled = new AtomicInteger();
 
     List<PeriodicTask> periodicTasks = new ArrayList<>(numTasks);
     for (int i = 0; i < numTasks; i++) {
@@ -68,6 +76,11 @@ public class PeriodicTaskSchedulerTest {
         }
 
         @Override
+        public void stop() {
+          numTimesStopCalled.getAndIncrement();
+        }
+
+        @Override
         public void run() {
           numTimesRunCalled.getAndIncrement();
         }
@@ -81,5 +94,6 @@ public class PeriodicTaskSchedulerTest {
 
     assertEquals(numTimesInitCalled.get(), numTasks);
     assertEquals(numTimesRunCalled.get(), numTasks * 2);
+    assertEquals(numTimesStopCalled.get(), numTasks);
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org