You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2021/02/04 10:32:44 UTC

[incubator-pinot] 01/01: Adding a controller periodic task to clean up dead minion instances

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch cleanup_dead_minion_instances
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 4c8cee3bf45a7de61d5cf8e13f78402119275db6
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Thu Feb 4 02:32:04 2021 -0800

    Adding a controller periodic task to clean up dead minion instances
---
 .../pinot/common/metrics/ControllerGauge.java      |  5 +-
 .../apache/pinot/controller/ControllerConf.java    | 24 ++++++++
 .../apache/pinot/controller/ControllerStarter.java |  8 +++
 .../core/minion/MinionInstancesCleanupTask.java    | 60 ++++++++++++++++++
 .../pinot/controller/helix/ControllerTest.java     | 72 ++++++++++++++++++++++
 .../minion/MinionInstancesCleanupTaskTest.java     | 66 ++++++++++++++++++++
 .../apache/pinot/spi/config/tenant/TenantRole.java |  2 +-
 7 files changed, 235 insertions(+), 2 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
index 3f69f45..869db11 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
@@ -67,7 +67,10 @@ public enum ControllerGauge implements AbstractMetrics.Gauge {
   TABLE_STORAGE_EST_MISSING_SEGMENT_PERCENT("TableStorageEstMissingSegmentPercent", false),
 
   // Number of scheduled Cron jobs
-  CRON_SCHEDULER_JOB_SCHEDULED("cronSchedulerJobScheduled", false);
+  CRON_SCHEDULER_JOB_SCHEDULED("cronSchedulerJobScheduled", false),
+
+  // Number of dropped minion instances
+  DROPPED_MINION_INSTANCES("droppedMinionInstances", true);
 
   private final String gaugeName;
   private final String unit;
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index 76ab5ec..61d34cf 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -88,6 +88,10 @@ public class ControllerConf extends PinotConfiguration {
     public static final String STATUS_CHECKER_WAIT_FOR_PUSH_TIME_IN_SECONDS =
         "controller.statuschecker.waitForPushTimeInSeconds";
     public static final String TASK_MANAGER_FREQUENCY_IN_SECONDS = "controller.task.frequencyInSeconds";
+    public static final String MINION_INSTANCES_CLEANUP_TASK_FREQUENCY_IN_SECONDS = "controller.minion.instances.cleanup.task.frequencyInSeconds";
+    public static final String MINION_INSTANCES_CLEANUP_TASK_INITIAL_DELAY_SECONDS = "controller.minion.instances.cleanup.task.initialDelaySeconds";
+
+
     public static final String PINOT_TASK_MANAGER_SCHEDULER_ENABLED = "controller.task.scheduler.enabled";
     @Deprecated
     // RealtimeSegmentRelocator has been rebranded as SegmentRelocator
@@ -131,6 +135,8 @@ public class ControllerConf extends PinotConfiguration {
     private static final int DEFAULT_STATUS_CONTROLLER_FREQUENCY_IN_SECONDS = 5 * 60; // 5 minutes
     private static final int DEFAULT_STATUS_CONTROLLER_WAIT_FOR_PUSH_TIME_IN_SECONDS = 10 * 60; // 10 minutes
     private static final int DEFAULT_TASK_MANAGER_FREQUENCY_IN_SECONDS = -1; // Disabled
+    private static final int DEFAULT_MINION_INSTANCES_CLEANUP_TASK_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour.
+
     private static final int DEFAULT_SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS = 24 * 60 * 60;
     private static final int DEFAULT_SEGMENT_RELOCATOR_FREQUENCY_IN_SECONDS = 60 * 60;
   }
@@ -539,6 +545,24 @@ public class ControllerConf extends PinotConfiguration {
     setProperty(ControllerPeriodicTasksConf.TASK_MANAGER_FREQUENCY_IN_SECONDS, Integer.toString(frequencyInSeconds));
   }
 
+  public long getMinionInstancesCleanupTaskFrequencyInSeconds() {
+    return getProperty(ControllerPeriodicTasksConf.MINION_INSTANCES_CLEANUP_TASK_FREQUENCY_IN_SECONDS,
+        ControllerPeriodicTasksConf.DEFAULT_MINION_INSTANCES_CLEANUP_TASK_FREQUENCY_IN_SECONDS);
+  }
+
+  public void setMinionInstancesCleanupTaskFrequencyInSeconds(int frequencyInSeconds) {
+    setProperty(ControllerPeriodicTasksConf.MINION_INSTANCES_CLEANUP_TASK_FREQUENCY_IN_SECONDS, Integer.toString(frequencyInSeconds));
+  }
+
+  public long getMinionInstancesCleanupTaskInitialDelaySeconds() {
+    return getProperty(ControllerPeriodicTasksConf.MINION_INSTANCES_CLEANUP_TASK_INITIAL_DELAY_SECONDS,
+        ControllerPeriodicTasksConf.getRandomInitialDelayInSeconds());
+  }
+
+  public void setMinionInstancesCleanupTaskInitialDelaySeconds(int initialDelaySeconds) {
+    setProperty(ControllerPeriodicTasksConf.MINION_INSTANCES_CLEANUP_TASK_INITIAL_DELAY_SECONDS, Integer.toString(initialDelaySeconds));
+  }
+
   public int getDefaultTableMinReplicas() {
     return getProperty(TABLE_MIN_REPLICAS, DEFAULT_TABLE_MIN_REPLICAS);
   }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
index 233f85e..0845283 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
@@ -63,6 +63,7 @@ import org.apache.pinot.common.utils.helix.LeadControllerUtils;
 import org.apache.pinot.controller.api.ControllerAdminApiApplication;
 import org.apache.pinot.controller.api.access.AccessControlFactory;
 import org.apache.pinot.controller.api.events.MetadataEventNotifierFactory;
+import org.apache.pinot.controller.helix.core.minion.MinionInstancesCleanupTask;
 import org.apache.pinot.core.transport.ListenerConfig;
 import org.apache.pinot.controller.api.resources.ControllerFilePathProvider;
 import org.apache.pinot.controller.api.resources.InvalidControllerConfigException;
@@ -140,6 +141,7 @@ public class ControllerStarter implements ServiceStartable {
   private SegmentCompletionManager _segmentCompletionManager;
   private LeadControllerManager _leadControllerManager;
   private List<ServiceStatus.ServiceStatusCallback> _serviceStatusCallbackList;
+  private MinionInstancesCleanupTask _minionInstancesCleanupTask;
 
   public ControllerStarter(ControllerConf conf) {
     _config = conf;
@@ -252,6 +254,10 @@ public class ControllerStarter implements ServiceStartable {
     return _taskManager;
   }
 
+  public MinionInstancesCleanupTask getMinionInstancesCleanupTask() {
+    return _minionInstancesCleanupTask;
+  }
+
   @Override
   public ServiceRole getServiceRole() {
     return ServiceRole.CONTROLLER;
@@ -580,6 +586,8 @@ public class ControllerStarter implements ServiceStartable {
     _segmentRelocator = new SegmentRelocator(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics,
         _executorService);
     periodicTasks.add(_segmentRelocator);
+    _minionInstancesCleanupTask = new MinionInstancesCleanupTask(_helixResourceManager, _config, _controllerMetrics);
+    periodicTasks.add(_minionInstancesCleanupTask);
 
     return periodicTasks;
   }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/MinionInstancesCleanupTask.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/MinionInstancesCleanupTask.java
new file mode 100644
index 0000000..0d0feed
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/MinionInstancesCleanupTask.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.minion;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.pinot.common.metrics.ControllerGauge;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.PinotResourceManagerResponse;
+import org.apache.pinot.core.periodictask.BasePeriodicTask;
+
+
+/**
+ * A periodic task to clean up offline Minion instances to not spam Helix.
+ */
+public class MinionInstancesCleanupTask extends BasePeriodicTask {
+  protected final PinotHelixResourceManager _pinotHelixResourceManager;
+  protected final ControllerMetrics _controllerMetrics;
+
+  public MinionInstancesCleanupTask(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf controllerConf,
+      ControllerMetrics controllerMetrics) {
+    super("MinionInstancesCleanupTask", controllerConf.getMinionInstancesCleanupTaskFrequencyInSeconds(),
+        controllerConf.getMinionInstancesCleanupTaskInitialDelaySeconds());
+    _pinotHelixResourceManager = pinotHelixResourceManager;
+    _controllerMetrics = controllerMetrics;
+  }
+
+  @Override
+  protected void runTask() {
+    List<String> offlineInstances = new ArrayList<>(_pinotHelixResourceManager.getAllInstances());
+    offlineInstances.removeAll(_pinotHelixResourceManager.getOnlineInstanceList());
+    for (String offlineInstance : offlineInstances) {
+      if (offlineInstance.startsWith(CommonConstants.Helix.PREFIX_OF_MINION_INSTANCE)) {
+        PinotResourceManagerResponse response = _pinotHelixResourceManager.dropInstance(offlineInstance);
+        if (response.isSuccessful()) {
+          _controllerMetrics.addValueToGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES, 1);
+        }
+      }
+    }
+  }
+}
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
index a88799d..34dece7 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
@@ -93,6 +93,7 @@ public abstract class ControllerTest {
       new File(FileUtils.getTempDirectoryPath(), "test-controller-" + System.currentTimeMillis()).getAbsolutePath();
   protected static final String BROKER_INSTANCE_ID_PREFIX = "Broker_localhost_";
   protected static final String SERVER_INSTANCE_ID_PREFIX = "Server_localhost_";
+  protected static final String MINION_INSTANCE_ID_PREFIX = "Minion_localhost_";
 
   protected final List<HelixManager> _fakeInstanceHelixManagers = new ArrayList<>();
 
@@ -381,6 +382,77 @@ public abstract class ControllerTest {
     }
   }
 
+  protected void addFakeMinionInstancesToAutoJoinHelixCluster(int numInstances)
+      throws Exception {
+    for (int i = 0; i < numInstances; i++) {
+      addFakeMinionInstanceToAutoJoinHelixCluster(MINION_INSTANCE_ID_PREFIX + i);
+    }
+  }
+
+  protected void addFakeMinionInstanceToAutoJoinHelixCluster(String instanceId)
+      throws Exception {
+    HelixManager helixManager =
+        HelixManagerFactory.getZKHelixManager(getHelixClusterName(), instanceId, InstanceType.PARTICIPANT,
+            ZkStarter.DEFAULT_ZK_STR);
+    helixManager.getStateMachineEngine()
+        .registerStateModelFactory(FakeMinionResourceOnlineOfflineStateModelFactory.STATE_MODEL_DEF,
+            FakeMinionResourceOnlineOfflineStateModelFactory.FACTORY_INSTANCE);
+    helixManager.connect();
+    HelixAdmin helixAdmin = helixManager.getClusterManagmentTool();
+    helixAdmin.addInstanceTag(getHelixClusterName(), instanceId, UNTAGGED_MINION_INSTANCE);
+    _fakeInstanceHelixManagers.add(helixManager);
+  }
+
+  public static class FakeMinionResourceOnlineOfflineStateModelFactory extends StateModelFactory<StateModel> {
+    private static final String STATE_MODEL_DEF = "MinionResourceOnlineOfflineStateModel";
+    private static final FakeMinionResourceOnlineOfflineStateModelFactory FACTORY_INSTANCE =
+        new FakeMinionResourceOnlineOfflineStateModelFactory();
+    private static final FakeMinionResourceOnlineOfflineStateModel STATE_MODEL_INSTANCE =
+        new FakeMinionResourceOnlineOfflineStateModel();
+
+    private FakeMinionResourceOnlineOfflineStateModelFactory() {
+    }
+
+    @Override
+    public StateModel createNewStateModel(String resourceName, String partitionName) {
+      return STATE_MODEL_INSTANCE;
+    }
+
+    @SuppressWarnings("unused")
+    @StateModelInfo(states = "{'OFFLINE', 'ONLINE', 'DROPPED'}", initialState = "OFFLINE")
+    public static class FakeMinionResourceOnlineOfflineStateModel extends StateModel {
+      private static final Logger LOGGER = LoggerFactory.getLogger(FakeMinionResourceOnlineOfflineStateModel.class);
+
+      private FakeMinionResourceOnlineOfflineStateModel() {
+      }
+
+      @Transition(from = "OFFLINE", to = "ONLINE")
+      public void onBecomeOnlineFromOffline(Message message, NotificationContext context) {
+        LOGGER.debug("onBecomeOnlineFromOffline(): {}", message);
+      }
+
+      @Transition(from = "OFFLINE", to = "DROPPED")
+      public void onBecomeDroppedFromOffline(Message message, NotificationContext context) {
+        LOGGER.debug("onBecomeDroppedFromOffline(): {}", message);
+      }
+
+      @Transition(from = "ONLINE", to = "OFFLINE")
+      public void onBecomeOfflineFromOnline(Message message, NotificationContext context) {
+        LOGGER.debug("onBecomeOfflineFromOnline(): {}", message);
+      }
+
+      @Transition(from = "ONLINE", to = "DROPPED")
+      public void onBecomeDroppedFromOnline(Message message, NotificationContext context) {
+        LOGGER.debug("onBecomeDroppedFromOnline(): {}", message);
+      }
+
+      @Transition(from = "ERROR", to = "OFFLINE")
+      public void onBecomeOfflineFromError(Message message, NotificationContext context) {
+        LOGGER.debug("onBecomeOfflineFromError(): {}", message);
+      }
+    }
+  }
+
   protected void stopFakeInstances() {
     for (HelixManager helixManager : _fakeInstanceHelixManagers) {
       helixManager.disconnect();
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/MinionInstancesCleanupTaskTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/MinionInstancesCleanupTaskTest.java
new file mode 100644
index 0000000..363c795
--- /dev/null
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/MinionInstancesCleanupTaskTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.minion;
+
+import org.apache.pinot.common.metrics.ControllerGauge;
+import org.apache.pinot.controller.helix.ControllerTest;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class MinionInstancesCleanupTaskTest extends ControllerTest {
+  @BeforeClass
+  public void setup()
+      throws Exception {
+    startZk();
+    startController();
+  }
+
+  @Test
+  public void testMinionInstancesCleanupTask()
+      throws Exception {
+    MinionInstancesCleanupTask minionInstancesCleanupTask = _controllerStarter.getMinionInstancesCleanupTask();
+    minionInstancesCleanupTask.runTask();
+    Assert.assertEquals(
+        _controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES), 0);
+    addFakeMinionInstancesToAutoJoinHelixCluster(3);
+    Assert.assertEquals(
+        _controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES), 0);
+    stopFakeInstance("Minion_localhost_0");
+    minionInstancesCleanupTask.runTask();
+    Assert.assertEquals(
+        _controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES), 1);
+    stopFakeInstance("Minion_localhost_1");
+    minionInstancesCleanupTask.runTask();
+    Assert.assertEquals(
+        _controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES), 2);
+    stopFakeInstance("Minion_localhost_2");
+    minionInstancesCleanupTask.runTask();
+    Assert.assertEquals(
+        _controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES), 3);
+  }
+
+  @AfterClass
+  public void teardown() {
+    stopController();
+    stopZk();
+  }
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/tenant/TenantRole.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/tenant/TenantRole.java
index 3298248..6b710dd 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/tenant/TenantRole.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/tenant/TenantRole.java
@@ -19,5 +19,5 @@
 package org.apache.pinot.spi.config.tenant;
 
 public enum TenantRole {
-  SERVER, BROKER
+  SERVER, BROKER, MINION
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org