You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2018/12/21 23:30:10 UTC

[incubator-pinot] branch master updated: Start and stop ControllerPeriodicTasks based on leadership changes (#3622)

This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 09eb015  Start and stop ControllerPeriodicTasks based on leadership changes (#3622)
09eb015 is described below

commit 09eb0150dec47a28d5a4517e4930183eb5dfd0af
Author: Neha Pawar <np...@linkedin.com>
AuthorDate: Fri Dec 21 15:30:04 2018 -0800

    Start and stop ControllerPeriodicTasks based on leadership changes (#3622)
    
    Introduce a ControllerPeriodicTaskScheduler that implements LeadershipChangeSubscriber. This enables the scheduler to start the periodic tasks on a controller whenever it becomes the leader, and to stop them on a controller that has lost leadership.
---
 .../pinot/controller/ControllerStarter.java        |  15 +--
 .../controller/helix/SegmentStatusChecker.java     |  14 +--
 .../helix/core/minion/PinotTaskManager.java        |  30 ++---
 .../core/periodictask/ControllerPeriodicTask.java  | 127 +++++++++++++--------
 .../ControllerPeriodicTaskScheduler.java           |  57 +++++++++
 .../core/relocation/RealtimeSegmentRelocator.java  |  10 ++
 .../helix/core/retention/RetentionManager.java     |  14 ++-
 .../controller/validation/ValidationManager.java   |  17 ++-
 .../controller/helix/SegmentStatusCheckerTest.java |  33 ++----
 .../periodictask/ControllerPeriodicTaskTest.java   |  96 +++++++++-------
 .../helix/core/retention/RetentionManagerTest.java |  16 +--
 .../pinot/core/periodictask/PeriodicTask.java      |   5 +
 .../core/periodictask/PeriodicTaskScheduler.java   |  45 +++++---
 .../periodictask/PeriodicTaskSchedulerTest.java    |  20 +++-
 14 files changed, 324 insertions(+), 175 deletions(-)

diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java
index 33fb5ce..1e8552a 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java
@@ -32,6 +32,7 @@ import com.linkedin.pinot.controller.helix.SegmentStatusChecker;
 import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager;
 import com.linkedin.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
 import com.linkedin.pinot.controller.helix.core.minion.PinotTaskManager;
+import com.linkedin.pinot.controller.helix.core.periodictask.ControllerPeriodicTaskScheduler;
 import com.linkedin.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
 import com.linkedin.pinot.controller.helix.core.realtime.PinotRealtimeSegmentManager;
 import com.linkedin.pinot.controller.helix.core.rebalance.RebalanceSegmentStrategyFactory;
@@ -40,7 +41,6 @@ import com.linkedin.pinot.controller.helix.core.retention.RetentionManager;
 import com.linkedin.pinot.controller.validation.ValidationManager;
 import com.linkedin.pinot.core.crypt.PinotCrypterFactory;
 import com.linkedin.pinot.core.periodictask.PeriodicTask;
-import com.linkedin.pinot.core.periodictask.PeriodicTaskScheduler;
 import com.linkedin.pinot.filesystem.PinotFSFactory;
 import com.yammer.metrics.core.MetricsRegistry;
 import java.io.File;
@@ -80,7 +80,7 @@ public class ControllerStarter {
   private final PinotRealtimeSegmentManager _realtimeSegmentsManager;
   private final SegmentStatusChecker _segmentStatusChecker;
   private final ExecutorService _executorService;
-  private final PeriodicTaskScheduler _periodicTaskScheduler;
+  private final ControllerPeriodicTaskScheduler _controllerPeriodicTaskScheduler;
 
   // Can only be constructed after resource manager getting started
   private ValidationManager _validationManager;
@@ -101,7 +101,7 @@ public class ControllerStarter {
         Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("restapi-multiget-thread-%d").build());
     _segmentStatusChecker = new SegmentStatusChecker(_helixResourceManager, _config, _controllerMetrics);
     _realtimeSegmentRelocator = new RealtimeSegmentRelocator(_helixResourceManager, _config);
-    _periodicTaskScheduler = new PeriodicTaskScheduler();
+    _controllerPeriodicTaskScheduler = new ControllerPeriodicTaskScheduler();
   }
 
   public PinotHelixResourceManager getHelixResourceManager() {
@@ -173,6 +173,7 @@ public class ControllerStarter {
     _realtimeSegmentsManager.start(_controllerMetrics);
 
     // Setting up periodic tasks
+    LOGGER.info("Setting up periodic tasks");
     List<PeriodicTask> periodicTasks = new ArrayList<>();
     _taskManager = new PinotTaskManager(_helixTaskResourceManager, _helixResourceManager, _config, _controllerMetrics);
     periodicTasks.add(_taskManager);
@@ -183,7 +184,10 @@ public class ControllerStarter {
     periodicTasks.add(_validationManager);
     periodicTasks.add(_segmentStatusChecker);
     periodicTasks.add(_realtimeSegmentRelocator);
-    _periodicTaskScheduler.start(periodicTasks);
+
+    LOGGER.info("Init controller periodic tasks scheduler");
+    _controllerPeriodicTaskScheduler.init(periodicTasks);
+
 
     LOGGER.info("Creating rebalance segments factory");
     RebalanceSegmentStrategyFactory.createInstance(helixManager);
@@ -309,9 +313,6 @@ public class ControllerStarter {
       LOGGER.info("Stopping resource manager");
       _helixResourceManager.stop();
 
-      LOGGER.info("Stopping periodic task scheduler");
-      _periodicTaskScheduler.stop();
-
       _executorService.shutdownNow();
 
     } catch (final Exception e) {
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/SegmentStatusChecker.java
index 8059d9e..d8fdbb4 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/SegmentStatusChecker.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/SegmentStatusChecker.java
@@ -78,18 +78,12 @@ public class SegmentStatusChecker extends ControllerPeriodicTask {
   }
 
   @Override
-  public void init() {
+  public void initTask() {
     LOGGER.info("Initializing table metrics for all the tables.");
     setStatusToDefault();
   }
 
   @Override
-  public void onBecomeNotLeader() {
-    LOGGER.info("Resetting table metrics for all the tables.");
-    setStatusToDefault();
-  }
-
-  @Override
   protected void preprocess() {
     _realTimeTableCount = 0;
     _offlineTableCount = 0;
@@ -267,4 +261,10 @@ public class SegmentStatusChecker extends ControllerPeriodicTask {
     _metricsRegistry.setValueOfTableGauge(tableName, ControllerGauge.SEGMENTS_IN_ERROR_STATE, Long.MIN_VALUE);
     _metricsRegistry.setValueOfTableGauge(tableName, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE, Long.MIN_VALUE);
   }
+
+  @Override
+  public void stopTask() {
+    LOGGER.info("Resetting table metrics for all the tables.");
+    setStatusToDefault();
+  }
 }
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/PinotTaskManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/PinotTaskManager.java
index 729c75c..ff268a6 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/PinotTaskManager.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/PinotTaskManager.java
@@ -64,6 +64,11 @@ public class PinotTaskManager extends ControllerPeriodicTask {
     _controllerMetrics = controllerMetrics;
   }
 
+  @Override
+  protected void initTask() {
+
+  }
+
   /**
    * Get the cluster info provider.
    * <p>Cluster info provider might be needed to initialize task generators.
@@ -93,19 +98,6 @@ public class PinotTaskManager extends ControllerPeriodicTask {
     return getTasksScheduled();
   }
 
-  /**
-   * Performs necessary cleanups (e.g. remove metrics) when the controller leadership changes.
-   */
-  @Override
-  public void onBecomeNotLeader() {
-    LOGGER.info("Perform task cleanups.");
-    // Performs necessary cleanups for each task type.
-    for (String taskType : _taskGeneratorRegistry.getAllTaskTypes()) {
-      _taskGeneratorRegistry.getTaskGenerator(taskType).nonLeaderCleanUp();
-    }
-  }
-
-
   @Override
   protected void preprocess() {
     _controllerMetrics.addMeteredGlobalValue(ControllerMeter.NUMBER_TIMES_SCHEDULE_TASKS_CALLED, 1L);
@@ -163,4 +155,16 @@ public class PinotTaskManager extends ControllerPeriodicTask {
   private Map<String, String> getTasksScheduled() {
     return _tasksScheduled;
   }
+
+  /**
+   * Performs necessary cleanups (e.g. remove metrics) when the controller leadership changes.
+   */
+  @Override
+  public void stopTask() {
+    LOGGER.info("Perform task cleanups.");
+    // Performs necessary cleanups for each task type.
+    for (String taskType : _taskGeneratorRegistry.getAllTaskTypes()) {
+      _taskGeneratorRegistry.getTaskGenerator(taskType).nonLeaderCleanUp();
+    }
+  }
 }
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
index c416d0e..766b573 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
@@ -16,7 +16,6 @@
 package com.linkedin.pinot.controller.helix.core.periodictask;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.linkedin.pinot.controller.ControllerLeadershipManager;
 import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager;
 import com.linkedin.pinot.core.periodictask.BasePeriodicTask;
 import java.util.List;
@@ -36,9 +35,12 @@ public abstract class ControllerPeriodicTask extends BasePeriodicTask {
   public static final int MIN_INITIAL_DELAY_IN_SECONDS = 120;
   public static final int MAX_INITIAL_DELAY_IN_SECONDS = 300;
 
+  private static final long MAX_CONTROLLER_PERIODIC_TASK_STOP_TIME_MILLIS = 30_000L;
+
   protected final PinotHelixResourceManager _pinotHelixResourceManager;
 
-  private boolean _isLeader = false;
+  private volatile boolean _stopPeriodicTask;
+  private volatile boolean _periodicTaskInProgress;
 
   public ControllerPeriodicTask(String taskName, long runFrequencyInSeconds, long initialDelayInSeconds,
       PinotHelixResourceManager pinotHelixResourceManager) {
@@ -55,67 +57,90 @@ public abstract class ControllerPeriodicTask extends BasePeriodicTask {
     return MIN_INITIAL_DELAY_IN_SECONDS + RANDOM.nextInt(MAX_INITIAL_DELAY_IN_SECONDS - MIN_INITIAL_DELAY_IN_SECONDS);
   }
 
+  /**
+   * Reset flags, and call initTask which initializes each individual task
+   */
   @Override
-  public void init() {
+  public final void init() {
+    _stopPeriodicTask = false;
+    _periodicTaskInProgress = false;
+    initTask();
   }
 
+  /**
+   * Execute the ControllerPeriodicTask.
+   * The _periodicTaskInProgress is enabled at the beginning and disabled before exiting,
+   * to ensure that we can wait for a task in progress to finish when stop has been invoked
+   */
   @Override
-  public void run() {
-    if (!isLeader()) {
-      skipLeaderTask();
-    } else {
-      List<String> allTableNames = _pinotHelixResourceManager.getAllTables();
-      processLeaderTask(allTableNames);
-    }
-  }
-
-  private void skipLeaderTask() {
-    if (_isLeader) {
-      LOGGER.info("Current pinot controller lost leadership.");
-      _isLeader = false;
-      onBecomeNotLeader();
-    }
-    LOGGER.info("Skip running periodic task: {} on non-leader controller", _taskName);
-  }
+  public final void run() {
+    _stopPeriodicTask = false;
+    _periodicTaskInProgress = true;
 
-  private void processLeaderTask(List<String> tables) {
-    if (!_isLeader) {
-      LOGGER.info("Current pinot controller became leader. Starting {} with running frequency of {} seconds.",
-          _taskName, _intervalInSeconds);
-      _isLeader = true;
-      onBecomeLeader();
-    }
+    List<String> tableNamesWithType = _pinotHelixResourceManager.getAllTables();
     long startTime = System.currentTimeMillis();
-    int numTables = tables.size();
-    LOGGER.info("Start processing {} tables in periodic task: {}", numTables, _taskName);
-    process(tables);
-    LOGGER.info("Finish processing {} tables in periodic task: {} in {}ms", numTables, _taskName,
+    int numTables = tableNamesWithType.size();
+
+    LOGGER.info("Start processing {} tables in periodic task: {}", numTables, getTaskName());
+    process(tableNamesWithType);
+    LOGGER.info("Finish processing {} tables in periodic task: {} in {}ms", numTables, getTaskName(),
         (System.currentTimeMillis() - startTime));
-  }
 
-  /**
-   * Does the following logic when losing the leadership. This should be done only once during leadership transition.
-   */
-  public void onBecomeNotLeader() {
+    _periodicTaskInProgress = false;
   }
 
   /**
-   * Does the following logic when becoming lead controller. This should be done only once during leadership transition.
+   * Stops the ControllerPeriodicTask by setting the _stopPeriodicTask flag, which ensures that no new table starts being processed.
+   * This method then waits up to MAX_CONTROLLER_PERIODIC_TASK_STOP_TIME_MILLIS for an in-progress run to finish the table being processed.
+   * Finally, it invokes stopTask for any cleanup specific to the individual task.
    */
-  public void onBecomeLeader() {
+  @Override
+  public final void stop() {
+    _stopPeriodicTask = true;
+
+    LOGGER.info("Waiting for periodic task {} to finish, maxWaitTimeMillis = {}", getTaskName(),
+        MAX_CONTROLLER_PERIODIC_TASK_STOP_TIME_MILLIS);
+    long millisToWait = MAX_CONTROLLER_PERIODIC_TASK_STOP_TIME_MILLIS;
+    while (_periodicTaskInProgress && millisToWait > 0) {
+      try {
+        long thisWait = 1000;
+        if (millisToWait < thisWait) {
+          thisWait = millisToWait;
+        }
+        Thread.sleep(thisWait);
+        millisToWait -= thisWait;
+      } catch (InterruptedException e) {
+        LOGGER.info("Interrupted: Remaining wait time {} (out of {}) for task {}", millisToWait,
+            MAX_CONTROLLER_PERIODIC_TASK_STOP_TIME_MILLIS, getTaskName());
+        break;
+      }
+    }
+    LOGGER.info("Wait completed for task {}. Waited for {} ms. _periodicTaskInProgress = {}", getTaskName(),
+        MAX_CONTROLLER_PERIODIC_TASK_STOP_TIME_MILLIS - millisToWait, _periodicTaskInProgress);
+
+    stopTask();
   }
 
   /**
    * Processes the task on the given tables.
    *
-   * @param tables List of table names
+   * @param tableNamesWithType List of table names
    */
-  protected void process(List<String> tables) {
-    preprocess();
-    for (String table : tables) {
-      processTable(table);
+  protected void process(List<String> tableNamesWithType) {
+    if (!shouldStopPeriodicTask()) {
+      preprocess();
+      for (String tableNameWithType : tableNamesWithType) {
+        if (shouldStopPeriodicTask()) {
+          LOGGER.info("Skip processing table {} and all the remaining tables for task {}.", tableNameWithType,
+              getTaskName());
+          break;
+        }
+        processTable(tableNameWithType);
+      }
+      postprocess();
+    } else {
+      LOGGER.info("Skip processing all tables for task {}", getTaskName());
     }
-    postprocess();
   }
 
   /**
@@ -135,7 +160,17 @@ public abstract class ControllerPeriodicTask extends BasePeriodicTask {
   protected abstract void postprocess();
 
   @VisibleForTesting
-  protected boolean isLeader() {
-    return ControllerLeadershipManager.getInstance().isLeader();
+  protected boolean shouldStopPeriodicTask() {
+    return _stopPeriodicTask;
   }
+
+  /**
+   * Initialize the ControllerPeriodicTask, to be defined by each individual task
+   */
+  protected abstract void initTask();
+
+  /**
+   * Perform cleanup for the ControllerPeriodicTask, to be defined by each individual task
+   */
+  protected abstract void stopTask();
 }
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskScheduler.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskScheduler.java
new file mode 100644
index 0000000..1452861
--- /dev/null
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskScheduler.java
@@ -0,0 +1,57 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.linkedin.pinot.controller.helix.core.periodictask;
+
+import com.linkedin.pinot.controller.ControllerLeadershipManager;
+import com.linkedin.pinot.controller.LeadershipChangeSubscriber;
+import com.linkedin.pinot.core.periodictask.PeriodicTask;
+import com.linkedin.pinot.core.periodictask.PeriodicTaskScheduler;
+import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A {@link PeriodicTaskScheduler} for scheduling {@link ControllerPeriodicTask} according to controller leadership changes.
+ * Any controllerPeriodicTasks provided during initialization will run only while this controller is the leader, and will stop when leadership is lost
+ */
+public class ControllerPeriodicTaskScheduler extends PeriodicTaskScheduler implements LeadershipChangeSubscriber {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(ControllerPeriodicTaskScheduler.class);
+
+  /**
+   * Initialize the {@link ControllerPeriodicTaskScheduler} with the list of {@link ControllerPeriodicTask} created at startup
+   * This is called only once during controller startup
+   * @param controllerPeriodicTasks
+   */
+  public void init(List<PeriodicTask> controllerPeriodicTasks) {
+    super.init(controllerPeriodicTasks);
+    ControllerLeadershipManager.getInstance().subscribe(ControllerPeriodicTaskScheduler.class.getName(), this);
+  }
+
+  @Override
+  public void onBecomingLeader() {
+    LOGGER.info("Received callback for controller leadership gain. Starting PeriodicTaskScheduler.");
+    start();
+  }
+
+  @Override
+  public void onBecomingNonLeader() {
+    LOGGER.info("Received callback for controller leadership loss. Stopping PeriodicTaskScheduler.");
+    stop();
+  }
+}
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
index e324c11..bfede7b 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
@@ -58,6 +58,11 @@ public class RealtimeSegmentRelocator extends ControllerPeriodicTask {
   }
 
   @Override
+  protected void initTask() {
+
+  }
+
+  @Override
   protected void preprocess() {
 
   }
@@ -269,4 +274,9 @@ public class RealtimeSegmentRelocator extends ControllerPeriodicTask {
     }
     return seconds;
   }
+
+  @Override
+  public void stopTask() {
+
+  }
 }
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java
index 80894fc..134c4ee 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java
@@ -53,15 +53,17 @@ public class RetentionManager extends ControllerPeriodicTask {
       int deletedSegmentsRetentionInDays) {
     super("RetentionManager", runFrequencyInSeconds, pinotHelixResourceManager);
     _deletedSegmentsRetentionInDays = deletedSegmentsRetentionInDays;
-  }
 
-  @Override
-  public void onBecomeLeader() {
     LOGGER.info("Starting RetentionManager with runFrequencyInSeconds: {}, deletedSegmentsRetentionInDays: {}",
         getIntervalInSeconds(), _deletedSegmentsRetentionInDays);
   }
 
   @Override
+  protected void initTask() {
+
+  }
+
+  @Override
   protected void preprocess() {
 
   }
@@ -185,4 +187,10 @@ public class RetentionManager extends ControllerPeriodicTask {
           CommonConstants.Helix.StateModel.SegmentOnlineOfflineStateModel.OFFLINE);
     }
   }
+
+
+  @Override
+  public void stopTask() {
+
+  }
 }
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java
index 18025eb..7d06289 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java
@@ -70,12 +70,6 @@ public class ValidationManager extends ControllerPeriodicTask {
   }
 
   @Override
-  public void onBecomeNotLeader() {
-    LOGGER.info("Unregister all the validation metrics.");
-    _validationMetrics.unregisterAllMetrics();
-  }
-
-  @Override
   protected void preprocess() {
     // Run segment level validation using a separate interval
     _runSegmentLevelValidation = false;
@@ -312,4 +306,15 @@ public class ValidationManager extends ControllerPeriodicTask {
 
     return numTotalDocs;
   }
+
+  @Override
+  protected void initTask() {
+
+  }
+
+  @Override
+  public void stopTask() {
+    LOGGER.info("Unregister all the validation metrics.");
+    _validationMetrics.unregisterAllMetrics();
+  }
 }
diff --git a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/SegmentStatusCheckerTest.java b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/SegmentStatusCheckerTest.java
index 4d4c324..1246991 100644
--- a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/SegmentStatusCheckerTest.java
+++ b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/SegmentStatusCheckerTest.java
@@ -86,7 +86,7 @@ public class SegmentStatusCheckerTest {
     }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
     segmentStatusChecker.init();
     segmentStatusChecker.run();
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(externalView.getId(),
@@ -151,7 +151,7 @@ public class SegmentStatusCheckerTest {
     }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
     segmentStatusChecker.init();
     segmentStatusChecker.run();
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(externalView.getId(),
@@ -228,7 +228,7 @@ public class SegmentStatusCheckerTest {
     }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
     segmentStatusChecker.init();
     segmentStatusChecker.run();
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(externalView.getId(),
@@ -274,7 +274,7 @@ public class SegmentStatusCheckerTest {
     }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
     segmentStatusChecker.init();
     segmentStatusChecker.run();
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
@@ -308,7 +308,7 @@ public class SegmentStatusCheckerTest {
     }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
     segmentStatusChecker.init();
     segmentStatusChecker.run();
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
@@ -376,7 +376,7 @@ public class SegmentStatusCheckerTest {
     }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
     segmentStatusChecker.init();
     segmentStatusChecker.run();
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(externalView.getId(),
@@ -420,7 +420,7 @@ public class SegmentStatusCheckerTest {
     }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
     segmentStatusChecker.init();
     segmentStatusChecker.run();
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
@@ -458,7 +458,7 @@ public class SegmentStatusCheckerTest {
     }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
     segmentStatusChecker.init();
     segmentStatusChecker.run();
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
@@ -504,7 +504,7 @@ public class SegmentStatusCheckerTest {
     }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
     // verify state before test
     Assert.assertEquals(controllerMetrics.getValueOfGlobalGauge(
         ControllerGauge.DISABLED_TABLE_COUNT), 0);
@@ -555,7 +555,7 @@ public class SegmentStatusCheckerTest {
     }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
     segmentStatusChecker.init();
     segmentStatusChecker.run();
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
@@ -567,17 +567,4 @@ public class SegmentStatusCheckerTest {
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
         ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100);
   }
-
-  private class MockSegmentStatusChecker extends SegmentStatusChecker {
-
-    public MockSegmentStatusChecker(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf config,
-        ControllerMetrics metricsRegistry) {
-      super(pinotHelixResourceManager, config, metricsRegistry);
-    }
-
-    @Override
-    protected boolean isLeader() {
-      return true;
-    }
-  }
 }
diff --git a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
index 94c9f52..cce2f71 100644
--- a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
+++ b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
@@ -16,8 +16,12 @@
 package com.linkedin.pinot.controller.helix.core.periodictask;
 
 import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.IntStream;
+import org.testng.annotations.BeforeTest;
 import org.testng.annotations.Test;
 
 import static org.mockito.Mockito.*;
@@ -28,33 +32,50 @@ public class ControllerPeriodicTaskTest {
   private static final long RUN_FREQUENCY_IN_SECONDS = 30;
 
   private final PinotHelixResourceManager _resourceManager = mock(PinotHelixResourceManager.class);
-  private final AtomicBoolean _onBecomeLeaderCalled = new AtomicBoolean();
-  private final AtomicBoolean _onBecomeNonLeaderCalled = new AtomicBoolean();
+  private final AtomicBoolean _stopTaskCalled = new AtomicBoolean();
+  private final AtomicBoolean _initTaskCalled = new AtomicBoolean();
   private final AtomicBoolean _processCalled = new AtomicBoolean();
+  private final AtomicInteger _numTablesProcessed = new AtomicInteger();
+  private final int _numTables = 3;
 
   private final MockControllerPeriodicTask _task =
       new MockControllerPeriodicTask("TestTask", RUN_FREQUENCY_IN_SECONDS, _resourceManager) {
+
         @Override
-        public void onBecomeLeader() {
-          _onBecomeLeaderCalled.set(true);
+        protected void initTask() {
+          _initTaskCalled.set(true);
         }
 
         @Override
-        public void onBecomeNotLeader() {
-          _onBecomeNonLeaderCalled.set(true);
+        public void stopTask() {
+          _stopTaskCalled.set(true);
         }
 
         @Override
-        public void process(List<String> tables) {
+        public void process(List<String> tableNamesWithType) {
           _processCalled.set(true);
+          super.process(tableNamesWithType);
+        }
+
+        @Override
+        public void processTable(String tableNameWithType) {
+          _numTablesProcessed.getAndIncrement();
         }
 
       };
 
+  @BeforeTest
+  public void beforeTest() {
+    List<String> tables = new ArrayList<>(_numTables);
+    IntStream.range(0, _numTables).forEach(i -> tables.add("table_" + i + " _OFFLINE"));
+    when(_resourceManager.getAllTables()).thenReturn(tables);
+  }
+
   private void resetState() {
-    _onBecomeLeaderCalled.set(false);
-    _onBecomeNonLeaderCalled.set(false);
+    _initTaskCalled.set(false);
+    _stopTaskCalled.set(false);
     _processCalled.set(false);
+    _numTablesProcessed.set(0);
   }
 
   @Test
@@ -66,56 +87,54 @@ public class ControllerPeriodicTaskTest {
   }
 
   @Test
-  public void testChangeLeadership() {
+  public void testControllerPeriodicTaskCalls() {
     // Initial state
     resetState();
-    _task.setLeader(false);
     _task.init();
-    assertFalse(_onBecomeLeaderCalled.get());
-    assertFalse(_onBecomeNonLeaderCalled.get());
+    assertTrue(_initTaskCalled.get());
     assertFalse(_processCalled.get());
+    assertEquals(_numTablesProcessed.get(), 0);
+    assertFalse(_stopTaskCalled.get());
+    assertFalse(_task.shouldStopPeriodicTask());
 
-    // From non-leader to non-leader
+    // run task - leadership gained
     resetState();
     _task.run();
-    assertFalse(_onBecomeLeaderCalled.get());
-    assertFalse(_onBecomeNonLeaderCalled.get());
-    assertFalse(_processCalled.get());
+    assertFalse(_initTaskCalled.get());
+    assertTrue(_processCalled.get());
+    assertEquals(_numTablesProcessed.get(), _numTables);
+    assertFalse(_stopTaskCalled.get());
+    assertFalse(_task.shouldStopPeriodicTask());
 
-    // From non-leader to leader
+    // stop periodic task - leadership lost
     resetState();
-    _task.setLeader(true);
-    _task.run();
-    assertTrue(_onBecomeLeaderCalled.get());
-    assertFalse(_onBecomeNonLeaderCalled.get());
-    assertTrue(_processCalled.get());
+    _task.stop();
+    assertFalse(_initTaskCalled.get());
+    assertFalse(_processCalled.get());
+    assertEquals(_numTablesProcessed.get(), 0);
+    assertTrue(_stopTaskCalled.get());
+    assertTrue(_task.shouldStopPeriodicTask());
 
-    // From leader to leader
+    // call to run after periodic task stop invoked - leadership gained back on same controller
     resetState();
     _task.run();
-    assertFalse(_onBecomeLeaderCalled.get());
-    assertFalse(_onBecomeNonLeaderCalled.get());
+    assertFalse(_task.shouldStopPeriodicTask());
+    assertFalse(_initTaskCalled.get());
     assertTrue(_processCalled.get());
+    assertEquals(_numTablesProcessed.get(), _numTables);
+    assertFalse(_stopTaskCalled.get());
 
-    // From leader to non-leader
-    resetState();
-    _task.setLeader(false);
-    _task.run();
-    assertFalse(_onBecomeLeaderCalled.get());
-    assertTrue(_onBecomeNonLeaderCalled.get());
-    assertFalse(_processCalled.get());
   }
 
   private class MockControllerPeriodicTask extends ControllerPeriodicTask {
 
-    private boolean _isLeader = true;
     public MockControllerPeriodicTask(String taskName, long runFrequencyInSeconds,
         PinotHelixResourceManager pinotHelixResourceManager) {
       super(taskName, runFrequencyInSeconds, pinotHelixResourceManager);
     }
 
     @Override
-    protected void process(List<String> tables) {
+    protected void initTask() {
 
     }
 
@@ -134,13 +153,10 @@ public class ControllerPeriodicTaskTest {
 
     }
 
+
     @Override
-    protected boolean isLeader() {
-      return _isLeader;
-    }
+    public void stopTask() {
 
-    void setLeader(boolean isLeader) {
-      _isLeader = isLeader;
     }
   }
 }
diff --git a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java
index 8e25505..c27b0ef 100644
--- a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java
+++ b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java
@@ -84,7 +84,7 @@ public class RetentionManagerTest {
     when(pinotHelixResourceManager.getTableConfig(OFFLINE_TABLE_NAME)).thenReturn(tableConfig);
     when(pinotHelixResourceManager.getOfflineSegmentMetadata(OFFLINE_TABLE_NAME)).thenReturn(metadataList);
 
-    RetentionManager retentionManager = new MockRetentionManager(pinotHelixResourceManager, 0, 0);
+    RetentionManager retentionManager = new RetentionManager(pinotHelixResourceManager, 0, 0);
     retentionManager.init();
     retentionManager.run();
 
@@ -201,7 +201,7 @@ public class RetentionManagerTest {
         setupSegmentMetadata(tableConfig, now, initialNumSegments, removedSegments);
     setupPinotHelixResourceManager(tableConfig, removedSegments, pinotHelixResourceManager);
 
-    RetentionManager retentionManager = new MockRetentionManager(pinotHelixResourceManager, 0, 0);
+    RetentionManager retentionManager = new RetentionManager(pinotHelixResourceManager, 0, 0);
     retentionManager.init();
     retentionManager.run();
 
@@ -306,16 +306,4 @@ public class RetentionManagerTest {
     return segmentMetadata;
   }
 
-  private class MockRetentionManager extends RetentionManager {
-
-    public MockRetentionManager(PinotHelixResourceManager pinotHelixResourceManager, int runFrequencyInSeconds,
-        int deletedSegmentsRetentionInDays) {
-      super(pinotHelixResourceManager, runFrequencyInSeconds, deletedSegmentsRetentionInDays);
-    }
-
-    @Override
-    protected boolean isLeader() {
-      return true;
-    }
-  }
 }
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/PeriodicTask.java b/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/PeriodicTask.java
index fac0750..f5b9c60 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/PeriodicTask.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/PeriodicTask.java
@@ -43,4 +43,9 @@ public interface PeriodicTask extends Runnable {
    * @return task name.
    */
   String getTaskName();
+
+  /**
+   * Stop the periodic task
+   */
+  void stop();
 }
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/PeriodicTaskScheduler.java b/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/PeriodicTaskScheduler.java
index cd07ea4..6397ad4 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/PeriodicTaskScheduler.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/PeriodicTaskScheduler.java
@@ -31,34 +31,43 @@ public class PeriodicTaskScheduler {
   private static final Logger LOGGER = LoggerFactory.getLogger(PeriodicTaskScheduler.class);
 
   private ScheduledExecutorService _executorService;
+  private List<PeriodicTask> _tasksWithValidInterval;
 
   /**
-   * Start scheduling periodic tasks.
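+   * Initializes the PeriodicTaskScheduler with a list of PeriodicTasks and calls init() on each accepted task.
+   * @param periodicTasks tasks to register; tasks whose interval is not greater than 0 seconds are skipped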
    */
-  public void start(List<PeriodicTask> periodicTasks) {
-    if (_executorService != null) {
-      LOGGER.warn("Periodic task scheduler already started");
-    }
-
-    List<PeriodicTask> tasksWithValidInterval = new ArrayList<>();
+  public void init(List<PeriodicTask> periodicTasks) {
+    _tasksWithValidInterval = new ArrayList<>();
     for (PeriodicTask periodicTask : periodicTasks) {
       if (periodicTask.getIntervalInSeconds() > 0) {
         LOGGER.info("Adding periodic task: {}", periodicTask);
-        tasksWithValidInterval.add(periodicTask);
+        _tasksWithValidInterval.add(periodicTask);
+        periodicTask.init();
       } else {
         LOGGER.info("Skipping periodic task: {}", periodicTask);
       }
     }
+  }
 
-    if (tasksWithValidInterval.isEmpty()) {
+  /**
+   * Start scheduling periodic tasks.
+   */
+  public synchronized void start() {
+    if (_executorService != null) {
+      LOGGER.warn("Periodic task scheduler already started");
+    }
+
+    if (_tasksWithValidInterval.isEmpty()) {
       LOGGER.warn("No periodic task scheduled");
     } else {
-      LOGGER.info("Starting periodic task scheduler with tasks: {}", tasksWithValidInterval);
-      _executorService = Executors.newScheduledThreadPool(tasksWithValidInterval.size());
-      for (PeriodicTask periodicTask : tasksWithValidInterval) {
-        periodicTask.init();
+      LOGGER.info("Starting periodic task scheduler with tasks: {}", _tasksWithValidInterval);
+      _executorService = Executors.newScheduledThreadPool(_tasksWithValidInterval.size());
+      for (PeriodicTask periodicTask : _tasksWithValidInterval) {
         _executorService.scheduleWithFixedDelay(() -> {
           try {
+            LOGGER.info("Starting {} with running frequency of {} seconds.", periodicTask.getTaskName(),
+                periodicTask.getIntervalInSeconds());
             periodicTask.run();
           } catch (Throwable e) {
             // catch all errors to prevent subsequent executions from being silently suppressed
@@ -70,11 +79,19 @@ public class PeriodicTaskScheduler {
     }
   }
 
-  public void stop() {
+  /**
+   * Shut down the executor service and stop the periodic tasks
+   */
+  public synchronized void stop() {
     if (_executorService != null) {
       LOGGER.info("Stopping periodic task scheduler");
       _executorService.shutdown();
       _executorService = null;
     }
+
+    if (_tasksWithValidInterval != null) {
+      LOGGER.info("Stopping all periodic tasks: {}", _tasksWithValidInterval);
+      _tasksWithValidInterval.parallelStream().forEach(PeriodicTask::stop);
+    }
   }
 }
diff --git a/pinot-core/src/test/java/com/linkedin/pinot/core/periodictask/PeriodicTaskSchedulerTest.java b/pinot-core/src/test/java/com/linkedin/pinot/core/periodictask/PeriodicTaskSchedulerTest.java
index a105435..fb3d2ad 100644
--- a/pinot-core/src/test/java/com/linkedin/pinot/core/periodictask/PeriodicTaskSchedulerTest.java
+++ b/pinot-core/src/test/java/com/linkedin/pinot/core/periodictask/PeriodicTaskSchedulerTest.java
@@ -31,6 +31,7 @@ public class PeriodicTaskSchedulerTest {
   public void testTaskWithInvalidInterval() throws Exception {
     AtomicBoolean initCalled = new AtomicBoolean();
     AtomicBoolean runCalled = new AtomicBoolean();
+    AtomicBoolean stopCalled = new AtomicBoolean();
 
     List<PeriodicTask> periodicTasks = Collections.singletonList(new BasePeriodicTask("TestTask", 0L/*Invalid*/, 0L) {
       @Override
@@ -39,18 +40,25 @@ public class PeriodicTaskSchedulerTest {
       }
 
       @Override
+      public void stop() {
+        stopCalled.set(true);
+      }
+
+      @Override
       public void run() {
         runCalled.set(true);
       }
     });
 
     PeriodicTaskScheduler taskScheduler = new PeriodicTaskScheduler();
-    taskScheduler.start(periodicTasks);
+    taskScheduler.init(periodicTasks);
+    taskScheduler.start();
     Thread.sleep(100L);
     taskScheduler.stop();
 
     assertFalse(initCalled.get());
     assertFalse(runCalled.get());
+    assertFalse(stopCalled.get());
   }
 
   @Test
@@ -58,6 +66,7 @@ public class PeriodicTaskSchedulerTest {
     int numTasks = 3;
     AtomicInteger numTimesInitCalled = new AtomicInteger();
     AtomicInteger numTimesRunCalled = new AtomicInteger();
+    AtomicInteger numTimesStopCalled = new AtomicInteger();
 
     List<PeriodicTask> periodicTasks = new ArrayList<>(numTasks);
     for (int i = 0; i < numTasks; i++) {
@@ -68,6 +77,11 @@ public class PeriodicTaskSchedulerTest {
         }
 
         @Override
+        public void stop() {
+          numTimesStopCalled.getAndIncrement();
+        }
+
+        @Override
         public void run() {
           numTimesRunCalled.getAndIncrement();
         }
@@ -75,11 +89,13 @@ public class PeriodicTaskSchedulerTest {
     }
 
     PeriodicTaskScheduler taskScheduler = new PeriodicTaskScheduler();
-    taskScheduler.start(periodicTasks);
+    taskScheduler.init(periodicTasks);
+    taskScheduler.start();
     Thread.sleep(1100L);
     taskScheduler.stop();
 
     assertEquals(numTimesInitCalled.get(), numTasks);
     assertEquals(numTimesRunCalled.get(), numTasks * 2);
+    assertEquals(numTimesStopCalled.get(), numTasks);
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org