You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2021/02/04 10:32:44 UTC
[incubator-pinot] 01/01: Adding a controller periodic task to clean
up dead minion instances
This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch cleanup_dead_minion_instances
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 4c8cee3bf45a7de61d5cf8e13f78402119275db6
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Thu Feb 4 02:32:04 2021 -0800
Adding a controller periodic task to clean up dead minion instances
---
.../pinot/common/metrics/ControllerGauge.java | 5 +-
.../apache/pinot/controller/ControllerConf.java | 24 ++++++++
.../apache/pinot/controller/ControllerStarter.java | 8 +++
.../core/minion/MinionInstancesCleanupTask.java | 60 ++++++++++++++++++
.../pinot/controller/helix/ControllerTest.java | 72 ++++++++++++++++++++++
.../minion/MinionInstancesCleanupTaskTest.java | 66 ++++++++++++++++++++
.../apache/pinot/spi/config/tenant/TenantRole.java | 2 +-
7 files changed, 235 insertions(+), 2 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
index 3f69f45..869db11 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
@@ -67,7 +67,10 @@ public enum ControllerGauge implements AbstractMetrics.Gauge {
TABLE_STORAGE_EST_MISSING_SEGMENT_PERCENT("TableStorageEstMissingSegmentPercent", false),
// Number of scheduled Cron jobs
- CRON_SCHEDULER_JOB_SCHEDULED("cronSchedulerJobScheduled", false);
+ CRON_SCHEDULER_JOB_SCHEDULED("cronSchedulerJobScheduled", false),
+
+ // Number of dropped minion instances
+ DROPPED_MINION_INSTANCES("droppedMinionInstances", true);
private final String gaugeName;
private final String unit;
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index 76ab5ec..61d34cf 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -88,6 +88,10 @@ public class ControllerConf extends PinotConfiguration {
public static final String STATUS_CHECKER_WAIT_FOR_PUSH_TIME_IN_SECONDS =
"controller.statuschecker.waitForPushTimeInSeconds";
public static final String TASK_MANAGER_FREQUENCY_IN_SECONDS = "controller.task.frequencyInSeconds";
+ public static final String MINION_INSTANCES_CLEANUP_TASK_FREQUENCY_IN_SECONDS = "controller.minion.instances.cleanup.task.frequencyInSeconds";
+ public static final String MINION_INSTANCES_CLEANUP_TASK_INITIAL_DELAY_SECONDS = "controller.minion.instances.cleanup.task.initialDelaySeconds";
+
+
public static final String PINOT_TASK_MANAGER_SCHEDULER_ENABLED = "controller.task.scheduler.enabled";
@Deprecated
// RealtimeSegmentRelocator has been rebranded as SegmentRelocator
@@ -131,6 +135,8 @@ public class ControllerConf extends PinotConfiguration {
private static final int DEFAULT_STATUS_CONTROLLER_FREQUENCY_IN_SECONDS = 5 * 60; // 5 minutes
private static final int DEFAULT_STATUS_CONTROLLER_WAIT_FOR_PUSH_TIME_IN_SECONDS = 10 * 60; // 10 minutes
private static final int DEFAULT_TASK_MANAGER_FREQUENCY_IN_SECONDS = -1; // Disabled
+ private static final int DEFAULT_MINION_INSTANCES_CLEANUP_TASK_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour.
+
private static final int DEFAULT_SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS = 24 * 60 * 60;
private static final int DEFAULT_SEGMENT_RELOCATOR_FREQUENCY_IN_SECONDS = 60 * 60;
}
@@ -539,6 +545,24 @@ public class ControllerConf extends PinotConfiguration {
setProperty(ControllerPeriodicTasksConf.TASK_MANAGER_FREQUENCY_IN_SECONDS, Integer.toString(frequencyInSeconds));
}
+ public long getMinionInstancesCleanupTaskFrequencyInSeconds() { // Run interval of the minion cleanup task, in seconds
+ return getProperty(ControllerPeriodicTasksConf.MINION_INSTANCES_CLEANUP_TASK_FREQUENCY_IN_SECONDS,
+ ControllerPeriodicTasksConf.DEFAULT_MINION_INSTANCES_CLEANUP_TASK_FREQUENCY_IN_SECONDS); // Fallback passed to getProperty as the default
+ }
+
+ public void setMinionInstancesCleanupTaskFrequencyInSeconds(int frequencyInSeconds) { // Overrides the cleanup task run interval
+ setProperty(ControllerPeriodicTasksConf.MINION_INSTANCES_CLEANUP_TASK_FREQUENCY_IN_SECONDS, Integer.toString(frequencyInSeconds)); // Value is stored in its string form
+ }
+
+ public long getMinionInstancesCleanupTaskInitialDelaySeconds() { // Delay before the first cleanup run, in seconds
+ return getProperty(ControllerPeriodicTasksConf.MINION_INSTANCES_CLEANUP_TASK_INITIAL_DELAY_SECONDS,
+ ControllerPeriodicTasksConf.getRandomInitialDelayInSeconds()); // Default is computed on every call, even when the property is set
+ }
+
+ public void setMinionInstancesCleanupTaskInitialDelaySeconds(int initialDelaySeconds) { // Overrides the delay before the first cleanup run
+ setProperty(ControllerPeriodicTasksConf.MINION_INSTANCES_CLEANUP_TASK_INITIAL_DELAY_SECONDS, Integer.toString(initialDelaySeconds)); // Value is stored in its string form
+ }
+
public int getDefaultTableMinReplicas() {
return getProperty(TABLE_MIN_REPLICAS, DEFAULT_TABLE_MIN_REPLICAS);
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
index 233f85e..0845283 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
@@ -63,6 +63,7 @@ import org.apache.pinot.common.utils.helix.LeadControllerUtils;
import org.apache.pinot.controller.api.ControllerAdminApiApplication;
import org.apache.pinot.controller.api.access.AccessControlFactory;
import org.apache.pinot.controller.api.events.MetadataEventNotifierFactory;
+import org.apache.pinot.controller.helix.core.minion.MinionInstancesCleanupTask;
import org.apache.pinot.core.transport.ListenerConfig;
import org.apache.pinot.controller.api.resources.ControllerFilePathProvider;
import org.apache.pinot.controller.api.resources.InvalidControllerConfigException;
@@ -140,6 +141,7 @@ public class ControllerStarter implements ServiceStartable {
private SegmentCompletionManager _segmentCompletionManager;
private LeadControllerManager _leadControllerManager;
private List<ServiceStatus.ServiceStatusCallback> _serviceStatusCallbackList;
+ private MinionInstancesCleanupTask _minionInstancesCleanupTask;
public ControllerStarter(ControllerConf conf) {
_config = conf;
@@ -252,6 +254,10 @@ public class ControllerStarter implements ServiceStartable {
return _taskManager;
}
+ public MinionInstancesCleanupTask getMinionInstancesCleanupTask() { // Accessor for the minion cleanup periodic task held by this starter
+ return _minionInstancesCleanupTask; // NOTE(review): presumably null until the periodic tasks have been set up - confirm
+ }
+
@Override
public ServiceRole getServiceRole() {
return ServiceRole.CONTROLLER;
@@ -580,6 +586,8 @@ public class ControllerStarter implements ServiceStartable {
_segmentRelocator = new SegmentRelocator(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics,
_executorService);
periodicTasks.add(_segmentRelocator);
+ _minionInstancesCleanupTask = new MinionInstancesCleanupTask(_helixResourceManager, _config, _controllerMetrics);
+ periodicTasks.add(_minionInstancesCleanupTask);
return periodicTasks;
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/MinionInstancesCleanupTask.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/MinionInstancesCleanupTask.java
new file mode 100644
index 0000000..0d0feed
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/MinionInstancesCleanupTask.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.minion;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.pinot.common.metrics.ControllerGauge;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.PinotResourceManagerResponse;
+import org.apache.pinot.core.periodictask.BasePeriodicTask;
+
+
+/**
+ * A periodic task to clean up offline Minion instances to not spam Helix.
+ *
+ * <p>On each run, every instance returned by {@code getAllInstances()} that is absent from
+ * {@code getOnlineInstanceList()} and whose name starts with the Minion instance prefix is dropped via
+ * {@code dropInstance}. Each successful drop adds 1 to the {@code DROPPED_MINION_INSTANCES} global gauge.
+ * The run frequency and initial delay are read from the supplied {@code ControllerConf}.
+ */
+public class MinionInstancesCleanupTask extends BasePeriodicTask {
+ protected final PinotHelixResourceManager _pinotHelixResourceManager; // Source of the instance lists and target of dropInstance calls
+ protected final ControllerMetrics _controllerMetrics; // Receives the dropped-instance gauge updates
+
+ public MinionInstancesCleanupTask(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf controllerConf,
+ ControllerMetrics controllerMetrics) {
+ super("MinionInstancesCleanupTask", controllerConf.getMinionInstancesCleanupTaskFrequencyInSeconds(),
+ controllerConf.getMinionInstancesCleanupTaskInitialDelaySeconds()); // Task name, run interval and initial delay for the base periodic task
+ _pinotHelixResourceManager = pinotHelixResourceManager;
+ _controllerMetrics = controllerMetrics;
+ }
+
+ @Override
+ protected void runTask() {
+ List<String> offlineInstances = new ArrayList<>(_pinotHelixResourceManager.getAllInstances()); // Copy, so the returned list is not mutated
+ offlineInstances.removeAll(_pinotHelixResourceManager.getOnlineInstanceList()); // What remains is every instance that is not online
+ for (String offlineInstance : offlineInstances) {
+ if (offlineInstance.startsWith(CommonConstants.Helix.PREFIX_OF_MINION_INSTANCE)) { // Only Minion instances are dropped; others are left alone
+ PinotResourceManagerResponse response = _pinotHelixResourceManager.dropInstance(offlineInstance);
+ if (response.isSuccessful()) { // An unsuccessful drop is ignored here: no log, no metric
+ _controllerMetrics.addValueToGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES, 1); // Adds to the gauge; it is never reset in this class
+ }
+ }
+ }
+ }
+}
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
index a88799d..34dece7 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
@@ -93,6 +93,7 @@ public abstract class ControllerTest {
new File(FileUtils.getTempDirectoryPath(), "test-controller-" + System.currentTimeMillis()).getAbsolutePath();
protected static final String BROKER_INSTANCE_ID_PREFIX = "Broker_localhost_";
protected static final String SERVER_INSTANCE_ID_PREFIX = "Server_localhost_";
+ protected static final String MINION_INSTANCE_ID_PREFIX = "Minion_localhost_";
protected final List<HelixManager> _fakeInstanceHelixManagers = new ArrayList<>();
@@ -381,6 +382,77 @@ public abstract class ControllerTest {
}
}
+ protected void addFakeMinionInstancesToAutoJoinHelixCluster(int numInstances) // Registers numInstances fake Minion participants
+ throws Exception {
+ for (int i = 0; i < numInstances; i++) {
+ addFakeMinionInstanceToAutoJoinHelixCluster(MINION_INSTANCE_ID_PREFIX + i); // Instance ids are the prefix followed by 0..numInstances-1
+ }
+ }
+
+ protected void addFakeMinionInstanceToAutoJoinHelixCluster(String instanceId) // Connects one fake Minion participant with the given id
+ throws Exception {
+ HelixManager helixManager =
+ HelixManagerFactory.getZKHelixManager(getHelixClusterName(), instanceId, InstanceType.PARTICIPANT,
+ ZkStarter.DEFAULT_ZK_STR); // Participant-type manager against the test ZooKeeper address
+ helixManager.getStateMachineEngine()
+ .registerStateModelFactory(FakeMinionResourceOnlineOfflineStateModelFactory.STATE_MODEL_DEF,
+ FakeMinionResourceOnlineOfflineStateModelFactory.FACTORY_INSTANCE); // Factory is registered before connect() is called
+ helixManager.connect();
+ HelixAdmin helixAdmin = helixManager.getClusterManagmentTool();
+ helixAdmin.addInstanceTag(getHelixClusterName(), instanceId, UNTAGGED_MINION_INSTANCE); // Tag the new instance as an untagged Minion
+ _fakeInstanceHelixManagers.add(helixManager); // Tracked so the fake instance can be disconnected later
+ }
+
+ public static class FakeMinionResourceOnlineOfflineStateModelFactory extends StateModelFactory<StateModel> { // Test-only factory for fake Minion participants
+ private static final String STATE_MODEL_DEF = "MinionResourceOnlineOfflineStateModel"; // Name under which the factory is registered
+ private static final FakeMinionResourceOnlineOfflineStateModelFactory FACTORY_INSTANCE =
+ new FakeMinionResourceOnlineOfflineStateModelFactory(); // Single shared factory instance
+ private static final FakeMinionResourceOnlineOfflineStateModel STATE_MODEL_INSTANCE =
+ new FakeMinionResourceOnlineOfflineStateModel(); // Single shared state model instance
+
+ private FakeMinionResourceOnlineOfflineStateModelFactory() { // Private: use FACTORY_INSTANCE
+ }
+
+ @Override
+ public StateModel createNewStateModel(String resourceName, String partitionName) {
+ return STATE_MODEL_INSTANCE; // Same instance for every resource/partition; arguments are ignored
+ }
+
+ @SuppressWarnings("unused")
+ @StateModelInfo(states = "{'OFFLINE', 'ONLINE', 'DROPPED'}", initialState = "OFFLINE")
+ public static class FakeMinionResourceOnlineOfflineStateModel extends StateModel { // Every transition handler below only logs at debug level
+ private static final Logger LOGGER = LoggerFactory.getLogger(FakeMinionResourceOnlineOfflineStateModel.class);
+
+ private FakeMinionResourceOnlineOfflineStateModel() { // Private: only the enclosing factory creates it
+ }
+
+ @Transition(from = "OFFLINE", to = "ONLINE")
+ public void onBecomeOnlineFromOffline(Message message, NotificationContext context) {
+ LOGGER.debug("onBecomeOnlineFromOffline(): {}", message);
+ }
+
+ @Transition(from = "OFFLINE", to = "DROPPED")
+ public void onBecomeDroppedFromOffline(Message message, NotificationContext context) {
+ LOGGER.debug("onBecomeDroppedFromOffline(): {}", message);
+ }
+
+ @Transition(from = "ONLINE", to = "OFFLINE")
+ public void onBecomeOfflineFromOnline(Message message, NotificationContext context) {
+ LOGGER.debug("onBecomeOfflineFromOnline(): {}", message);
+ }
+
+ @Transition(from = "ONLINE", to = "DROPPED")
+ public void onBecomeDroppedFromOnline(Message message, NotificationContext context) {
+ LOGGER.debug("onBecomeDroppedFromOnline(): {}", message);
+ }
+
+ @Transition(from = "ERROR", to = "OFFLINE") // NOTE(review): ERROR is not in the states list above - presumably a Helix built-in state; confirm
+ public void onBecomeOfflineFromError(Message message, NotificationContext context) {
+ LOGGER.debug("onBecomeOfflineFromError(): {}", message);
+ }
+ }
+ }
+
protected void stopFakeInstances() {
for (HelixManager helixManager : _fakeInstanceHelixManagers) {
helixManager.disconnect();
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/MinionInstancesCleanupTaskTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/MinionInstancesCleanupTaskTest.java
new file mode 100644
index 0000000..363c795
--- /dev/null
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/MinionInstancesCleanupTaskTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.minion;
+
+import org.apache.pinot.common.metrics.ControllerGauge;
+import org.apache.pinot.controller.helix.ControllerTest;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class MinionInstancesCleanupTaskTest extends ControllerTest { // Checks that the cleanup task drops stopped Minion instances and counts them
+ @BeforeClass
+ public void setup()
+ throws Exception {
+ startZk();
+ startController();
+ }
+
+ @Test
+ public void testMinionInstancesCleanupTask()
+ throws Exception {
+ MinionInstancesCleanupTask minionInstancesCleanupTask = _controllerStarter.getMinionInstancesCleanupTask();
+ minionInstancesCleanupTask.runTask(); // Run before any fake Minion has been added; expect nothing dropped
+ Assert.assertEquals(
+ _controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES), 0);
+ addFakeMinionInstancesToAutoJoinHelixCluster(3); // Adds Minion_localhost_0, _1 and _2
+ Assert.assertEquals(
+ _controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES), 0); // Task has not run since the instances joined
+ stopFakeInstance("Minion_localhost_0"); // NOTE(review): helper defined outside this file - presumably disconnects the instance; confirm
+ minionInstancesCleanupTask.runTask();
+ Assert.assertEquals(
+ _controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES), 1); // Gauge accumulates: one drop so far
+ stopFakeInstance("Minion_localhost_1");
+ minionInstancesCleanupTask.runTask();
+ Assert.assertEquals(
+ _controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES), 2); // Second drop added to the previous value
+ stopFakeInstance("Minion_localhost_2");
+ minionInstancesCleanupTask.runTask();
+ Assert.assertEquals(
+ _controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES), 3); // All three fake Minions dropped
+ }
+
+ @AfterClass
+ public void teardown() {
+ stopController(); // Tear down in reverse order of setup
+ stopZk();
+ }
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/tenant/TenantRole.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/tenant/TenantRole.java
index 3298248..6b710dd 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/tenant/TenantRole.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/tenant/TenantRole.java
@@ -19,5 +19,5 @@
package org.apache.pinot.spi.config.tenant;
public enum TenantRole {
- SERVER, BROKER
+ SERVER, BROKER, MINION
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org