You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2018/12/21 23:30:10 UTC
[incubator-pinot] branch master updated: Start and stop
ControllerPeriodicTasks based on leadership changes (#3622)
This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 09eb015 Start and stop ControllerPeriodicTasks based on leadership changes (#3622)
09eb015 is described below
commit 09eb0150dec47a28d5a4517e4930183eb5dfd0af
Author: Neha Pawar <np...@linkedin.com>
AuthorDate: Fri Dec 21 15:30:04 2018 -0800
Start and stop ControllerPeriodicTasks based on leadership changes (#3622)
Introducing a ControllerPeriodicTasksScheduler and making it a subscriber of LeadershipChangeSubscriber. This will enable the scheduler to start the periodic tasks on a controller whenever it becomes leader, as well as stop the tasks on a controller who lost leadership.
---
.../pinot/controller/ControllerStarter.java | 15 +--
.../controller/helix/SegmentStatusChecker.java | 14 +--
.../helix/core/minion/PinotTaskManager.java | 30 ++---
.../core/periodictask/ControllerPeriodicTask.java | 127 +++++++++++++--------
.../ControllerPeriodicTaskScheduler.java | 57 +++++++++
.../core/relocation/RealtimeSegmentRelocator.java | 10 ++
.../helix/core/retention/RetentionManager.java | 14 ++-
.../controller/validation/ValidationManager.java | 17 ++-
.../controller/helix/SegmentStatusCheckerTest.java | 33 ++----
.../periodictask/ControllerPeriodicTaskTest.java | 96 +++++++++-------
.../helix/core/retention/RetentionManagerTest.java | 16 +--
.../pinot/core/periodictask/PeriodicTask.java | 5 +
.../core/periodictask/PeriodicTaskScheduler.java | 45 +++++---
.../periodictask/PeriodicTaskSchedulerTest.java | 20 +++-
14 files changed, 324 insertions(+), 175 deletions(-)
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java
index 33fb5ce..1e8552a 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java
@@ -32,6 +32,7 @@ import com.linkedin.pinot.controller.helix.SegmentStatusChecker;
import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager;
import com.linkedin.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
import com.linkedin.pinot.controller.helix.core.minion.PinotTaskManager;
+import com.linkedin.pinot.controller.helix.core.periodictask.ControllerPeriodicTaskScheduler;
import com.linkedin.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
import com.linkedin.pinot.controller.helix.core.realtime.PinotRealtimeSegmentManager;
import com.linkedin.pinot.controller.helix.core.rebalance.RebalanceSegmentStrategyFactory;
@@ -40,7 +41,6 @@ import com.linkedin.pinot.controller.helix.core.retention.RetentionManager;
import com.linkedin.pinot.controller.validation.ValidationManager;
import com.linkedin.pinot.core.crypt.PinotCrypterFactory;
import com.linkedin.pinot.core.periodictask.PeriodicTask;
-import com.linkedin.pinot.core.periodictask.PeriodicTaskScheduler;
import com.linkedin.pinot.filesystem.PinotFSFactory;
import com.yammer.metrics.core.MetricsRegistry;
import java.io.File;
@@ -80,7 +80,7 @@ public class ControllerStarter {
private final PinotRealtimeSegmentManager _realtimeSegmentsManager;
private final SegmentStatusChecker _segmentStatusChecker;
private final ExecutorService _executorService;
- private final PeriodicTaskScheduler _periodicTaskScheduler;
+ private final ControllerPeriodicTaskScheduler _controllerPeriodicTaskScheduler;
// Can only be constructed after resource manager getting started
private ValidationManager _validationManager;
@@ -101,7 +101,7 @@ public class ControllerStarter {
Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("restapi-multiget-thread-%d").build());
_segmentStatusChecker = new SegmentStatusChecker(_helixResourceManager, _config, _controllerMetrics);
_realtimeSegmentRelocator = new RealtimeSegmentRelocator(_helixResourceManager, _config);
- _periodicTaskScheduler = new PeriodicTaskScheduler();
+ _controllerPeriodicTaskScheduler = new ControllerPeriodicTaskScheduler();
}
public PinotHelixResourceManager getHelixResourceManager() {
@@ -173,6 +173,7 @@ public class ControllerStarter {
_realtimeSegmentsManager.start(_controllerMetrics);
// Setting up periodic tasks
+ LOGGER.info("Setting up periodic tasks");
List<PeriodicTask> periodicTasks = new ArrayList<>();
_taskManager = new PinotTaskManager(_helixTaskResourceManager, _helixResourceManager, _config, _controllerMetrics);
periodicTasks.add(_taskManager);
@@ -183,7 +184,10 @@ public class ControllerStarter {
periodicTasks.add(_validationManager);
periodicTasks.add(_segmentStatusChecker);
periodicTasks.add(_realtimeSegmentRelocator);
- _periodicTaskScheduler.start(periodicTasks);
+
+ LOGGER.info("Init controller periodic tasks scheduler");
+ _controllerPeriodicTaskScheduler.init(periodicTasks);
+
LOGGER.info("Creating rebalance segments factory");
RebalanceSegmentStrategyFactory.createInstance(helixManager);
@@ -309,9 +313,6 @@ public class ControllerStarter {
LOGGER.info("Stopping resource manager");
_helixResourceManager.stop();
- LOGGER.info("Stopping periodic task scheduler");
- _periodicTaskScheduler.stop();
-
_executorService.shutdownNow();
} catch (final Exception e) {
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/SegmentStatusChecker.java
index 8059d9e..d8fdbb4 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/SegmentStatusChecker.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/SegmentStatusChecker.java
@@ -78,18 +78,12 @@ public class SegmentStatusChecker extends ControllerPeriodicTask {
}
@Override
- public void init() {
+ public void initTask() {
LOGGER.info("Initializing table metrics for all the tables.");
setStatusToDefault();
}
@Override
- public void onBecomeNotLeader() {
- LOGGER.info("Resetting table metrics for all the tables.");
- setStatusToDefault();
- }
-
- @Override
protected void preprocess() {
_realTimeTableCount = 0;
_offlineTableCount = 0;
@@ -267,4 +261,10 @@ public class SegmentStatusChecker extends ControllerPeriodicTask {
_metricsRegistry.setValueOfTableGauge(tableName, ControllerGauge.SEGMENTS_IN_ERROR_STATE, Long.MIN_VALUE);
_metricsRegistry.setValueOfTableGauge(tableName, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE, Long.MIN_VALUE);
}
+
+ @Override
+ public void stopTask() {
+ LOGGER.info("Resetting table metrics for all the tables.");
+ setStatusToDefault();
+ }
}
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/PinotTaskManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/PinotTaskManager.java
index 729c75c..ff268a6 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/PinotTaskManager.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/PinotTaskManager.java
@@ -64,6 +64,11 @@ public class PinotTaskManager extends ControllerPeriodicTask {
_controllerMetrics = controllerMetrics;
}
+ @Override
+ protected void initTask() {
+
+ }
+
/**
* Get the cluster info provider.
* <p>Cluster info provider might be needed to initialize task generators.
@@ -93,19 +98,6 @@ public class PinotTaskManager extends ControllerPeriodicTask {
return getTasksScheduled();
}
- /**
- * Performs necessary cleanups (e.g. remove metrics) when the controller leadership changes.
- */
- @Override
- public void onBecomeNotLeader() {
- LOGGER.info("Perform task cleanups.");
- // Performs necessary cleanups for each task type.
- for (String taskType : _taskGeneratorRegistry.getAllTaskTypes()) {
- _taskGeneratorRegistry.getTaskGenerator(taskType).nonLeaderCleanUp();
- }
- }
-
-
@Override
protected void preprocess() {
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.NUMBER_TIMES_SCHEDULE_TASKS_CALLED, 1L);
@@ -163,4 +155,16 @@ public class PinotTaskManager extends ControllerPeriodicTask {
private Map<String, String> getTasksScheduled() {
return _tasksScheduled;
}
+
+ /**
+ * Performs necessary cleanups (e.g. remove metrics) when the controller leadership changes.
+ */
+ @Override
+ public void stopTask() {
+ LOGGER.info("Perform task cleanups.");
+ // Performs necessary cleanups for each task type.
+ for (String taskType : _taskGeneratorRegistry.getAllTaskTypes()) {
+ _taskGeneratorRegistry.getTaskGenerator(taskType).nonLeaderCleanUp();
+ }
+ }
}
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
index c416d0e..766b573 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
@@ -16,7 +16,6 @@
package com.linkedin.pinot.controller.helix.core.periodictask;
import com.google.common.annotations.VisibleForTesting;
-import com.linkedin.pinot.controller.ControllerLeadershipManager;
import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager;
import com.linkedin.pinot.core.periodictask.BasePeriodicTask;
import java.util.List;
@@ -36,9 +35,12 @@ public abstract class ControllerPeriodicTask extends BasePeriodicTask {
public static final int MIN_INITIAL_DELAY_IN_SECONDS = 120;
public static final int MAX_INITIAL_DELAY_IN_SECONDS = 300;
+ private static final long MAX_CONTROLLER_PERIODIC_TASK_STOP_TIME_MILLIS = 30_000L;
+
protected final PinotHelixResourceManager _pinotHelixResourceManager;
- private boolean _isLeader = false;
+ private volatile boolean _stopPeriodicTask;
+ private volatile boolean _periodicTaskInProgress;
public ControllerPeriodicTask(String taskName, long runFrequencyInSeconds, long initialDelayInSeconds,
PinotHelixResourceManager pinotHelixResourceManager) {
@@ -55,67 +57,90 @@ public abstract class ControllerPeriodicTask extends BasePeriodicTask {
return MIN_INITIAL_DELAY_IN_SECONDS + RANDOM.nextInt(MAX_INITIAL_DELAY_IN_SECONDS - MIN_INITIAL_DELAY_IN_SECONDS);
}
+ /**
+ * Reset flags, and call initTask which initializes each individual task
+ */
@Override
- public void init() {
+ public final void init() {
+ _stopPeriodicTask = false;
+ _periodicTaskInProgress = false;
+ initTask();
}
+ /**
+ * Execute the ControllerPeriodicTask.
+ * The _periodicTaskInProgress is enabled at the beginning and disabled before exiting,
+ * to ensure that we can wait for a task in progress to finish when stop has been invoked
+ */
@Override
- public void run() {
- if (!isLeader()) {
- skipLeaderTask();
- } else {
- List<String> allTableNames = _pinotHelixResourceManager.getAllTables();
- processLeaderTask(allTableNames);
- }
- }
-
- private void skipLeaderTask() {
- if (_isLeader) {
- LOGGER.info("Current pinot controller lost leadership.");
- _isLeader = false;
- onBecomeNotLeader();
- }
- LOGGER.info("Skip running periodic task: {} on non-leader controller", _taskName);
- }
+ public final void run() {
+ _stopPeriodicTask = false;
+ _periodicTaskInProgress = true;
- private void processLeaderTask(List<String> tables) {
- if (!_isLeader) {
- LOGGER.info("Current pinot controller became leader. Starting {} with running frequency of {} seconds.",
- _taskName, _intervalInSeconds);
- _isLeader = true;
- onBecomeLeader();
- }
+ List<String> tableNamesWithType = _pinotHelixResourceManager.getAllTables();
long startTime = System.currentTimeMillis();
- int numTables = tables.size();
- LOGGER.info("Start processing {} tables in periodic task: {}", numTables, _taskName);
- process(tables);
- LOGGER.info("Finish processing {} tables in periodic task: {} in {}ms", numTables, _taskName,
+ int numTables = tableNamesWithType.size();
+
+ LOGGER.info("Start processing {} tables in periodic task: {}", numTables, getTaskName());
+ process(tableNamesWithType);
+ LOGGER.info("Finish processing {} tables in periodic task: {} in {}ms", numTables, getTaskName(),
(System.currentTimeMillis() - startTime));
- }
- /**
- * Does the following logic when losing the leadership. This should be done only once during leadership transition.
- */
- public void onBecomeNotLeader() {
+ _periodicTaskInProgress = false;
}
/**
- * Does the following logic when becoming lead controller. This should be done only once during leadership transition.
+ * Stops the ControllerPeriodicTask by enabling the _stopPeriodicTask flag. The flag ensures that processing of no new table begins.
+ * This method waits for the in progress ControllerPeriodicTask to finish the table being processed, until MAX_CONTROLLER_PERIODIC_TASK_STOP_TIME_MILLIS
+ * Finally, it invokes the stopTask for any specific cleanup at the individual task level
*/
- public void onBecomeLeader() {
+ @Override
+ public final void stop() {
+ _stopPeriodicTask = true;
+
+ LOGGER.info("Waiting for periodic task {} to finish, maxWaitTimeMillis = {}", getTaskName(),
+ MAX_CONTROLLER_PERIODIC_TASK_STOP_TIME_MILLIS);
+ long millisToWait = MAX_CONTROLLER_PERIODIC_TASK_STOP_TIME_MILLIS;
+ while (_periodicTaskInProgress && millisToWait > 0) {
+ try {
+ long thisWait = 1000;
+ if (millisToWait < thisWait) {
+ thisWait = millisToWait;
+ }
+ Thread.sleep(thisWait);
+ millisToWait -= thisWait;
+ } catch (InterruptedException e) {
+ LOGGER.info("Interrupted: Remaining wait time {} (out of {}) for task {}", millisToWait,
+ MAX_CONTROLLER_PERIODIC_TASK_STOP_TIME_MILLIS, getTaskName());
+ break;
+ }
+ }
+ LOGGER.info("Wait completed for task {}. Waited for {} ms. _periodicTaskInProgress = {}", getTaskName(),
+ MAX_CONTROLLER_PERIODIC_TASK_STOP_TIME_MILLIS - millisToWait, _periodicTaskInProgress);
+
+ stopTask();
}
/**
* Processes the task on the given tables.
*
- * @param tables List of table names
+ * @param tableNamesWithType List of table names
*/
- protected void process(List<String> tables) {
- preprocess();
- for (String table : tables) {
- processTable(table);
+ protected void process(List<String> tableNamesWithType) {
+ if (!shouldStopPeriodicTask()) {
+ preprocess();
+ for (String tableNameWithType : tableNamesWithType) {
+ if (shouldStopPeriodicTask()) {
+ LOGGER.info("Skip processing table {} and all the remaining tables for task {}.", tableNameWithType,
+ getTaskName());
+ break;
+ }
+ processTable(tableNameWithType);
+ }
+ postprocess();
+ } else {
+ LOGGER.info("Skip processing all tables for task {}", getTaskName());
}
- postprocess();
}
/**
@@ -135,7 +160,17 @@ public abstract class ControllerPeriodicTask extends BasePeriodicTask {
protected abstract void postprocess();
@VisibleForTesting
- protected boolean isLeader() {
- return ControllerLeadershipManager.getInstance().isLeader();
+ protected boolean shouldStopPeriodicTask() {
+ return _stopPeriodicTask;
}
+
+ /**
+ * Initialize the ControllerPeriodicTask, to be defined by each individual task
+ */
+ protected abstract void initTask();
+
+ /**
+ * Perform cleanup for the ControllerPeriodicTask, to be defined by each individual task
+ */
+ protected abstract void stopTask();
}
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskScheduler.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskScheduler.java
new file mode 100644
index 0000000..1452861
--- /dev/null
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskScheduler.java
@@ -0,0 +1,57 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.linkedin.pinot.controller.helix.core.periodictask;
+
+import com.linkedin.pinot.controller.ControllerLeadershipManager;
+import com.linkedin.pinot.controller.LeadershipChangeSubscriber;
+import com.linkedin.pinot.core.periodictask.PeriodicTask;
+import com.linkedin.pinot.core.periodictask.PeriodicTaskScheduler;
+import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A {@link PeriodicTaskScheduler} for scheduling {@link ControllerPeriodicTask} according to controller leadership changes.
+ * Any controllerPeriodicTasks provided during initialization, will run only on leadership, and stop when leadership lost
+ */
+public class ControllerPeriodicTaskScheduler extends PeriodicTaskScheduler implements LeadershipChangeSubscriber {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(ControllerPeriodicTaskScheduler.class);
+
+ /**
+ * Initialize the {@link ControllerPeriodicTaskScheduler} with the list of {@link ControllerPeriodicTask} created at startup
+ * This is called only once during controller startup
+ * @param controllerPeriodicTasks
+ */
+ public void init(List<PeriodicTask> controllerPeriodicTasks) {
+ super.init(controllerPeriodicTasks);
+ ControllerLeadershipManager.getInstance().subscribe(ControllerPeriodicTaskScheduler.class.getName(), this);
+ }
+
+ @Override
+ public void onBecomingLeader() {
+ LOGGER.info("Received callback for controller leadership gain. Starting PeriodicTaskScheduler.");
+ start();
+ }
+
+ @Override
+ public void onBecomingNonLeader() {
+ LOGGER.info("Received callback for controller leadership loss. Stopping PeriodicTaskScheduler.");
+ stop();
+ }
+}
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
index e324c11..bfede7b 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
@@ -58,6 +58,11 @@ public class RealtimeSegmentRelocator extends ControllerPeriodicTask {
}
@Override
+ protected void initTask() {
+
+ }
+
+ @Override
protected void preprocess() {
}
@@ -269,4 +274,9 @@ public class RealtimeSegmentRelocator extends ControllerPeriodicTask {
}
return seconds;
}
+
+ @Override
+ public void stopTask() {
+
+ }
}
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java
index 80894fc..134c4ee 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java
@@ -53,15 +53,17 @@ public class RetentionManager extends ControllerPeriodicTask {
int deletedSegmentsRetentionInDays) {
super("RetentionManager", runFrequencyInSeconds, pinotHelixResourceManager);
_deletedSegmentsRetentionInDays = deletedSegmentsRetentionInDays;
- }
- @Override
- public void onBecomeLeader() {
LOGGER.info("Starting RetentionManager with runFrequencyInSeconds: {}, deletedSegmentsRetentionInDays: {}",
getIntervalInSeconds(), _deletedSegmentsRetentionInDays);
}
@Override
+ protected void initTask() {
+
+ }
+
+ @Override
protected void preprocess() {
}
@@ -185,4 +187,10 @@ public class RetentionManager extends ControllerPeriodicTask {
CommonConstants.Helix.StateModel.SegmentOnlineOfflineStateModel.OFFLINE);
}
}
+
+
+ @Override
+ public void stopTask() {
+
+ }
}
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java
index 18025eb..7d06289 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java
@@ -70,12 +70,6 @@ public class ValidationManager extends ControllerPeriodicTask {
}
@Override
- public void onBecomeNotLeader() {
- LOGGER.info("Unregister all the validation metrics.");
- _validationMetrics.unregisterAllMetrics();
- }
-
- @Override
protected void preprocess() {
// Run segment level validation using a separate interval
_runSegmentLevelValidation = false;
@@ -312,4 +306,15 @@ public class ValidationManager extends ControllerPeriodicTask {
return numTotalDocs;
}
+
+ @Override
+ protected void initTask() {
+
+ }
+
+ @Override
+ public void stopTask() {
+ LOGGER.info("Unregister all the validation metrics.");
+ _validationMetrics.unregisterAllMetrics();
+ }
}
diff --git a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/SegmentStatusCheckerTest.java b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/SegmentStatusCheckerTest.java
index 4d4c324..1246991 100644
--- a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/SegmentStatusCheckerTest.java
+++ b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/SegmentStatusCheckerTest.java
@@ -86,7 +86,7 @@ public class SegmentStatusCheckerTest {
}
metricsRegistry = new MetricsRegistry();
controllerMetrics = new ControllerMetrics(metricsRegistry);
- segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+ segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
segmentStatusChecker.init();
segmentStatusChecker.run();
Assert.assertEquals(controllerMetrics.getValueOfTableGauge(externalView.getId(),
@@ -151,7 +151,7 @@ public class SegmentStatusCheckerTest {
}
metricsRegistry = new MetricsRegistry();
controllerMetrics = new ControllerMetrics(metricsRegistry);
- segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+ segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
segmentStatusChecker.init();
segmentStatusChecker.run();
Assert.assertEquals(controllerMetrics.getValueOfTableGauge(externalView.getId(),
@@ -228,7 +228,7 @@ public class SegmentStatusCheckerTest {
}
metricsRegistry = new MetricsRegistry();
controllerMetrics = new ControllerMetrics(metricsRegistry);
- segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+ segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
segmentStatusChecker.init();
segmentStatusChecker.run();
Assert.assertEquals(controllerMetrics.getValueOfTableGauge(externalView.getId(),
@@ -274,7 +274,7 @@ public class SegmentStatusCheckerTest {
}
metricsRegistry = new MetricsRegistry();
controllerMetrics = new ControllerMetrics(metricsRegistry);
- segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+ segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
segmentStatusChecker.init();
segmentStatusChecker.run();
Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
@@ -308,7 +308,7 @@ public class SegmentStatusCheckerTest {
}
metricsRegistry = new MetricsRegistry();
controllerMetrics = new ControllerMetrics(metricsRegistry);
- segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+ segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
segmentStatusChecker.init();
segmentStatusChecker.run();
Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
@@ -376,7 +376,7 @@ public class SegmentStatusCheckerTest {
}
metricsRegistry = new MetricsRegistry();
controllerMetrics = new ControllerMetrics(metricsRegistry);
- segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+ segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
segmentStatusChecker.init();
segmentStatusChecker.run();
Assert.assertEquals(controllerMetrics.getValueOfTableGauge(externalView.getId(),
@@ -420,7 +420,7 @@ public class SegmentStatusCheckerTest {
}
metricsRegistry = new MetricsRegistry();
controllerMetrics = new ControllerMetrics(metricsRegistry);
- segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+ segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
segmentStatusChecker.init();
segmentStatusChecker.run();
Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
@@ -458,7 +458,7 @@ public class SegmentStatusCheckerTest {
}
metricsRegistry = new MetricsRegistry();
controllerMetrics = new ControllerMetrics(metricsRegistry);
- segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+ segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
segmentStatusChecker.init();
segmentStatusChecker.run();
Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
@@ -504,7 +504,7 @@ public class SegmentStatusCheckerTest {
}
metricsRegistry = new MetricsRegistry();
controllerMetrics = new ControllerMetrics(metricsRegistry);
- segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+ segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
// verify state before test
Assert.assertEquals(controllerMetrics.getValueOfGlobalGauge(
ControllerGauge.DISABLED_TABLE_COUNT), 0);
@@ -555,7 +555,7 @@ public class SegmentStatusCheckerTest {
}
metricsRegistry = new MetricsRegistry();
controllerMetrics = new ControllerMetrics(metricsRegistry);
- segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+ segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
segmentStatusChecker.init();
segmentStatusChecker.run();
Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
@@ -567,17 +567,4 @@ public class SegmentStatusCheckerTest {
Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100);
}
-
- private class MockSegmentStatusChecker extends SegmentStatusChecker {
-
- public MockSegmentStatusChecker(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf config,
- ControllerMetrics metricsRegistry) {
- super(pinotHelixResourceManager, config, metricsRegistry);
- }
-
- @Override
- protected boolean isLeader() {
- return true;
- }
- }
}
diff --git a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
index 94c9f52..cce2f71 100644
--- a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
+++ b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
@@ -16,8 +16,12 @@
package com.linkedin.pinot.controller.helix.core.periodictask;
import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager;
+import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.IntStream;
+import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
import static org.mockito.Mockito.*;
@@ -28,33 +32,50 @@ public class ControllerPeriodicTaskTest {
private static final long RUN_FREQUENCY_IN_SECONDS = 30;
private final PinotHelixResourceManager _resourceManager = mock(PinotHelixResourceManager.class);
- private final AtomicBoolean _onBecomeLeaderCalled = new AtomicBoolean();
- private final AtomicBoolean _onBecomeNonLeaderCalled = new AtomicBoolean();
+ private final AtomicBoolean _stopTaskCalled = new AtomicBoolean();
+ private final AtomicBoolean _initTaskCalled = new AtomicBoolean();
private final AtomicBoolean _processCalled = new AtomicBoolean();
+ private final AtomicInteger _numTablesProcessed = new AtomicInteger();
+ private final int _numTables = 3;
private final MockControllerPeriodicTask _task =
new MockControllerPeriodicTask("TestTask", RUN_FREQUENCY_IN_SECONDS, _resourceManager) {
+
@Override
- public void onBecomeLeader() {
- _onBecomeLeaderCalled.set(true);
+ protected void initTask() {
+ _initTaskCalled.set(true);
}
@Override
- public void onBecomeNotLeader() {
- _onBecomeNonLeaderCalled.set(true);
+ public void stopTask() {
+ _stopTaskCalled.set(true);
}
@Override
- public void process(List<String> tables) {
+ public void process(List<String> tableNamesWithType) {
_processCalled.set(true);
+ super.process(tableNamesWithType);
+ }
+
+ @Override
+ public void processTable(String tableNameWithType) {
+ _numTablesProcessed.getAndIncrement();
}
};
+ @BeforeTest
+ public void beforeTest() {
+ List<String> tables = new ArrayList<>(_numTables);
+ IntStream.range(0, _numTables).forEach(i -> tables.add("table_" + i + " _OFFLINE"));
+ when(_resourceManager.getAllTables()).thenReturn(tables);
+ }
+
private void resetState() {
- _onBecomeLeaderCalled.set(false);
- _onBecomeNonLeaderCalled.set(false);
+ _initTaskCalled.set(false);
+ _stopTaskCalled.set(false);
_processCalled.set(false);
+ _numTablesProcessed.set(0);
}
@Test
@@ -66,56 +87,54 @@ public class ControllerPeriodicTaskTest {
}
@Test
- public void testChangeLeadership() {
+ public void testControllerPeriodicTaskCalls() {
// Initial state
resetState();
- _task.setLeader(false);
_task.init();
- assertFalse(_onBecomeLeaderCalled.get());
- assertFalse(_onBecomeNonLeaderCalled.get());
+ assertTrue(_initTaskCalled.get());
assertFalse(_processCalled.get());
+ assertEquals(_numTablesProcessed.get(), 0);
+ assertFalse(_stopTaskCalled.get());
+ assertFalse(_task.shouldStopPeriodicTask());
- // From non-leader to non-leader
+ // run task - leadership gained
resetState();
_task.run();
- assertFalse(_onBecomeLeaderCalled.get());
- assertFalse(_onBecomeNonLeaderCalled.get());
- assertFalse(_processCalled.get());
+ assertFalse(_initTaskCalled.get());
+ assertTrue(_processCalled.get());
+ assertEquals(_numTablesProcessed.get(), _numTables);
+ assertFalse(_stopTaskCalled.get());
+ assertFalse(_task.shouldStopPeriodicTask());
- // From non-leader to leader
+ // stop periodic task - leadership lost
resetState();
- _task.setLeader(true);
- _task.run();
- assertTrue(_onBecomeLeaderCalled.get());
- assertFalse(_onBecomeNonLeaderCalled.get());
- assertTrue(_processCalled.get());
+ _task.stop();
+ assertFalse(_initTaskCalled.get());
+ assertFalse(_processCalled.get());
+ assertEquals(_numTablesProcessed.get(), 0);
+ assertTrue(_stopTaskCalled.get());
+ assertTrue(_task.shouldStopPeriodicTask());
- // From leader to leader
+ // call to run after periodic task stop invoked - leadership gained back on same controller
resetState();
_task.run();
- assertFalse(_onBecomeLeaderCalled.get());
- assertFalse(_onBecomeNonLeaderCalled.get());
+ assertFalse(_task.shouldStopPeriodicTask());
+ assertFalse(_initTaskCalled.get());
assertTrue(_processCalled.get());
+ assertEquals(_numTablesProcessed.get(), _numTables);
+ assertFalse(_stopTaskCalled.get());
- // From leader to non-leader
- resetState();
- _task.setLeader(false);
- _task.run();
- assertFalse(_onBecomeLeaderCalled.get());
- assertTrue(_onBecomeNonLeaderCalled.get());
- assertFalse(_processCalled.get());
}
private class MockControllerPeriodicTask extends ControllerPeriodicTask {
- private boolean _isLeader = true;
public MockControllerPeriodicTask(String taskName, long runFrequencyInSeconds,
PinotHelixResourceManager pinotHelixResourceManager) {
super(taskName, runFrequencyInSeconds, pinotHelixResourceManager);
}
@Override
- protected void process(List<String> tables) {
+ protected void initTask() {
}
@@ -134,13 +153,10 @@ public class ControllerPeriodicTaskTest {
}
+
@Override
- protected boolean isLeader() {
- return _isLeader;
- }
+ public void stopTask() {
- void setLeader(boolean isLeader) {
- _isLeader = isLeader;
}
}
}
diff --git a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java
index 8e25505..c27b0ef 100644
--- a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java
+++ b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java
@@ -84,7 +84,7 @@ public class RetentionManagerTest {
when(pinotHelixResourceManager.getTableConfig(OFFLINE_TABLE_NAME)).thenReturn(tableConfig);
when(pinotHelixResourceManager.getOfflineSegmentMetadata(OFFLINE_TABLE_NAME)).thenReturn(metadataList);
- RetentionManager retentionManager = new MockRetentionManager(pinotHelixResourceManager, 0, 0);
+ RetentionManager retentionManager = new RetentionManager(pinotHelixResourceManager, 0, 0);
retentionManager.init();
retentionManager.run();
@@ -201,7 +201,7 @@ public class RetentionManagerTest {
setupSegmentMetadata(tableConfig, now, initialNumSegments, removedSegments);
setupPinotHelixResourceManager(tableConfig, removedSegments, pinotHelixResourceManager);
- RetentionManager retentionManager = new MockRetentionManager(pinotHelixResourceManager, 0, 0);
+ RetentionManager retentionManager = new RetentionManager(pinotHelixResourceManager, 0, 0);
retentionManager.init();
retentionManager.run();
@@ -306,16 +306,4 @@ public class RetentionManagerTest {
return segmentMetadata;
}
- private class MockRetentionManager extends RetentionManager {
-
- public MockRetentionManager(PinotHelixResourceManager pinotHelixResourceManager, int runFrequencyInSeconds,
- int deletedSegmentsRetentionInDays) {
- super(pinotHelixResourceManager, runFrequencyInSeconds, deletedSegmentsRetentionInDays);
- }
-
- @Override
- protected boolean isLeader() {
- return true;
- }
- }
}
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/PeriodicTask.java b/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/PeriodicTask.java
index fac0750..f5b9c60 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/PeriodicTask.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/PeriodicTask.java
@@ -43,4 +43,9 @@ public interface PeriodicTask extends Runnable {
* @return task name.
*/
String getTaskName();
+
+ /**
+ * Stop the periodic task
+ */
+ void stop();
}
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/PeriodicTaskScheduler.java b/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/PeriodicTaskScheduler.java
index cd07ea4..6397ad4 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/PeriodicTaskScheduler.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/PeriodicTaskScheduler.java
@@ -31,34 +31,43 @@ public class PeriodicTaskScheduler {
private static final Logger LOGGER = LoggerFactory.getLogger(PeriodicTaskScheduler.class);
private ScheduledExecutorService _executorService;
+ private List<PeriodicTask> _tasksWithValidInterval;
/**
- * Start scheduling periodic tasks.
+ * Initialize the PeriodicTaskScheduler with list of PeriodicTasks
+ * @param periodicTasks
*/
- public void start(List<PeriodicTask> periodicTasks) {
- if (_executorService != null) {
- LOGGER.warn("Periodic task scheduler already started");
- }
-
- List<PeriodicTask> tasksWithValidInterval = new ArrayList<>();
+ public void init(List<PeriodicTask> periodicTasks) {
+ _tasksWithValidInterval = new ArrayList<>();
for (PeriodicTask periodicTask : periodicTasks) {
if (periodicTask.getIntervalInSeconds() > 0) {
LOGGER.info("Adding periodic task: {}", periodicTask);
- tasksWithValidInterval.add(periodicTask);
+ _tasksWithValidInterval.add(periodicTask);
+ periodicTask.init();
} else {
LOGGER.info("Skipping periodic task: {}", periodicTask);
}
}
+ }
- if (tasksWithValidInterval.isEmpty()) {
+ /**
+ * Start scheduling periodic tasks.
+ */
+ public synchronized void start() {
+ if (_executorService != null) {
+ LOGGER.warn("Periodic task scheduler already started");
+ }
+
+ if (_tasksWithValidInterval.isEmpty()) {
LOGGER.warn("No periodic task scheduled");
} else {
- LOGGER.info("Starting periodic task scheduler with tasks: {}", tasksWithValidInterval);
- _executorService = Executors.newScheduledThreadPool(tasksWithValidInterval.size());
- for (PeriodicTask periodicTask : tasksWithValidInterval) {
- periodicTask.init();
+ LOGGER.info("Starting periodic task scheduler with tasks: {}", _tasksWithValidInterval);
+ _executorService = Executors.newScheduledThreadPool(_tasksWithValidInterval.size());
+ for (PeriodicTask periodicTask : _tasksWithValidInterval) {
_executorService.scheduleWithFixedDelay(() -> {
try {
+ LOGGER.info("Starting {} with running frequency of {} seconds.", periodicTask.getTaskName(),
+ periodicTask.getIntervalInSeconds());
periodicTask.run();
} catch (Throwable e) {
// catch all errors to prevent subsequent executions from being silently suppressed
@@ -70,11 +79,19 @@ public class PeriodicTaskScheduler {
}
}
- public void stop() {
+ /**
+ * Shutdown executor service and stop the periodic tasks
+ */
+ public synchronized void stop() {
if (_executorService != null) {
LOGGER.info("Stopping periodic task scheduler");
_executorService.shutdown();
_executorService = null;
}
+
+ if (_tasksWithValidInterval != null) {
+ LOGGER.info("Stopping all periodic tasks: {}", _tasksWithValidInterval);
+ _tasksWithValidInterval.parallelStream().forEach(PeriodicTask::stop);
+ }
}
}
diff --git a/pinot-core/src/test/java/com/linkedin/pinot/core/periodictask/PeriodicTaskSchedulerTest.java b/pinot-core/src/test/java/com/linkedin/pinot/core/periodictask/PeriodicTaskSchedulerTest.java
index a105435..fb3d2ad 100644
--- a/pinot-core/src/test/java/com/linkedin/pinot/core/periodictask/PeriodicTaskSchedulerTest.java
+++ b/pinot-core/src/test/java/com/linkedin/pinot/core/periodictask/PeriodicTaskSchedulerTest.java
@@ -31,6 +31,7 @@ public class PeriodicTaskSchedulerTest {
public void testTaskWithInvalidInterval() throws Exception {
AtomicBoolean initCalled = new AtomicBoolean();
AtomicBoolean runCalled = new AtomicBoolean();
+ AtomicBoolean stopCalled = new AtomicBoolean();
List<PeriodicTask> periodicTasks = Collections.singletonList(new BasePeriodicTask("TestTask", 0L/*Invalid*/, 0L) {
@Override
@@ -39,18 +40,25 @@ public class PeriodicTaskSchedulerTest {
}
@Override
+ public void stop() {
+ stopCalled.set(true);
+ }
+
+ @Override
public void run() {
runCalled.set(true);
}
});
PeriodicTaskScheduler taskScheduler = new PeriodicTaskScheduler();
- taskScheduler.start(periodicTasks);
+ taskScheduler.init(periodicTasks);
+ taskScheduler.start();
Thread.sleep(100L);
taskScheduler.stop();
assertFalse(initCalled.get());
assertFalse(runCalled.get());
+ assertFalse(stopCalled.get());
}
@Test
@@ -58,6 +66,7 @@ public class PeriodicTaskSchedulerTest {
int numTasks = 3;
AtomicInteger numTimesInitCalled = new AtomicInteger();
AtomicInteger numTimesRunCalled = new AtomicInteger();
+ AtomicInteger numTimesStopCalled = new AtomicInteger();
List<PeriodicTask> periodicTasks = new ArrayList<>(numTasks);
for (int i = 0; i < numTasks; i++) {
@@ -68,6 +77,11 @@ public class PeriodicTaskSchedulerTest {
}
@Override
+ public void stop() {
+ numTimesStopCalled.getAndIncrement();
+ }
+
+ @Override
public void run() {
numTimesRunCalled.getAndIncrement();
}
@@ -75,11 +89,13 @@ public class PeriodicTaskSchedulerTest {
}
PeriodicTaskScheduler taskScheduler = new PeriodicTaskScheduler();
- taskScheduler.start(periodicTasks);
+ taskScheduler.init(periodicTasks);
+ taskScheduler.start();
Thread.sleep(1100L);
taskScheduler.stop();
assertEquals(numTimesInitCalled.get(), numTasks);
assertEquals(numTimesRunCalled.get(), numTasks * 2);
+ assertEquals(numTimesStopCalled.get(), numTasks);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org