You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2018/11/14 02:39:21 UTC

[incubator-pinot] branch periodic_task created (now b12a7cf)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a change to branch periodic_task
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


      at b12a7cf  Enhance controller periodic task and scheduler

This branch includes the following new commits:

     new b12a7cf  Enhance controller periodic task and scheduler

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: Enhance controller periodic task and scheduler

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch periodic_task
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit b12a7cf7b6c1fc2715cc0063137dbab79e894257
Author: Xiaotian (Jackie) Jiang <xa...@linkedin.com>
AuthorDate: Tue Nov 13 18:30:02 2018 -0800

    Enhance controller periodic task and scheduler
    
    For ControllerPeriodicTask:
    1. Randomly pick an initial delay in the range of 2-5 minutes so that the tasks do not all start at the same time
    2. Make leadership information private to the class
    For PeriodicTaskScheduler:
    1. Filter out tasks with a non-positive interval, so we can easily disable any task by setting the interval to a non-positive value
    2. Set the size of the thread pool to be the same as the number of tasks
    3. Reduce the run-time of PeriodicTaskSchedulerTest
---
 .../pinot/controller/ControllerStarter.java        |  16 +-
 .../controller/helix/SegmentStatusChecker.java     |  17 +-
 .../helix/core/minion/PinotTaskManager.java        |  16 +-
 .../core/periodictask/ControllerPeriodicTask.java  |  64 ++++----
 .../core/relocation/RealtimeSegmentRelocator.java  |  13 +-
 .../helix/core/retention/RetentionManager.java     |  16 +-
 .../controller/validation/ValidationManager.java   |  16 +-
 .../controller/helix/SegmentStatusCheckerTest.java |  68 +-------
 .../periodictask/ControllerPeriodicTaskTest.java   | 128 ++++++++-------
 .../pinot/core/periodictask/BasePeriodicTask.java  |  12 +-
 .../core/periodictask/PeriodicTaskScheduler.java   |  59 ++++---
 .../periodictask/PeriodicTaskSchedulerTest.java    | 181 +++++----------------
 12 files changed, 230 insertions(+), 376 deletions(-)

diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java
index 69a4a4f..dd5041d 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java
@@ -38,9 +38,9 @@ import com.linkedin.pinot.controller.helix.core.rebalance.RebalanceSegmentStrate
 import com.linkedin.pinot.controller.helix.core.relocation.RealtimeSegmentRelocator;
 import com.linkedin.pinot.controller.helix.core.retention.RetentionManager;
 import com.linkedin.pinot.controller.validation.ValidationManager;
+import com.linkedin.pinot.core.crypt.PinotCrypterFactory;
 import com.linkedin.pinot.core.periodictask.PeriodicTask;
 import com.linkedin.pinot.core.periodictask.PeriodicTaskScheduler;
-import com.linkedin.pinot.core.crypt.PinotCrypterFactory;
 import com.linkedin.pinot.filesystem.PinotFSFactory;
 import com.yammer.metrics.core.MetricsRegistry;
 import java.io.File;
@@ -166,11 +166,9 @@ public class ControllerStarter {
 
     List<PeriodicTask> periodicTasks = new ArrayList<>();
 
+    LOGGER.info("Adding task manager to periodic task scheduler");
     _taskManager = new PinotTaskManager(_helixTaskResourceManager, _helixResourceManager, _config, _controllerMetrics);
-    if (_taskManager.getIntervalInSeconds() > 0) {
-      LOGGER.info("Adding task manager to periodic task scheduler");
-      periodicTasks.add(_taskManager);
-    }
+    periodicTasks.add(_taskManager);
 
     LOGGER.info("Adding retention manager to periodic task scheduler");
     periodicTasks.add(_retentionManager);
@@ -187,12 +185,8 @@ public class ControllerStarter {
     _realtimeSegmentsManager.start(_controllerMetrics);
     PinotLLCRealtimeSegmentManager.getInstance().start();
 
-    if (_segmentStatusChecker.getIntervalInSeconds() == -1L) {
-      LOGGER.warn("Segment status check interval is -1, status checks disabled.");
-    } else {
-      LOGGER.info("Adding segment status checker to periodic task scheduler");
-      periodicTasks.add(_segmentStatusChecker);
-    }
+    LOGGER.info("Adding segment status checker to periodic task scheduler");
+    periodicTasks.add(_segmentStatusChecker);
 
     LOGGER.info("Adding realtime segment relocation manager to periodic task scheduler");
     periodicTasks.add(_realtimeSegmentRelocator);
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/SegmentStatusChecker.java
index 4e51bd6..549dc9d 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/SegmentStatusChecker.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/SegmentStatusChecker.java
@@ -27,8 +27,6 @@ import com.linkedin.pinot.controller.helix.core.periodictask.ControllerPeriodicT
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
-import org.apache.commons.httpclient.HttpConnectionManager;
-import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.model.ExternalView;
@@ -69,7 +67,6 @@ public class SegmentStatusChecker extends ControllerPeriodicTask {
     _config = config;
     _waitForPushTimeSeconds = config.getStatusCheckerWaitForPushTimeInSeconds();
     _metricsRegistry = metricsRegistry;
-    HttpConnectionManager httpConnectionManager = new MultiThreadedHttpConnectionManager();
   }
 
   @Override
@@ -85,15 +82,17 @@ public class SegmentStatusChecker extends ControllerPeriodicTask {
   }
 
   @Override
-  public void process(List<String> allTableNames) {
-    updateSegmentMetrics(allTableNames);
+  public void process(List<String> tables) {
+    updateSegmentMetrics(tables);
   }
 
   /**
-   * Runs a segment status pass over the currently loaded tables.
-   * @param allTableNames List of all the table names
+   * Runs a segment status pass over the given tables.
+   * TODO: revisit the logic and reduce the ZK access
+   *
+   * @param tables List of table names
    */
-  private void updateSegmentMetrics(List<String> allTableNames) {
+  private void updateSegmentMetrics(List<String> tables) {
     // Fetch the list of tables
     String helixClusterName = _pinotHelixResourceManager.getHelixClusterName();
     HelixAdmin helixAdmin = _pinotHelixResourceManager.getHelixAdmin();
@@ -112,7 +111,7 @@ public class SegmentStatusChecker extends ControllerPeriodicTask {
       logDisabledTables = false;
     }
 
-    for (String tableName : allTableNames) {
+    for (String tableName : tables) {
       try {
         if (TableNameBuilder.getTableTypeFromTableName(tableName) == TableType.OFFLINE) {
           offlineTableCount++;
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/PinotTaskManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/PinotTaskManager.java
index 460121e..0ff695d 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/PinotTaskManager.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/PinotTaskManager.java
@@ -52,8 +52,7 @@ public class PinotTaskManager extends ControllerPeriodicTask {
   public PinotTaskManager(@Nonnull PinotHelixTaskResourceManager helixTaskResourceManager,
       @Nonnull PinotHelixResourceManager helixResourceManager, @Nonnull ControllerConf controllerConf,
       @Nonnull ControllerMetrics controllerMetrics) {
-    super("PinotTaskManager", controllerConf.getTaskManagerFrequencyInSeconds(),
-        Math.min(60, controllerConf.getTaskManagerFrequencyInSeconds()), helixResourceManager);
+    super("PinotTaskManager", controllerConf.getTaskManagerFrequencyInSeconds(), helixResourceManager);
     _helixTaskResourceManager = helixTaskResourceManager;
     _clusterInfoProvider = new ClusterInfoProvider(helixResourceManager, helixTaskResourceManager, controllerConf);
     _taskGeneratorRegistry = new TaskGeneratorRegistry(_clusterInfoProvider);
@@ -82,12 +81,13 @@ public class PinotTaskManager extends ControllerPeriodicTask {
   }
 
   /**
-   * Check the Pinot cluster status and schedule new tasks.
-   * @param allTableNames List of all the table names
+   * Check the Pinot cluster status and schedule new tasks for the given tables.
+   *
+   * @param tables List of table names
    * @return Map from task type to task scheduled
    */
   @Nonnull
-  private Map<String, String> scheduleTasks(List<String> allTableNames) {
+  private Map<String, String> scheduleTasks(List<String> tables) {
     _controllerMetrics.addMeteredGlobalValue(ControllerMeter.NUMBER_TIMES_SCHEDULE_TASKS_CALLED, 1L);
 
     Set<String> taskTypes = _taskGeneratorRegistry.getAllTaskTypes();
@@ -102,7 +102,7 @@ public class PinotTaskManager extends ControllerPeriodicTask {
     }
 
     // Scan all table configs to get the tables with tasks enabled
-    for (String tableName : allTableNames) {
+    for (String tableName : tables) {
       TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableName);
       if (tableConfig != null) {
         TableTaskConfig taskConfig = tableConfig.getTaskConfig();
@@ -155,7 +155,7 @@ public class PinotTaskManager extends ControllerPeriodicTask {
   }
 
   @Override
-  public void process(List<String> allTableNames) {
-    scheduleTasks(allTableNames);
+  public void process(List<String> tables) {
+    scheduleTasks(tables);
   }
 }
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
index f00e715..53da198 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
@@ -15,10 +15,10 @@
  */
 package com.linkedin.pinot.controller.helix.core.periodictask;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager;
 import com.linkedin.pinot.core.periodictask.BasePeriodicTask;
 import java.util.List;
+import java.util.Random;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -28,21 +28,29 @@ import org.slf4j.LoggerFactory;
  * which table resources should be managed by this Pinot controller.
  */
 public abstract class ControllerPeriodicTask extends BasePeriodicTask {
-  public static final Logger LOGGER = LoggerFactory.getLogger(ControllerPeriodicTask.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(ControllerPeriodicTask.class);
+
+  public static final int MIN_INITIAL_DELAY_IN_SECONDS = 120;
+  public static final int MAX_INITIAL_DELAY_IN_SECONDS = 300;
+
   protected final PinotHelixResourceManager _pinotHelixResourceManager;
-  private boolean _amILeader;
-  private static final int DEFAULT_INITIAL_DELAY_IN_SECOND = 120;
 
-  public ControllerPeriodicTask(String taskName, long runFrequencyInSeconds,
-      PinotHelixResourceManager pinotHelixResourceManager) {
-    this(taskName, runFrequencyInSeconds, DEFAULT_INITIAL_DELAY_IN_SECOND, pinotHelixResourceManager);
-  }
+  private boolean _wasLeader = false;
 
   public ControllerPeriodicTask(String taskName, long runFrequencyInSeconds, long initialDelayInSeconds,
       PinotHelixResourceManager pinotHelixResourceManager) {
     super(taskName, runFrequencyInSeconds, initialDelayInSeconds);
     _pinotHelixResourceManager = pinotHelixResourceManager;
-    setAmILeader(false);
+  }
+
+  public ControllerPeriodicTask(String taskName, long runFrequencyInSeconds,
+      PinotHelixResourceManager pinotHelixResourceManager) {
+    this(taskName, runFrequencyInSeconds, getRandomInitialDelayInSeconds(), pinotHelixResourceManager);
+  }
+
+  private static long getRandomInitialDelayInSeconds() {
+    return MIN_INITIAL_DELAY_IN_SECONDS + new Random().nextInt(
+        MAX_INITIAL_DELAY_IN_SECONDS - MIN_INITIAL_DELAY_IN_SECONDS);
   }
 
   @Override
@@ -60,38 +68,29 @@ public abstract class ControllerPeriodicTask extends BasePeriodicTask {
   }
 
   private void skipLeaderTask() {
-    if (getAmILeader()) {
+    if (_wasLeader) {
       LOGGER.info("Current pinot controller lost leadership.");
       onBecomeNotLeader();
+      _wasLeader = false;
     }
-    setAmILeader(false);
-    LOGGER.info("Skip running periodic task: {} on non-leader controller", getTaskName());
+    LOGGER.info("Skip running periodic task: {} on non-leader controller", _taskName);
   }
 
-  private void processLeaderTask(List<String> allTableNames) {
-    if (!getAmILeader()) {
+  private void processLeaderTask(List<String> tables) {
+    if (!_wasLeader) {
       LOGGER.info("Current pinot controller became leader. Starting {} with running frequency of {} seconds.",
-          getTaskName(), getIntervalInSeconds());
+          _taskName, _intervalInSeconds);
       onBecomeLeader();
+      _wasLeader = true;
     }
-    setAmILeader(true);
     long startTime = System.currentTimeMillis();
-    LOGGER.info("Starting to process {} tables in periodic task: {}", allTableNames.size(), getTaskName());
-    process(allTableNames);
-    LOGGER.info("Finished processing {} tables in periodic task: {} in {}ms", allTableNames.size(), getTaskName(),
+    int numTables = tables.size();
+    LOGGER.info("Start processing {} tables in periodic task: {}", numTables, _taskName);
+    process(tables);
+    LOGGER.info("Finish processing {} tables in periodic task: {} in {}ms", numTables, _taskName,
         (System.currentTimeMillis() - startTime));
   }
 
-  @VisibleForTesting
-  public boolean getAmILeader() {
-    return _amILeader;
-  }
-
-  @VisibleForTesting
-  public void setAmILeader(boolean amILeader) {
-    _amILeader = amILeader;
-  }
-
   /**
    * Does the following logic when losing the leadership. This should be done only once during leadership transition.
    */
@@ -105,8 +104,9 @@ public abstract class ControllerPeriodicTask extends BasePeriodicTask {
   }
 
   /**
-   * Processes the periodic task as lead controller.
-   * @param allTableNames List of all the table names
+   * Processes the task on the given tables.
+   *
+   * @param tables List of table names
    */
-  public abstract void process(List<String> allTableNames);
+  public abstract void process(List<String> tables);
 }
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
index 0954f5b..ebd5f37 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
@@ -58,18 +58,19 @@ public class RealtimeSegmentRelocator extends ControllerPeriodicTask {
   }
 
   @Override
-  public void process(List<String> allTableNames) {
-    runRelocation(allTableNames);
+  public void process(List<String> tables) {
+    runRelocation(tables);
   }
 
   /**
-   * Check all tables. Perform relocation of segments if table is realtime and relocation is required
+   * Check the given tables. Perform relocation of segments if table is realtime and relocation is required
    * TODO: Model this to implement {@link com.linkedin.pinot.controller.helix.core.rebalance.RebalanceSegmentStrategy} interface
    * https://github.com/linkedin/pinot/issues/2609
-   * @param allTableNames List of all the table names
+   *
+   * @param tables List of table names
    */
-  private void runRelocation(List<String> allTableNames) {
-    for (final String tableNameWithType : allTableNames) {
+  private void runRelocation(List<String> tables) {
+    for (final String tableNameWithType : tables) {
       // Only consider realtime tables.
       if (!TableNameBuilder.REALTIME.tableHasTypeSuffix(tableNameWithType)) {
         continue;
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java
index 7423cef..d76cc3b 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java
@@ -51,8 +51,7 @@ public class RetentionManager extends ControllerPeriodicTask {
 
   public RetentionManager(PinotHelixResourceManager pinotHelixResourceManager, int runFrequencyInSeconds,
       int deletedSegmentsRetentionInDays) {
-    super("RetentionManager", runFrequencyInSeconds, Math.min(60, runFrequencyInSeconds),
-        pinotHelixResourceManager);
+    super("RetentionManager", runFrequencyInSeconds, pinotHelixResourceManager);
     _deletedSegmentsRetentionInDays = deletedSegmentsRetentionInDays;
   }
 
@@ -63,17 +62,18 @@ public class RetentionManager extends ControllerPeriodicTask {
   }
 
   @Override
-  public void process(List<String> allTableNames) {
-    execute(allTableNames);
+  public void process(List<String> tables) {
+    execute(tables);
   }
 
   /**
-   * Manages retention for all tables.
-   * @param allTableNames List of all the table names
+   * Manages retention for the given tables.
+   *
+   * @param tables List of table names
    */
-  private void execute(List<String> allTableNames) {
+  private void execute(List<String> tables) {
     try {
-      for (String tableNameWithType : allTableNames) {
+      for (String tableNameWithType : tables) {
         LOGGER.info("Start managing retention for table: {}", tableNameWithType);
         manageRetentionForTable(tableNameWithType);
       }
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java
index ce2b404..8717318 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java
@@ -60,8 +60,7 @@ public class ValidationManager extends ControllerPeriodicTask {
 
   public ValidationManager(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager,
       PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager, ValidationMetrics validationMetrics) {
-    super("ValidationManager", config.getValidationControllerFrequencyInSeconds(),
-        config.getValidationControllerFrequencyInSeconds() / 2, pinotHelixResourceManager);
+    super("ValidationManager", config.getValidationControllerFrequencyInSeconds(), pinotHelixResourceManager);
     _segmentLevelValidationIntervalInSeconds = config.getSegmentLevelValidationIntervalInSeconds();
     Preconditions.checkState(_segmentLevelValidationIntervalInSeconds > 0);
     _llcRealtimeSegmentManager = llcRealtimeSegmentManager;
@@ -75,15 +74,16 @@ public class ValidationManager extends ControllerPeriodicTask {
   }
 
   @Override
-  public void process(List<String> allTableNames) {
-    runValidation(allTableNames);
+  public void process(List<String> tables) {
+    runValidation(tables);
   }
 
   /**
-   * Runs a validation pass over the currently loaded tables.
-   * @param allTableNames List of all the table names
+   * Runs a validation pass over the given tables.
+   *
+   * @param tables List of table names
    */
-  private void runValidation(List<String> allTableNames) {
+  private void runValidation(List<String> tables) {
     // Run segment level validation using a separate interval
     boolean runSegmentLevelValidation = false;
     long currentTimeMs = System.currentTimeMillis();
@@ -97,7 +97,7 @@ public class ValidationManager extends ControllerPeriodicTask {
     // Cache instance configs to reduce ZK access
     List<InstanceConfig> instanceConfigs = _pinotHelixResourceManager.getAllHelixInstanceConfigs();
 
-    for (String tableNameWithType : allTableNames) {
+    for (String tableNameWithType : tables) {
       try {
         TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
         if (tableConfig == null) {
diff --git a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/SegmentStatusCheckerTest.java b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/SegmentStatusCheckerTest.java
index 16248db..de6eca9 100644
--- a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/SegmentStatusCheckerTest.java
+++ b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/SegmentStatusCheckerTest.java
@@ -33,13 +33,9 @@ import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.testng.Assert;
-import org.testng.annotations.AfterSuite;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.BeforeSuite;
 import org.testng.annotations.Test;
 
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
 
 
 public class SegmentStatusCheckerTest {
@@ -49,18 +45,6 @@ public class SegmentStatusCheckerTest {
   private ControllerMetrics controllerMetrics;
   private ControllerConf config;
 
-  @BeforeSuite
-  public void setUp() throws Exception {
-  }
-
-  @AfterSuite
-  public void tearDown() {
-  }
-
-  @BeforeMethod
-  public void beforeMethod() {
-  }
-
   @Test
   public void offlineBasicTest() throws Exception {
     final String tableName = "myTable_OFFLINE";
@@ -104,7 +88,6 @@ public class SegmentStatusCheckerTest {
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
     segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
-    Assert.assertEquals(segmentStatusChecker.getAmILeader(), false);
     segmentStatusChecker.init();
     segmentStatusChecker.run();
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(externalView.getId(),
@@ -115,7 +98,6 @@ public class SegmentStatusCheckerTest {
         ControllerGauge.PERCENT_OF_REPLICAS), 33);
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(externalView.getId(),
         ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100);
-    Assert.assertEquals(segmentStatusChecker.getAmILeader(), true);
   }
 
   @Test
@@ -172,7 +154,6 @@ public class SegmentStatusCheckerTest {
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
     segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
-    Assert.assertEquals(segmentStatusChecker.getAmILeader(), false);
     segmentStatusChecker.init();
     segmentStatusChecker.run();
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(externalView.getId(),
@@ -183,53 +164,6 @@ public class SegmentStatusCheckerTest {
         ControllerGauge.PERCENT_OF_REPLICAS), 100);
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(externalView.getId(),
         ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100);
-    Assert.assertEquals(segmentStatusChecker.getAmILeader(), true);
-  }
-
-  @Test
-  public void nonLeaderTest() throws Exception {
-    final String tableName = "myTable_REALTIME";
-    List<String> allTableNames = new ArrayList<String>();
-    allTableNames.add(tableName);
-
-    HelixAdmin helixAdmin;
-    {
-      helixAdmin = mock(HelixAdmin.class);
-    }
-    {
-      helixResourceManager = mock(PinotHelixResourceManager.class);
-      when(helixResourceManager.isLeader()).thenReturn(false);
-      when(helixResourceManager.getAllTables()).thenReturn(allTableNames);
-      when(helixResourceManager.getHelixClusterName()).thenReturn("StatusChecker");
-      when(helixResourceManager.getHelixAdmin()).thenReturn(helixAdmin);
-    }
-    {
-      config = mock(ControllerConf.class);
-      when(config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
-      when(config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
-    }
-    metricsRegistry = new MetricsRegistry();
-    controllerMetrics = new ControllerMetrics(metricsRegistry);
-
-    // From non-leader to non-leader.
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
-    segmentStatusChecker.init();
-    segmentStatusChecker.run();
-    Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.SEGMENTS_IN_ERROR_STATE),
-        Long.MIN_VALUE);
-    Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.NUMBER_OF_REPLICAS),
-        Long.MIN_VALUE);
-    Assert.assertEquals(segmentStatusChecker.getAmILeader(), false);
-
-    // Leadership transition from leader to non-leader.
-    controllerMetrics.setValueOfTableGauge(tableName, ControllerGauge.SEGMENTS_IN_ERROR_STATE, 0L);
-    segmentStatusChecker.setAmILeader(true);
-    segmentStatusChecker.run();
-    Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.SEGMENTS_IN_ERROR_STATE),
-        Long.MIN_VALUE);
-    Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.NUMBER_OF_REPLICAS),
-        Long.MIN_VALUE);
-    Assert.assertEquals(segmentStatusChecker.getAmILeader(), false);
   }
 
   @Test
diff --git a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
index 354456f..6a2c1b1 100644
--- a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
+++ b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
@@ -16,75 +16,91 @@
 package com.linkedin.pinot.controller.helix.core.periodictask;
 
 import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager;
-import com.linkedin.pinot.core.periodictask.PeriodicTask;
-import com.linkedin.pinot.core.periodictask.PeriodicTaskScheduler;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-import junit.framework.Assert;
-import org.testng.annotations.BeforeTest;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.testng.annotations.Test;
 
 import static org.mockito.Mockito.*;
+import static org.testng.Assert.*;
 
 
 public class ControllerPeriodicTaskTest {
-  private PinotHelixResourceManager helixResourceManager;
-  private AtomicInteger numOfProcessingMessages;
-
-  @BeforeTest
-  public void setUp() {
-    numOfProcessingMessages = new AtomicInteger(0);
-    helixResourceManager = mock(PinotHelixResourceManager.class);
-    List<String> allTableNames = new ArrayList<>();
-    allTableNames.add("testTable_REALTIME");
-    allTableNames.add("testTable_OFFLINE");
-    when(helixResourceManager.getAllTables()).thenReturn(allTableNames);
+  private static final long RUN_FREQUENCY_IN_SECONDS = 30;
+
+  private final PinotHelixResourceManager _resourceManager = mock(PinotHelixResourceManager.class);
+  private final AtomicBoolean _onBecomeLeaderCalled = new AtomicBoolean();
+  private final AtomicBoolean _onBecomeNonLeaderCalled = new AtomicBoolean();
+  private final AtomicBoolean _processCalled = new AtomicBoolean();
+
+  private final ControllerPeriodicTask _task =
+      new ControllerPeriodicTask("TestTask", RUN_FREQUENCY_IN_SECONDS, _resourceManager) {
+        @Override
+        public void onBecomeLeader() {
+          _onBecomeLeaderCalled.set(true);
+        }
+
+        @Override
+        public void onBecomeNotLeader() {
+          _onBecomeNonLeaderCalled.set(true);
+        }
+
+        @Override
+        public void process(List<String> tables) {
+          _processCalled.set(true);
+        }
+      };
+
+  private void resetState() {
+    _onBecomeLeaderCalled.set(false);
+    _onBecomeNonLeaderCalled.set(false);
+    _processCalled.set(false);
   }
 
   @Test
-  public void testWhenControllerIsLeader() throws InterruptedException {
-    long totalRunTimeInMilliseconds = 3_500L;
-    long runFrequencyInSeconds = 1L;
-    long initialDelayInSeconds = 1L;
-    when(helixResourceManager.isLeader()).thenReturn(true);
-
-    PeriodicTask periodicTask = createMockPeriodicTask(runFrequencyInSeconds, initialDelayInSeconds);
-
-    PeriodicTaskScheduler periodicTaskScheduler = new PeriodicTaskScheduler();
-    periodicTaskScheduler.start(Collections.singletonList(periodicTask));
-    Thread.sleep(totalRunTimeInMilliseconds);
-    periodicTaskScheduler.stop();
-    Assert.assertEquals(totalRunTimeInMilliseconds / 1000L, numOfProcessingMessages.get());
+  public void testRandomInitialDelay() {
+    assertTrue(_task.getInitialDelayInSeconds() >= ControllerPeriodicTask.MIN_INITIAL_DELAY_IN_SECONDS);
+    assertTrue(_task.getInitialDelayInSeconds() < ControllerPeriodicTask.MAX_INITIAL_DELAY_IN_SECONDS);
+
+    assertEquals(_task.getIntervalInSeconds(), RUN_FREQUENCY_IN_SECONDS);
   }
 
   @Test
-  public void testWhenControllerIsNotLeader() throws InterruptedException {
-    long totalRunTimeInMilliseconds = 3_500L;
-    long runFrequencyInSeconds = 1L;
-    long initialDelayInSeconds = 1L;
-
-    when(helixResourceManager.isLeader()).thenReturn(false);
-    PeriodicTask periodicTask = createMockPeriodicTask(runFrequencyInSeconds, initialDelayInSeconds);
-
-    PeriodicTaskScheduler periodicTaskScheduler = new PeriodicTaskScheduler();
-    periodicTaskScheduler.start(Collections.singletonList(periodicTask));
-    Thread.sleep(totalRunTimeInMilliseconds);
-    periodicTaskScheduler.stop();
-    Assert.assertEquals(0, numOfProcessingMessages.get());
-  }
+  public void testChangeLeadership() {
+    // Initial state
+    resetState();
+    _task.init();
+    assertFalse(_onBecomeLeaderCalled.get());
+    assertFalse(_onBecomeNonLeaderCalled.get());
+    assertFalse(_processCalled.get());
+
+    // From non-leader to non-leader
+    resetState();
+    _task.run();
+    assertFalse(_onBecomeLeaderCalled.get());
+    assertFalse(_onBecomeNonLeaderCalled.get());
+    assertFalse(_processCalled.get());
+
+    // From non-leader to leader
+    resetState();
+    when(_resourceManager.isLeader()).thenReturn(true);
+    _task.run();
+    assertTrue(_onBecomeLeaderCalled.get());
+    assertFalse(_onBecomeNonLeaderCalled.get());
+    assertTrue(_processCalled.get());
+
+    // From leader to leader
+    resetState();
+    _task.run();
+    assertFalse(_onBecomeLeaderCalled.get());
+    assertFalse(_onBecomeNonLeaderCalled.get());
+    assertTrue(_processCalled.get());
 
-  private PeriodicTask createMockPeriodicTask(long runFrequencyInSeconds, long initialDelayInSeconds) {
-    return new ControllerPeriodicTask("Task", runFrequencyInSeconds, initialDelayInSeconds, helixResourceManager) {
-      public void init() {
-        numOfProcessingMessages.set(0);
-      }
-
-      @Override
-      public void process(List<String> allTableNames) {
-        numOfProcessingMessages.incrementAndGet();
-      }
-    };
+    // From leader to non-leader
+    resetState();
+    when(_resourceManager.isLeader()).thenReturn(false);
+    _task.run();
+    assertFalse(_onBecomeLeaderCalled.get());
+    assertTrue(_onBecomeNonLeaderCalled.get());
+    assertFalse(_processCalled.get());
   }
 }
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/BasePeriodicTask.java b/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/BasePeriodicTask.java
index a1f4207..15ee475 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/BasePeriodicTask.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/BasePeriodicTask.java
@@ -19,9 +19,9 @@ package com.linkedin.pinot.core.periodictask;
  * A base class to implement periodic task interface.
  */
 public abstract class BasePeriodicTask implements PeriodicTask {
-  private final String _taskName;
-  private long _intervalInSeconds;
-  private long _initialDelayInSeconds;
+  protected final String _taskName;
+  protected final long _intervalInSeconds;
+  protected final long _initialDelayInSeconds;
 
   public BasePeriodicTask(String taskName, long runFrequencyInSeconds, long initialDelayInSeconds) {
     _taskName = taskName;
@@ -43,4 +43,10 @@ public abstract class BasePeriodicTask implements PeriodicTask {
   public String getTaskName() {
     return _taskName;
   }
+
+  @Override
+  public String toString() {
+    return String.format("Task: %s, Interval: %ds, Initial Delay: %ds", _taskName, _intervalInSeconds,
+        _initialDelayInSeconds);
+  }
 }
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/PeriodicTaskScheduler.java b/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/PeriodicTaskScheduler.java
index f58fffc..e5565de 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/PeriodicTaskScheduler.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/PeriodicTaskScheduler.java
@@ -15,6 +15,7 @@
  */
 package com.linkedin.pinot.core.periodictask;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -22,51 +23,58 @@ import java.util.concurrent.TimeUnit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 /**
  * Periodic task scheduler will schedule a list of tasks based on their initial delay time and interval time.
  */
 public class PeriodicTaskScheduler {
   private static final Logger LOGGER = LoggerFactory.getLogger(PeriodicTaskScheduler.class);
-  private static final int CORE_POOL_SIZE = 5;
-  private final ScheduledExecutorService _executorService;
 
-  public PeriodicTaskScheduler() {
-    LOGGER.info("Initializing PeriodicTaskScheduler.");
-    _executorService = Executors.newScheduledThreadPool(CORE_POOL_SIZE);
-  }
+  private ScheduledExecutorService _executorService;
 
   /**
    * Start scheduling periodic tasks.
    */
   public void start(List<PeriodicTask> periodicTasks) {
-    if (periodicTasks == null || periodicTasks.isEmpty()) {
-      LOGGER.warn("No periodic task assigned to scheduler!");
+    if (_executorService != null) {
+      // Keep the running executor: overwriting the field below would leave the old pool unreachable from stop()
+      LOGGER.warn("Periodic task scheduler already started");
       return;
     }
 
-    if (periodicTasks.size() > CORE_POOL_SIZE) {
-      LOGGER.warn("The number of tasks:{} is more than the default number of threads:{}.", periodicTasks.size(),
-          CORE_POOL_SIZE);
+    // Only tasks with a positive interval are initialized and scheduled; the rest are treated as disabled
+    List<PeriodicTask> tasksWithValidInterval = new ArrayList<>();
+    for (PeriodicTask periodicTask : periodicTasks) {
+      if (periodicTask.getIntervalInSeconds() > 0) {
+        tasksWithValidInterval.add(periodicTask);
+      }
     }
 
-    LOGGER.info("Starting PeriodicTaskScheduler.");
-    // Set up an executor that executes tasks periodically
-    for (PeriodicTask periodicTask : periodicTasks) {
-      periodicTask.init();
-      _executorService.scheduleWithFixedDelay(() -> {
-        try {
-          periodicTask.run();
-        } catch (Throwable e) {
-          // catch all errors to prevent subsequent executions from being silently suppressed
-          // Ref: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ScheduledExecutorService.html#scheduleWithFixedDelay-java.lang.Runnable-long-long-java.util.concurrent.TimeUnit-
-          LOGGER.warn("Caught exception while running Task: {}", periodicTask.getTaskName(), e);
-        }
-      }, periodicTask.getInitialDelayInSeconds(), periodicTask.getIntervalInSeconds(), TimeUnit.SECONDS);
+    if (tasksWithValidInterval.isEmpty()) {
+      LOGGER.warn("No periodic task scheduled");
+    } else {
+      LOGGER.info("Starting periodic task scheduler with tasks: {}", tasksWithValidInterval);
+      _executorService = Executors.newScheduledThreadPool(tasksWithValidInterval.size());
+      for (PeriodicTask periodicTask : tasksWithValidInterval) {
+        periodicTask.init();
+        _executorService.scheduleWithFixedDelay(() -> {
+          try {
+            periodicTask.run();
+          } catch (Throwable e) {
+            // catch all errors to prevent subsequent executions from being silently suppressed
+            // Ref: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ScheduledExecutorService.html#scheduleWithFixedDelay-java.lang.Runnable-long-long-java.util.concurrent.TimeUnit-
+            LOGGER.warn("Caught exception while running Task: {}", periodicTask.getTaskName(), e);
+          }
+        }, periodicTask.getInitialDelayInSeconds(), periodicTask.getIntervalInSeconds(), TimeUnit.SECONDS);
+      }
     }
   }
 
   public void stop() {
-    LOGGER.info("Stopping PeriodicTaskScheduler");
-    _executorService.shutdown();
+    if (_executorService != null) {
+      LOGGER.info("Stopping periodic task scheduler");
+      _executorService.shutdown();
+      _executorService = null;
+    }
   }
 }
diff --git a/pinot-core/src/test/java/com/linkedin/pinot/core/periodictask/PeriodicTaskSchedulerTest.java b/pinot-core/src/test/java/com/linkedin/pinot/core/periodictask/PeriodicTaskSchedulerTest.java
index a2ff21e..78aad1c 100644
--- a/pinot-core/src/test/java/com/linkedin/pinot/core/periodictask/PeriodicTaskSchedulerTest.java
+++ b/pinot-core/src/test/java/com/linkedin/pinot/core/periodictask/PeriodicTaskSchedulerTest.java
@@ -16,171 +16,70 @@
 package com.linkedin.pinot.core.periodictask;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import static org.testng.Assert.*;
 
-public class PeriodicTaskSchedulerTest {
-
-  @Test
-  public void testSchedulerWithOneTask() throws InterruptedException {
-    AtomicInteger count = new AtomicInteger(0);
-    PeriodicTaskScheduler periodicTaskScheduler = new PeriodicTaskScheduler();
-    long runFrequencyInSeconds = 1L;
-    long initialDelayInSeconds = 1L;
-    long totalRunTimeInMilliseconds = 3_500L;
-
-    List<PeriodicTask> periodicTasks = new ArrayList<>();
-    PeriodicTask task = new BasePeriodicTask("Task", runFrequencyInSeconds, initialDelayInSeconds) {
-      @Override
-      public void init() {
-        count.set(0);
-      }
-
-      @Override
-      public void run() {
-        // Execute task.
-        count.incrementAndGet();
-      }
-    };
-    periodicTasks.add(task);
-
-    long start = System.currentTimeMillis();
-    periodicTaskScheduler.start(periodicTasks);
-    Thread.sleep(totalRunTimeInMilliseconds);
 
-    periodicTaskScheduler.stop();
-
-    Assert.assertTrue(count.get() > 0);
-    Assert.assertEquals(count.get(), (totalRunTimeInMilliseconds / (runFrequencyInSeconds * 1000)));
-    Assert.assertTrue(totalRunTimeInMilliseconds <= (System.currentTimeMillis() - start));
-  }
+public class PeriodicTaskSchedulerTest {
 
   @Test
-  public void testSchedulerWithTwoStaggeredTasks() throws InterruptedException {
-    AtomicInteger count = new AtomicInteger(0);
-    PeriodicTaskScheduler periodicTaskScheduler = new PeriodicTaskScheduler();
-    long runFrequencyInSeconds = 2L;
-    long totalRunTimeInMilliseconds = 1_500L;
-
-    List<PeriodicTask> periodicTasks = new ArrayList<>();
-    PeriodicTask task1 = new BasePeriodicTask("Task1", runFrequencyInSeconds, 0L) {
-      @Override
-      public void init() {
-      }
-
-      @Override
-      public void run() {
-        // Execute task.
-        count.incrementAndGet();
-      }
-    };
-    periodicTasks.add(task1);
+  public void testTaskWithInvalidInterval() throws Exception {
+    AtomicBoolean initCalled = new AtomicBoolean();
+    AtomicBoolean runCalled = new AtomicBoolean();
 
-    // Stagger 2 tasks by delaying the 2nd task half of the frequency.
-    PeriodicTask task2 = new BasePeriodicTask("Task2", runFrequencyInSeconds, runFrequencyInSeconds / 2) {
+    List<PeriodicTask> periodicTasks = Collections.singletonList(new BasePeriodicTask("TestTask", 0L, 0L) {
       @Override
       public void init() {
+        initCalled.set(true);
       }
 
       @Override
       public void run() {
-        // Execute task.
-        count.decrementAndGet();
+        runCalled.set(true);
       }
-    };
-    periodicTasks.add(task2);
+    });
 
-    long start = System.currentTimeMillis();
-    periodicTaskScheduler.start(periodicTasks);
-    Thread.sleep(totalRunTimeInMilliseconds);
-    periodicTaskScheduler.stop();
+    PeriodicTaskScheduler taskScheduler = new PeriodicTaskScheduler();
+    taskScheduler.start(periodicTasks);
+    Thread.sleep(100L);
+    taskScheduler.stop();
 
-    Assert.assertEquals(count.get(), 0);
-    Assert.assertTrue(totalRunTimeInMilliseconds <= (System.currentTimeMillis() - start));
+    assertFalse(initCalled.get());
+    assertFalse(runCalled.get());
   }
 
   @Test
-  public void testSchedulerWithTwoTasksDifferentFrequencies() throws InterruptedException {
-    long startTime = System.currentTimeMillis();
-    AtomicLong count = new AtomicLong(startTime);
-    AtomicLong count2 = new AtomicLong(startTime);
-    final long[] maxRunTimeForTask1 = {0L};
-    final long[] maxRunTimeForTask2 = {0L};
-    PeriodicTaskScheduler periodicTaskScheduler = new PeriodicTaskScheduler();
-    long runFrequencyInSeconds = 1L;
-    long totalRunTimeInMilliseconds = 10_000L;
-
-    List<PeriodicTask> periodicTasks = new ArrayList<>();
-    PeriodicTask task1 = new BasePeriodicTask("Task1", runFrequencyInSeconds, 0L) {
-      @Override
-      public void init() {
-      }
-
-      @Override
-      public void run() {
-        // Calculate the max waiting time between the same task.
-        long lastTime = count.get();
-        long now = System.currentTimeMillis();
-        maxRunTimeForTask1[0] = Math.max(maxRunTimeForTask1[0], (now - lastTime));
-        count.set(now);
-      }
-    };
-    periodicTasks.add(task1);
-
-    // The time for Task 2 to run is 4 seconds, which is higher than the interval time of Task 1.
-    long TimeToRunMs = 4_000L;
-    PeriodicTask task2 = new BasePeriodicTask("Task2", runFrequencyInSeconds * 3, 0L) {
-      @Override
-      public void init() {
-      }
-
-      @Override
-      public void run() {
-        // Calculate the max waiting time between the same task.
-        long lastTime = count2.get();
-        long now = System.currentTimeMillis();
-        maxRunTimeForTask2[0] = Math.max(maxRunTimeForTask2[0], (now - lastTime));
-        count2.set(now);
-        try {
-          Thread.sleep(TimeToRunMs);
-        } catch (InterruptedException e) {
-          e.printStackTrace();
+  public void testScheduleMultipleTasks() throws Exception {
+    int numTasks = 3;
+    AtomicInteger numTimesInitCalled = new AtomicInteger();
+    AtomicInteger numTimesRunCalled = new AtomicInteger();
+
+    List<PeriodicTask> periodicTasks = new ArrayList<>(numTasks);
+    for (int i = 0; i < numTasks; i++) {
+      periodicTasks.add(new BasePeriodicTask("Task", 1L, 0L) {
+        @Override
+        public void init() {
+          numTimesInitCalled.getAndIncrement();
         }
-      }
-    };
-    periodicTasks.add(task2);
-
-    periodicTaskScheduler.start(periodicTasks);
-    Thread.sleep(totalRunTimeInMilliseconds);
-
-    periodicTaskScheduler.stop();
 
-    Assert.assertTrue(count.get() > startTime);
-    Assert.assertTrue(count2.get() > startTime);
-    // Task1 didn't waited until Task2 finished.
-    Assert.assertTrue(maxRunTimeForTask1[0] - task1.getIntervalInSeconds() * 1000L < 100L);
-    Assert.assertTrue(maxRunTimeForTask2[0] >= Math.max(task2.getIntervalInSeconds() * 1000L, TimeToRunMs));
-  }
-
-  @Test
-  public void testNoTaskAssignedToQueue() throws InterruptedException {
-    AtomicInteger count = new AtomicInteger(0);
-    PeriodicTaskScheduler periodicTaskScheduler = new PeriodicTaskScheduler();
-    long totalRunTimeInMilliseconds = 2_000L;
-
-    // An empty list.
-    List<PeriodicTask> periodicTasks = new ArrayList<>();
-    long start = System.currentTimeMillis();
-    periodicTaskScheduler.start(periodicTasks);
-    Thread.sleep(totalRunTimeInMilliseconds);
+        @Override
+        public void run() {
+          numTimesRunCalled.getAndIncrement();
+        }
+      });
+    }
 
-    periodicTaskScheduler.stop();
+    PeriodicTaskScheduler taskScheduler = new PeriodicTaskScheduler();
+    taskScheduler.start(periodicTasks);
+    Thread.sleep(1100L);
+    taskScheduler.stop();
 
-    Assert.assertEquals(count.get(), 0);
-    Assert.assertTrue(totalRunTimeInMilliseconds <= (System.currentTimeMillis() - start));
+    assertEquals(numTimesInitCalled.get(), numTasks);
+    assertEquals(numTimesRunCalled.get(), numTasks * 2);
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org