You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2018/12/20 01:49:45 UTC
[incubator-pinot] 01/01: Start and stop ControllerPeriodicTasks
based on leadership changes
This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch start_stop_periodic_tasks_on_leadership_changes
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 037cbc08ab9c9b86d0233311da0312d30b3462da
Author: Neha Pawar <np...@linkedin.com>
AuthorDate: Wed Dec 19 17:49:25 2018 -0800
Start and stop ControllerPeriodicTasks based on leadership changes
---
.../pinot/controller/ControllerStarter.java | 9 ++-
.../controller/helix/SegmentStatusChecker.java | 12 +--
.../helix/core/minion/PinotTaskManager.java | 25 +++---
.../core/periodictask/ControllerPeriodicTask.java | 94 +++++++++++-----------
.../ControllerPeriodicTaskScheduler.java | 36 +++++++++
.../core/relocation/RealtimeSegmentRelocator.java | 5 ++
.../helix/core/retention/RetentionManager.java | 9 ++-
.../controller/validation/ValidationManager.java | 12 +--
.../controller/helix/SegmentStatusCheckerTest.java | 33 +++-----
.../periodictask/ControllerPeriodicTaskTest.java | 76 +++++++++--------
.../helix/core/retention/RetentionManagerTest.java | 16 +---
.../pinot/core/periodictask/PeriodicTask.java | 5 ++
.../core/periodictask/PeriodicTaskScheduler.java | 23 ++++--
.../periodictask/PeriodicTaskSchedulerTest.java | 14 ++++
14 files changed, 209 insertions(+), 160 deletions(-)
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java
index 33fb5ce..1f3b316 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java
@@ -32,6 +32,7 @@ import com.linkedin.pinot.controller.helix.SegmentStatusChecker;
import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager;
import com.linkedin.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
import com.linkedin.pinot.controller.helix.core.minion.PinotTaskManager;
+import com.linkedin.pinot.controller.helix.core.periodictask.ControllerPeriodicTaskScheduler;
import com.linkedin.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
import com.linkedin.pinot.controller.helix.core.realtime.PinotRealtimeSegmentManager;
import com.linkedin.pinot.controller.helix.core.rebalance.RebalanceSegmentStrategyFactory;
@@ -40,7 +41,6 @@ import com.linkedin.pinot.controller.helix.core.retention.RetentionManager;
import com.linkedin.pinot.controller.validation.ValidationManager;
import com.linkedin.pinot.core.crypt.PinotCrypterFactory;
import com.linkedin.pinot.core.periodictask.PeriodicTask;
-import com.linkedin.pinot.core.periodictask.PeriodicTaskScheduler;
import com.linkedin.pinot.filesystem.PinotFSFactory;
import com.yammer.metrics.core.MetricsRegistry;
import java.io.File;
@@ -80,7 +80,7 @@ public class ControllerStarter {
private final PinotRealtimeSegmentManager _realtimeSegmentsManager;
private final SegmentStatusChecker _segmentStatusChecker;
private final ExecutorService _executorService;
- private final PeriodicTaskScheduler _periodicTaskScheduler;
+ private final ControllerPeriodicTaskScheduler _periodicTaskScheduler;
// Can only be constructed after resource manager getting started
private ValidationManager _validationManager;
@@ -101,7 +101,7 @@ public class ControllerStarter {
Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("restapi-multiget-thread-%d").build());
_segmentStatusChecker = new SegmentStatusChecker(_helixResourceManager, _config, _controllerMetrics);
_realtimeSegmentRelocator = new RealtimeSegmentRelocator(_helixResourceManager, _config);
- _periodicTaskScheduler = new PeriodicTaskScheduler();
+ _periodicTaskScheduler = new ControllerPeriodicTaskScheduler();
}
public PinotHelixResourceManager getHelixResourceManager() {
@@ -183,7 +183,8 @@ public class ControllerStarter {
periodicTasks.add(_validationManager);
periodicTasks.add(_segmentStatusChecker);
periodicTasks.add(_realtimeSegmentRelocator);
- _periodicTaskScheduler.start(periodicTasks);
+ _periodicTaskScheduler.init(periodicTasks);
+
LOGGER.info("Creating rebalance segments factory");
RebalanceSegmentStrategyFactory.createInstance(helixManager);
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/SegmentStatusChecker.java
index 8059d9e..a1550f8 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/SegmentStatusChecker.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/SegmentStatusChecker.java
@@ -84,12 +84,6 @@ public class SegmentStatusChecker extends ControllerPeriodicTask {
}
@Override
- public void onBecomeNotLeader() {
- LOGGER.info("Resetting table metrics for all the tables.");
- setStatusToDefault();
- }
-
- @Override
protected void preprocess() {
_realTimeTableCount = 0;
_offlineTableCount = 0;
@@ -267,4 +261,10 @@ public class SegmentStatusChecker extends ControllerPeriodicTask {
_metricsRegistry.setValueOfTableGauge(tableName, ControllerGauge.SEGMENTS_IN_ERROR_STATE, Long.MIN_VALUE);
_metricsRegistry.setValueOfTableGauge(tableName, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE, Long.MIN_VALUE);
}
+
+ @Override
+ public void cleanup() {
+ LOGGER.info("Resetting table metrics for all the tables.");
+ setStatusToDefault();
+ }
}
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/PinotTaskManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/PinotTaskManager.java
index 729c75c..76c030f 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/PinotTaskManager.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/PinotTaskManager.java
@@ -93,19 +93,6 @@ public class PinotTaskManager extends ControllerPeriodicTask {
return getTasksScheduled();
}
- /**
- * Performs necessary cleanups (e.g. remove metrics) when the controller leadership changes.
- */
- @Override
- public void onBecomeNotLeader() {
- LOGGER.info("Perform task cleanups.");
- // Performs necessary cleanups for each task type.
- for (String taskType : _taskGeneratorRegistry.getAllTaskTypes()) {
- _taskGeneratorRegistry.getTaskGenerator(taskType).nonLeaderCleanUp();
- }
- }
-
-
@Override
protected void preprocess() {
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.NUMBER_TIMES_SCHEDULE_TASKS_CALLED, 1L);
@@ -163,4 +150,16 @@ public class PinotTaskManager extends ControllerPeriodicTask {
private Map<String, String> getTasksScheduled() {
return _tasksScheduled;
}
+
+ /**
+ * Performs necessary cleanups (e.g. remove metrics) when the controller leadership changes.
+ */
+ @Override
+ public void cleanup() {
+ LOGGER.info("Perform task cleanups.");
+ // Performs necessary cleanups for each task type.
+ for (String taskType : _taskGeneratorRegistry.getAllTaskTypes()) {
+ _taskGeneratorRegistry.getTaskGenerator(taskType).nonLeaderCleanUp();
+ }
+ }
}
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
index c416d0e..3a9d045 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
@@ -16,7 +16,6 @@
package com.linkedin.pinot.controller.helix.core.periodictask;
import com.google.common.annotations.VisibleForTesting;
-import com.linkedin.pinot.controller.ControllerLeadershipManager;
import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager;
import com.linkedin.pinot.core.periodictask.BasePeriodicTask;
import java.util.List;
@@ -36,9 +35,12 @@ public abstract class ControllerPeriodicTask extends BasePeriodicTask {
public static final int MIN_INITIAL_DELAY_IN_SECONDS = 120;
public static final int MAX_INITIAL_DELAY_IN_SECONDS = 300;
+ private static final long MAX_CONTROLLER_PERIODIC_TASK_STOP_TIME_MILLIS = 30_000L;
+
protected final PinotHelixResourceManager _pinotHelixResourceManager;
- private boolean _isLeader = false;
+ private volatile boolean _stopPeriodicTask = false;
+ private volatile boolean _periodicTaskInProgress = false;
public ControllerPeriodicTask(String taskName, long runFrequencyInSeconds, long initialDelayInSeconds,
PinotHelixResourceManager pinotHelixResourceManager) {
@@ -61,61 +63,61 @@ public abstract class ControllerPeriodicTask extends BasePeriodicTask {
@Override
public void run() {
- if (!isLeader()) {
- skipLeaderTask();
- } else {
- List<String> allTableNames = _pinotHelixResourceManager.getAllTables();
- processLeaderTask(allTableNames);
- }
- }
-
- private void skipLeaderTask() {
- if (_isLeader) {
- LOGGER.info("Current pinot controller lost leadership.");
- _isLeader = false;
- onBecomeNotLeader();
- }
- LOGGER.info("Skip running periodic task: {} on non-leader controller", _taskName);
- }
-
- private void processLeaderTask(List<String> tables) {
- if (!_isLeader) {
- LOGGER.info("Current pinot controller became leader. Starting {} with running frequency of {} seconds.",
- _taskName, _intervalInSeconds);
- _isLeader = true;
- onBecomeLeader();
- }
+ _periodicTaskInProgress = true;
+ List<String> tableNamesWithType = _pinotHelixResourceManager.getAllTables();
long startTime = System.currentTimeMillis();
- int numTables = tables.size();
+ int numTables = tableNamesWithType.size();
LOGGER.info("Start processing {} tables in periodic task: {}", numTables, _taskName);
- process(tables);
+ process(tableNamesWithType);
LOGGER.info("Finish processing {} tables in periodic task: {} in {}ms", numTables, _taskName,
(System.currentTimeMillis() - startTime));
+ _periodicTaskInProgress = false;
}
- /**
- * Does the following logic when losing the leadership. This should be done only once during leadership transition.
- */
- public void onBecomeNotLeader() {
- }
- /**
- * Does the following logic when becoming lead controller. This should be done only once during leadership transition.
- */
- public void onBecomeLeader() {
+ @Override
+ public void stop() {
+ _stopPeriodicTask = true;
+
+ LOGGER.info("Waiting for periodic task {} to finish, maxWaitTimeMillis = {}", _taskName,
+ MAX_CONTROLLER_PERIODIC_TASK_STOP_TIME_MILLIS);
+ long millisToWait = MAX_CONTROLLER_PERIODIC_TASK_STOP_TIME_MILLIS;
+ while (_periodicTaskInProgress && millisToWait > 0) {
+ try {
+ long thisWait = 1000;
+ if (millisToWait < thisWait) {
+ thisWait = millisToWait;
+ }
+ Thread.sleep(thisWait);
+ millisToWait -= thisWait;
+ } catch (InterruptedException e) {
+ LOGGER.info("Interrupted: Remaining wait time {} (out of {})", millisToWait,
+ MAX_CONTROLLER_PERIODIC_TASK_STOP_TIME_MILLIS);
+ break;
+ }
+ }
+ LOGGER.info("Wait completed. _periodicTaskInProgress = {}", _periodicTaskInProgress);
+
+ cleanup();
}
+
/**
* Processes the task on the given tables.
*
- * @param tables List of table names
+ * @param tableNamesWithType List of table names
*/
- protected void process(List<String> tables) {
- preprocess();
- for (String table : tables) {
- processTable(table);
+ protected void process(List<String> tableNamesWithType) {
+ if (!isStopPeriodicTask()) {
+ preprocess();
+ for (String table : tableNamesWithType) {
+ if (isStopPeriodicTask()) {
+ break;
+ }
+ processTable(table);
+ }
+ postprocess();
}
- postprocess();
}
/**
@@ -135,7 +137,9 @@ public abstract class ControllerPeriodicTask extends BasePeriodicTask {
protected abstract void postprocess();
@VisibleForTesting
- protected boolean isLeader() {
- return ControllerLeadershipManager.getInstance().isLeader();
+ protected boolean isStopPeriodicTask() {
+ return _stopPeriodicTask;
}
+
+ protected abstract void cleanup();
}
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskScheduler.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskScheduler.java
new file mode 100644
index 0000000..15b18c0
--- /dev/null
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskScheduler.java
@@ -0,0 +1,36 @@
+package com.linkedin.pinot.controller.helix.core.periodictask;
+
+import com.linkedin.pinot.controller.ControllerLeadershipManager;
+import com.linkedin.pinot.controller.LeadershipChangeSubscriber;
+import com.linkedin.pinot.core.periodictask.PeriodicTask;
+import com.linkedin.pinot.core.periodictask.PeriodicTaskScheduler;
+import java.util.List;
+
+
+/**
+ * A {@link PeriodicTaskScheduler} for scheduling {@link ControllerPeriodicTask} which are created on controller startup
+ * and started/stopped on controller leadership changes
+ */
+public class ControllerPeriodicTaskScheduler extends PeriodicTaskScheduler implements LeadershipChangeSubscriber {
+
+ private List<PeriodicTask> _controllerPeriodicTasks;
+
+ /**
+ * Initialize the {@link ControllerPeriodicTaskScheduler} with the {@link ControllerPeriodicTask} created at startup
+ * @param controllerPeriodicTasks
+ */
+ public void init(List<PeriodicTask> controllerPeriodicTasks) {
+ _controllerPeriodicTasks = controllerPeriodicTasks;
+ ControllerLeadershipManager.getInstance().subscribe(ControllerPeriodicTaskScheduler.class.getName(), this);
+ }
+
+ @Override
+ public void onBecomingLeader() {
+ start(_controllerPeriodicTasks);
+ }
+
+ @Override
+ public void onBecomingNonLeader() {
+ stop();
+ }
+}
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
index e324c11..90c9435 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
@@ -269,4 +269,9 @@ public class RealtimeSegmentRelocator extends ControllerPeriodicTask {
}
return seconds;
}
+
+ @Override
+ public void cleanup() {
+
+ }
}
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java
index 80894fc..5127835 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java
@@ -53,10 +53,7 @@ public class RetentionManager extends ControllerPeriodicTask {
int deletedSegmentsRetentionInDays) {
super("RetentionManager", runFrequencyInSeconds, pinotHelixResourceManager);
_deletedSegmentsRetentionInDays = deletedSegmentsRetentionInDays;
- }
- @Override
- public void onBecomeLeader() {
LOGGER.info("Starting RetentionManager with runFrequencyInSeconds: {}, deletedSegmentsRetentionInDays: {}",
getIntervalInSeconds(), _deletedSegmentsRetentionInDays);
}
@@ -185,4 +182,10 @@ public class RetentionManager extends ControllerPeriodicTask {
CommonConstants.Helix.StateModel.SegmentOnlineOfflineStateModel.OFFLINE);
}
}
+
+
+ @Override
+ public void cleanup() {
+
+ }
}
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java
index 18025eb..79a9d1b 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java
@@ -70,12 +70,6 @@ public class ValidationManager extends ControllerPeriodicTask {
}
@Override
- public void onBecomeNotLeader() {
- LOGGER.info("Unregister all the validation metrics.");
- _validationMetrics.unregisterAllMetrics();
- }
-
- @Override
protected void preprocess() {
// Run segment level validation using a separate interval
_runSegmentLevelValidation = false;
@@ -312,4 +306,10 @@ public class ValidationManager extends ControllerPeriodicTask {
return numTotalDocs;
}
+
+ @Override
+ public void cleanup() {
+ LOGGER.info("Unregister all the validation metrics.");
+ _validationMetrics.unregisterAllMetrics();
+ }
}
diff --git a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/SegmentStatusCheckerTest.java b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/SegmentStatusCheckerTest.java
index 4d4c324..1246991 100644
--- a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/SegmentStatusCheckerTest.java
+++ b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/SegmentStatusCheckerTest.java
@@ -86,7 +86,7 @@ public class SegmentStatusCheckerTest {
}
metricsRegistry = new MetricsRegistry();
controllerMetrics = new ControllerMetrics(metricsRegistry);
- segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+ segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
segmentStatusChecker.init();
segmentStatusChecker.run();
Assert.assertEquals(controllerMetrics.getValueOfTableGauge(externalView.getId(),
@@ -151,7 +151,7 @@ public class SegmentStatusCheckerTest {
}
metricsRegistry = new MetricsRegistry();
controllerMetrics = new ControllerMetrics(metricsRegistry);
- segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+ segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
segmentStatusChecker.init();
segmentStatusChecker.run();
Assert.assertEquals(controllerMetrics.getValueOfTableGauge(externalView.getId(),
@@ -228,7 +228,7 @@ public class SegmentStatusCheckerTest {
}
metricsRegistry = new MetricsRegistry();
controllerMetrics = new ControllerMetrics(metricsRegistry);
- segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+ segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
segmentStatusChecker.init();
segmentStatusChecker.run();
Assert.assertEquals(controllerMetrics.getValueOfTableGauge(externalView.getId(),
@@ -274,7 +274,7 @@ public class SegmentStatusCheckerTest {
}
metricsRegistry = new MetricsRegistry();
controllerMetrics = new ControllerMetrics(metricsRegistry);
- segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+ segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
segmentStatusChecker.init();
segmentStatusChecker.run();
Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
@@ -308,7 +308,7 @@ public class SegmentStatusCheckerTest {
}
metricsRegistry = new MetricsRegistry();
controllerMetrics = new ControllerMetrics(metricsRegistry);
- segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+ segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
segmentStatusChecker.init();
segmentStatusChecker.run();
Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
@@ -376,7 +376,7 @@ public class SegmentStatusCheckerTest {
}
metricsRegistry = new MetricsRegistry();
controllerMetrics = new ControllerMetrics(metricsRegistry);
- segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+ segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
segmentStatusChecker.init();
segmentStatusChecker.run();
Assert.assertEquals(controllerMetrics.getValueOfTableGauge(externalView.getId(),
@@ -420,7 +420,7 @@ public class SegmentStatusCheckerTest {
}
metricsRegistry = new MetricsRegistry();
controllerMetrics = new ControllerMetrics(metricsRegistry);
- segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+ segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
segmentStatusChecker.init();
segmentStatusChecker.run();
Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
@@ -458,7 +458,7 @@ public class SegmentStatusCheckerTest {
}
metricsRegistry = new MetricsRegistry();
controllerMetrics = new ControllerMetrics(metricsRegistry);
- segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+ segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
segmentStatusChecker.init();
segmentStatusChecker.run();
Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
@@ -504,7 +504,7 @@ public class SegmentStatusCheckerTest {
}
metricsRegistry = new MetricsRegistry();
controllerMetrics = new ControllerMetrics(metricsRegistry);
- segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+ segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
// verify state before test
Assert.assertEquals(controllerMetrics.getValueOfGlobalGauge(
ControllerGauge.DISABLED_TABLE_COUNT), 0);
@@ -555,7 +555,7 @@ public class SegmentStatusCheckerTest {
}
metricsRegistry = new MetricsRegistry();
controllerMetrics = new ControllerMetrics(metricsRegistry);
- segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+ segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
segmentStatusChecker.init();
segmentStatusChecker.run();
Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
@@ -567,17 +567,4 @@ public class SegmentStatusCheckerTest {
Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100);
}
-
- private class MockSegmentStatusChecker extends SegmentStatusChecker {
-
- public MockSegmentStatusChecker(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf config,
- ControllerMetrics metricsRegistry) {
- super(pinotHelixResourceManager, config, metricsRegistry);
- }
-
- @Override
- protected boolean isLeader() {
- return true;
- }
- }
}
diff --git a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
index 94c9f52..c4903f7 100644
--- a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
+++ b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
@@ -28,33 +28,32 @@ public class ControllerPeriodicTaskTest {
private static final long RUN_FREQUENCY_IN_SECONDS = 30;
private final PinotHelixResourceManager _resourceManager = mock(PinotHelixResourceManager.class);
- private final AtomicBoolean _onBecomeLeaderCalled = new AtomicBoolean();
- private final AtomicBoolean _onBecomeNonLeaderCalled = new AtomicBoolean();
+ private final AtomicBoolean _cleanupCalled = new AtomicBoolean();
private final AtomicBoolean _processCalled = new AtomicBoolean();
+ private final AtomicBoolean _processTableCalled = new AtomicBoolean();
private final MockControllerPeriodicTask _task =
new MockControllerPeriodicTask("TestTask", RUN_FREQUENCY_IN_SECONDS, _resourceManager) {
- @Override
- public void onBecomeLeader() {
- _onBecomeLeaderCalled.set(true);
- }
@Override
- public void onBecomeNotLeader() {
- _onBecomeNonLeaderCalled.set(true);
+ public void cleanup() {
+ _cleanupCalled.set(true);
}
@Override
- public void process(List<String> tables) {
+ public void process(List<String> tableNamesWithType) {
_processCalled.set(true);
}
+ @Override
+ public void processTable(String tableNameWithType) { _processTableCalled.set(true);}
+
};
private void resetState() {
- _onBecomeLeaderCalled.set(false);
- _onBecomeNonLeaderCalled.set(false);
+ _cleanupCalled.set(false);
_processCalled.set(false);
+ _processTableCalled.set(false);
}
@Test
@@ -66,56 +65,48 @@ public class ControllerPeriodicTaskTest {
}
@Test
- public void testChangeLeadership() {
+ public void testControllerPeriodicTaskCalls() {
// Initial state
resetState();
- _task.setLeader(false);
_task.init();
- assertFalse(_onBecomeLeaderCalled.get());
- assertFalse(_onBecomeNonLeaderCalled.get());
- assertFalse(_processCalled.get());
-
- // From non-leader to non-leader
- resetState();
- _task.run();
- assertFalse(_onBecomeLeaderCalled.get());
- assertFalse(_onBecomeNonLeaderCalled.get());
+ assertFalse(_cleanupCalled.get());
assertFalse(_processCalled.get());
+ assertFalse(_processTableCalled.get());
- // From non-leader to leader
+ // run task
resetState();
- _task.setLeader(true);
_task.run();
- assertTrue(_onBecomeLeaderCalled.get());
- assertFalse(_onBecomeNonLeaderCalled.get());
+ assertFalse(_cleanupCalled.get());
assertTrue(_processCalled.get());
+ assertFalse(_processTableCalled.get());
- // From leader to leader
+ // stop periodic task flag set, task will not run
resetState();
+ _task.setStopPeriodicTask(true);
_task.run();
- assertFalse(_onBecomeLeaderCalled.get());
- assertFalse(_onBecomeNonLeaderCalled.get());
+ assertFalse(_cleanupCalled.get());
assertTrue(_processCalled.get());
+ assertFalse(_processTableCalled.get());
- // From leader to non-leader
+ // stop periodic task
resetState();
- _task.setLeader(false);
- _task.run();
- assertFalse(_onBecomeLeaderCalled.get());
- assertTrue(_onBecomeNonLeaderCalled.get());
+ _task.stop();
+ assertTrue(_cleanupCalled.get());
assertFalse(_processCalled.get());
+ assertFalse(_processTableCalled.get());
+
}
private class MockControllerPeriodicTask extends ControllerPeriodicTask {
- private boolean _isLeader = true;
+ private boolean _isStopPeriodicTask = false;
public MockControllerPeriodicTask(String taskName, long runFrequencyInSeconds,
PinotHelixResourceManager pinotHelixResourceManager) {
super(taskName, runFrequencyInSeconds, pinotHelixResourceManager);
}
@Override
- protected void process(List<String> tables) {
+ protected void process(List<String> tableNamesWithType) {
}
@@ -135,12 +126,17 @@ public class ControllerPeriodicTaskTest {
}
@Override
- protected boolean isLeader() {
- return _isLeader;
+ protected boolean isStopPeriodicTask() {
+ return _isStopPeriodicTask;
}
- void setLeader(boolean isLeader) {
- _isLeader = isLeader;
+ void setStopPeriodicTask(boolean isStopPeriodicTask) {
+ _isStopPeriodicTask = isStopPeriodicTask;
+ }
+
+ @Override
+ public void cleanup() {
+
}
}
}
diff --git a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java
index 8e25505..c27b0ef 100644
--- a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java
+++ b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java
@@ -84,7 +84,7 @@ public class RetentionManagerTest {
when(pinotHelixResourceManager.getTableConfig(OFFLINE_TABLE_NAME)).thenReturn(tableConfig);
when(pinotHelixResourceManager.getOfflineSegmentMetadata(OFFLINE_TABLE_NAME)).thenReturn(metadataList);
- RetentionManager retentionManager = new MockRetentionManager(pinotHelixResourceManager, 0, 0);
+ RetentionManager retentionManager = new RetentionManager(pinotHelixResourceManager, 0, 0);
retentionManager.init();
retentionManager.run();
@@ -201,7 +201,7 @@ public class RetentionManagerTest {
setupSegmentMetadata(tableConfig, now, initialNumSegments, removedSegments);
setupPinotHelixResourceManager(tableConfig, removedSegments, pinotHelixResourceManager);
- RetentionManager retentionManager = new MockRetentionManager(pinotHelixResourceManager, 0, 0);
+ RetentionManager retentionManager = new RetentionManager(pinotHelixResourceManager, 0, 0);
retentionManager.init();
retentionManager.run();
@@ -306,16 +306,4 @@ public class RetentionManagerTest {
return segmentMetadata;
}
- private class MockRetentionManager extends RetentionManager {
-
- public MockRetentionManager(PinotHelixResourceManager pinotHelixResourceManager, int runFrequencyInSeconds,
- int deletedSegmentsRetentionInDays) {
- super(pinotHelixResourceManager, runFrequencyInSeconds, deletedSegmentsRetentionInDays);
- }
-
- @Override
- protected boolean isLeader() {
- return true;
- }
- }
}
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/PeriodicTask.java b/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/PeriodicTask.java
index fac0750..f5b9c60 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/PeriodicTask.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/PeriodicTask.java
@@ -43,4 +43,9 @@ public interface PeriodicTask extends Runnable {
* @return task name.
*/
String getTaskName();
+
+ /**
+ * Stop the periodic task
+ */
+ void stop();
}
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/PeriodicTaskScheduler.java b/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/PeriodicTaskScheduler.java
index cd07ea4..81c92a0 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/PeriodicTaskScheduler.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/PeriodicTaskScheduler.java
@@ -31,6 +31,7 @@ public class PeriodicTaskScheduler {
private static final Logger LOGGER = LoggerFactory.getLogger(PeriodicTaskScheduler.class);
private ScheduledExecutorService _executorService;
+ private List<PeriodicTask> _tasksWithValidInterval;
/**
* Start scheduling periodic tasks.
@@ -40,25 +41,27 @@ public class PeriodicTaskScheduler {
LOGGER.warn("Periodic task scheduler already started");
}
- List<PeriodicTask> tasksWithValidInterval = new ArrayList<>();
+ _tasksWithValidInterval = new ArrayList<>();
for (PeriodicTask periodicTask : periodicTasks) {
if (periodicTask.getIntervalInSeconds() > 0) {
LOGGER.info("Adding periodic task: {}", periodicTask);
- tasksWithValidInterval.add(periodicTask);
+ _tasksWithValidInterval.add(periodicTask);
} else {
LOGGER.info("Skipping periodic task: {}", periodicTask);
}
}
- if (tasksWithValidInterval.isEmpty()) {
+ if (_tasksWithValidInterval.isEmpty()) {
LOGGER.warn("No periodic task scheduled");
} else {
- LOGGER.info("Starting periodic task scheduler with tasks: {}", tasksWithValidInterval);
- _executorService = Executors.newScheduledThreadPool(tasksWithValidInterval.size());
- for (PeriodicTask periodicTask : tasksWithValidInterval) {
+ LOGGER.info("Starting periodic task scheduler with tasks: {}", _tasksWithValidInterval);
+ _executorService = Executors.newScheduledThreadPool(_tasksWithValidInterval.size());
+ for (PeriodicTask periodicTask : _tasksWithValidInterval) {
periodicTask.init();
_executorService.scheduleWithFixedDelay(() -> {
try {
+ LOGGER.info("Starting {} with running frequency of {} seconds.", periodicTask.getTaskName(),
+ periodicTask.getIntervalInSeconds());
periodicTask.run();
} catch (Throwable e) {
// catch all errors to prevent subsequent executions from being silently suppressed
@@ -70,11 +73,19 @@ public class PeriodicTaskScheduler {
}
}
+ /**
+ * Shutdown executor service and stop the periodic tasks
+ */
public void stop() {
if (_executorService != null) {
LOGGER.info("Stopping periodic task scheduler");
_executorService.shutdown();
_executorService = null;
}
+
+ if (_tasksWithValidInterval != null) {
+ LOGGER.info("Stopping all periodic tasks: {}", _tasksWithValidInterval);
+ _tasksWithValidInterval.parallelStream().forEach(PeriodicTask::stop);
+ }
}
}
diff --git a/pinot-core/src/test/java/com/linkedin/pinot/core/periodictask/PeriodicTaskSchedulerTest.java b/pinot-core/src/test/java/com/linkedin/pinot/core/periodictask/PeriodicTaskSchedulerTest.java
index a105435..fbb3d92 100644
--- a/pinot-core/src/test/java/com/linkedin/pinot/core/periodictask/PeriodicTaskSchedulerTest.java
+++ b/pinot-core/src/test/java/com/linkedin/pinot/core/periodictask/PeriodicTaskSchedulerTest.java
@@ -31,6 +31,7 @@ public class PeriodicTaskSchedulerTest {
public void testTaskWithInvalidInterval() throws Exception {
AtomicBoolean initCalled = new AtomicBoolean();
AtomicBoolean runCalled = new AtomicBoolean();
+ AtomicBoolean stopCalled = new AtomicBoolean();
List<PeriodicTask> periodicTasks = Collections.singletonList(new BasePeriodicTask("TestTask", 0L/*Invalid*/, 0L) {
@Override
@@ -39,6 +40,11 @@ public class PeriodicTaskSchedulerTest {
}
@Override
+ public void stop() {
+ stopCalled.set(true);
+ }
+
+ @Override
public void run() {
runCalled.set(true);
}
@@ -51,6 +57,7 @@ public class PeriodicTaskSchedulerTest {
assertFalse(initCalled.get());
assertFalse(runCalled.get());
+ assertFalse(stopCalled.get());
}
@Test
@@ -58,6 +65,7 @@ public class PeriodicTaskSchedulerTest {
int numTasks = 3;
AtomicInteger numTimesInitCalled = new AtomicInteger();
AtomicInteger numTimesRunCalled = new AtomicInteger();
+ AtomicInteger numTimesStopCalled = new AtomicInteger();
List<PeriodicTask> periodicTasks = new ArrayList<>(numTasks);
for (int i = 0; i < numTasks; i++) {
@@ -68,6 +76,11 @@ public class PeriodicTaskSchedulerTest {
}
@Override
+ public void stop() {
+ numTimesStopCalled.getAndIncrement();
+ }
+
+ @Override
public void run() {
numTimesRunCalled.getAndIncrement();
}
@@ -81,5 +94,6 @@ public class PeriodicTaskSchedulerTest {
assertEquals(numTimesInitCalled.get(), numTasks);
assertEquals(numTimesRunCalled.get(), numTasks * 2);
+ assertEquals(numTimesStopCalled.get(), numTasks);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org