You are viewing a plain-text version of this content. The link to the canonical version is not preserved in this plain-text rendering.
Posted to commits@helix.apache.org by hz...@apache.org on 2021/06/12 08:00:37 UTC

[helix] branch cluster-pause-mode updated: Move pause and maintenance handling out of controller (#1793)

This is an automated email from the ASF dual-hosted git repository.

hzlu pushed a commit to branch cluster-pause-mode
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/cluster-pause-mode by this push:
     new 57180c4  Move pause and maintenance handling out of controller (#1793)
57180c4 is described below

commit 57180c420239d192b7559e4fb4b0d9b70c6f70f9
Author: Huizhi Lu <51...@users.noreply.github.com>
AuthorDate: Sat Jun 12 01:00:25 2021 -0700

    Move pause and maintenance handling out of controller (#1793)
    
    With the management mode pipeline in place, the logic that handles pause and maintenance signals should be moved out of onControllerChange() and into the management mode pipeline.
    
    This commit handles enabling and disabling of the pause/maintenance signals and updates the cluster status accordingly.
---
 .../helix/controller/GenericHelixController.java   | 80 +++-------------------
 .../dataproviders/BaseControllerDataProvider.java  | 21 +++++-
 .../ManagementControllerDataProvider.java          | 24 ++++++-
 .../controller/stages/ManagementModeStage.java     | 16 +++--
 .../controller/stages/ResourceValidationStage.java | 29 ++++++--
 .../main/java/org/apache/helix/model/Message.java  |  5 ++
 .../BestPossibleExternalViewVerifier.java          |  2 +-
 .../main/java/org/apache/helix/util/HelixUtil.java | 35 ++++++++--
 .../java/org/apache/helix/util/RebalanceUtil.java  |  4 +-
 9 files changed, 119 insertions(+), 97 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 729d9ff..ccc00f5 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -105,9 +105,7 @@ import org.apache.helix.model.CustomizedStateConfig;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.MaintenanceSignal;
 import org.apache.helix.model.Message;
-import org.apache.helix.model.PauseSignal;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.monitoring.mbeans.ClusterEventMonitor;
 import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
@@ -179,13 +177,6 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
   private long _continuousTaskRebalanceFailureCount = 0;
 
   /**
-   * The _paused flag is checked by function handleEvent(), while if the flag is set handleEvent()
-   * will be no-op. Other event handling logic keeps the same when the flag is set.
-   */
-  private boolean _paused;
-  private boolean _inMaintenanceMode;
-
-  /**
    * The executors that can periodically run the rebalancing pipeline. A
    * SingleThreadScheduledExecutor will start if there is resource group that has the config to do
    * periodically rebalance.
@@ -836,18 +827,16 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
     // have this instanceof clauses
     List<Pipeline> pipelines;
     boolean isTaskFrameworkPipeline = false;
-    Pipeline.Type pipelineType;
+    boolean isManagementPipeline = false;
 
     if (dataProvider instanceof ResourceControllerDataProvider) {
       pipelines = _registry.getPipelinesForEvent(event.getEventType());
-      pipelineType = Pipeline.Type.DEFAULT;
     } else if (dataProvider instanceof WorkflowControllerDataProvider) {
       pipelines = _taskRegistry.getPipelinesForEvent(event.getEventType());
       isTaskFrameworkPipeline = true;
-      pipelineType = Pipeline.Type.TASK;
     } else if (dataProvider instanceof ManagementControllerDataProvider) {
       pipelines = _managementModeRegistry.getPipelinesForEvent(event.getEventType());
-      pipelineType = Pipeline.Type.MANAGEMENT_MODE;
+      isManagementPipeline = true;
     } else {
       logger.warn(String
           .format("No %s pipeline to run for event: %s::%s", dataProvider.getPipelineName(),
@@ -856,11 +845,10 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
     }
 
     // Should not run management mode and default/task pipelines at the same time.
-    if ((_inManagementMode && !Pipeline.Type.MANAGEMENT_MODE.equals(pipelineType))
-        || (!_inManagementMode && Pipeline.Type.MANAGEMENT_MODE.equals(pipelineType))) {
+    if (_inManagementMode != isManagementPipeline) {
       logger.info("Should not run management mode and default/task pipelines at the same time. "
-              + "cluster={}, inManagementMode={}, pipelineType={}. Ignoring the event: {}",
-          manager.getClusterName(), _inManagementMode, pipelineType, event.getEventType());
+              + "cluster={}, inManagementMode={}, isManagementPipeline={}. Ignoring the event: {}",
+          manager.getClusterName(), _inManagementMode, isManagementPipeline, event.getEventType());
       return;
     }
 
@@ -880,6 +868,8 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
           checkRebalancingTimer(manager, Collections.<IdealState>emptyList(), dataProvider.getClusterConfig());
         }
         if (_isMonitoring) {
+          _clusterStatusMonitor.setEnabled(!_inManagementMode);
+          _clusterStatusMonitor.setPaused(_inManagementMode);
           event.addAttribute(AttributeName.clusterStatusMonitor.name(), _clusterStatusMonitor);
         }
       }
@@ -1334,25 +1324,8 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
     }
 
     if (controllerIsLeader) {
-      HelixManager manager = changeContext.getManager();
-      HelixDataAccessor accessor = manager.getHelixDataAccessor();
-      Builder keyBuilder = accessor.keyBuilder();
-
-      PauseSignal pauseSignal = accessor.getProperty(keyBuilder.pause());
-      MaintenanceSignal maintenanceSignal = accessor.getProperty(keyBuilder.maintenance());
-      boolean prevPaused = _paused;
-      boolean prevInMaintenanceMode = _inMaintenanceMode;
-      _paused = updateControllerState(pauseSignal, _paused);
-      _inMaintenanceMode = updateControllerState(maintenanceSignal, _inMaintenanceMode);
-      // TODO: remove triggerResumeEvent when moving pause/maintenance to management pipeline
-      if (!triggerResumeEvent(changeContext, prevPaused, prevInMaintenanceMode)) {
-        pushToEventQueues(ClusterEventType.ControllerChange, changeContext, Collections.emptyMap());
-      }
-
       enableClusterStatusMonitor(true);
-      _clusterStatusMonitor.setEnabled(!_paused);
-      _clusterStatusMonitor.setPaused(_paused);
-      _clusterStatusMonitor.setMaintenance(_inMaintenanceMode);
+      pushToEventQueues(ClusterEventType.ControllerChange, changeContext, Collections.emptyMap());
     } else {
       enableClusterStatusMonitor(false);
       // Note that onControllerChange is executed in parallel with the event processing thread. It
@@ -1542,43 +1515,6 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
     }
   }
 
-  private boolean updateControllerState(PauseSignal signal, boolean statusFlag) {
-    if (signal != null) {
-      if (!statusFlag) {
-        statusFlag = true;
-        // This log is recorded for the first time entering PAUSE/MAINTENANCE mode
-        logger.info(String.format("controller is now %s",
-            (signal instanceof MaintenanceSignal) ? "in maintenance mode" : "paused"));
-      }
-    } else {
-      statusFlag = false;
-    }
-    return statusFlag;
-  }
-
-  /**
-   * Trigger a Resume Event if the cluster is back to activated.
-   * @param changeContext
-   * @param prevPaused the previous paused status.
-   * @param prevInMaintenanceMode the previous in maintenance mode status.
-   */
-  private boolean triggerResumeEvent(NotificationContext changeContext, boolean prevPaused,
-      boolean prevInMaintenanceMode) {
-    /**
-     * WARNING: the logic here is tricky.
-     * 1. Only resume if not paused. So if the Maintenance mode is removed but the cluster is still
-     * paused, the resume event should not be sent.
-     * 2. Only send resume event if the status is changed back to active. So we don't send multiple
-     * event unnecessarily.
-     */
-    if (!_paused && (prevPaused || (prevInMaintenanceMode && !_inMaintenanceMode))) {
-      pushToEventQueues(ClusterEventType.Resume, changeContext, Collections.EMPTY_MAP);
-      logger.info("controller is now resumed from paused/maintenance state");
-      return true;
-    }
-    return false;
-  }
-
   // TODO: refactor this to use common/ClusterEventProcessor.
   @Deprecated
   private class ClusterEventProcessor extends Thread {
diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
index 6ce25e2..3c705f4 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
@@ -57,6 +57,7 @@ import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.MaintenanceSignal;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.ParticipantHistory;
+import org.apache.helix.model.PauseSignal;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.task.TaskConstants;
@@ -87,6 +88,7 @@ public class BaseControllerDataProvider implements ControlContextProvider {
 
   private boolean _updateInstanceOfflineTime = true;
   private MaintenanceSignal _maintenanceSignal;
+  private PauseSignal _pauseSignal;
   private boolean _isMaintenanceModeEnabled;
   private boolean _hasMaintenanceSignalChanged;
   private ExecutorService _asyncTasksThreadPool;
@@ -300,8 +302,9 @@ public class BaseControllerDataProvider implements ControlContextProvider {
     }
   }
 
-  private void updateMaintenanceInfo(final HelixDataAccessor accessor) {
+  private void refreshManagementSignals(final HelixDataAccessor accessor) {
     _maintenanceSignal = accessor.getProperty(accessor.keyBuilder().maintenance());
+    _pauseSignal = accessor.getProperty(accessor.keyBuilder().pause());
     _isMaintenanceModeEnabled = _maintenanceSignal != null;
     // The following flag is to guarantee that there's only one update per pineline run because we
     // check for whether maintenance recovery could happen twice every pipeline
@@ -373,7 +376,7 @@ public class BaseControllerDataProvider implements ControlContextProvider {
     refreshResourceConfig(accessor, refreshedTypes);
     _stateModelDefinitionCache.refresh(accessor);
     _clusterConstraintsCache.refresh(accessor);
-    updateMaintenanceInfo(accessor);
+    refreshManagementSignals(accessor);
     timeoutNodesDuringMaintenance(accessor, _clusterConfig, _isMaintenanceModeEnabled);
 
     // TODO: once controller gets split, only one controller should update offline instance history
@@ -622,6 +625,16 @@ public class BaseControllerDataProvider implements ControlContextProvider {
   }
 
   /**
+   * Gets all messages for each instance.
+   *
+   * @return Map of {instanceName -> Collection of Message}.
+   */
+  public Map<String, Collection<Message>> getAllInstancesMessages() {
+    return getAllInstances().stream().collect(
+        Collectors.toMap(instance -> instance, instance -> getMessages(instance).values()));
+  }
+
+  /**
    * This function is supposed to be only used by testing purpose for safety. For "get" usage,
    * please use getStaleMessagesByInstance.
    */
@@ -968,6 +981,10 @@ public class BaseControllerDataProvider implements ControlContextProvider {
     return _maintenanceSignal;
   }
 
+  public PauseSignal getPauseSignal() {
+    return _pauseSignal;
+  }
+
   protected StringBuilder genCacheContentStringBuilder() {
     StringBuilder sb = new StringBuilder();
     sb.append(String.format("liveInstaceMap: %s", _liveInstanceCache.getPropertyMap())).append("\n");
diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ManagementControllerDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ManagementControllerDataProvider.java
index d178ca5..fe940ff 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ManagementControllerDataProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ManagementControllerDataProvider.java
@@ -19,9 +19,27 @@ package org.apache.helix.controller.dataproviders;
  * under the License.
  */
 
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.helix.HelixConstants;
+
+/**
+ * Data provider for controller management mode pipeline.
+ */
 public class ManagementControllerDataProvider extends BaseControllerDataProvider {
-  // TODO: implement this class to only refresh required event types
-  public ManagementControllerDataProvider(String clusterName, String name) {
-    super(clusterName, name);
+  // Only these types of properties are refreshed for the full refresh request.
+  private static final List<HelixConstants.ChangeType> FULL_REFRESH_PROPERTIES =
+      Arrays.asList(HelixConstants.ChangeType.LIVE_INSTANCE, HelixConstants.ChangeType.MESSAGE);
+
+  public ManagementControllerDataProvider(String clusterName, String pipelineName) {
+    super(clusterName, pipelineName);
+  }
+
+  @Override
+  public void requireFullRefresh() {
+    for (HelixConstants.ChangeType type : FULL_REFRESH_PROPERTIES) {
+      _propertyDataChangedMap.get(type).set(true);
+    }
   }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementModeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementModeStage.java
index 512224d..042aa14 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementModeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementModeStage.java
@@ -19,9 +19,9 @@ package org.apache.helix.controller.stages;
  * under the License.
  */
 
+import org.apache.helix.controller.LogUtil;
 import org.apache.helix.controller.dataproviders.ManagementControllerDataProvider;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
-import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.util.HelixUtil;
 import org.apache.helix.util.RebalanceUtil;
 import org.slf4j.Logger;
@@ -36,13 +36,21 @@ public class ManagementModeStage extends AbstractBaseStage {
   @Override
   public void process(ClusterEvent event) throws Exception {
     // TODO: implement the stage
+    _eventId = event.getEventId();
     String clusterName = event.getClusterName();
     ManagementControllerDataProvider cache =
         event.getAttribute(AttributeName.ControllerDataProvider.name());
-    if (!HelixUtil.inManagementMode(cache)) {
-      LOG.info("Exiting management mode pipeline for cluster {}", clusterName);
+
+    // TODO: move to the last stage of management pipeline
+    checkInManagementMode(clusterName, cache);
+  }
+
+  private void checkInManagementMode(String clusterName, ManagementControllerDataProvider cache) {
+    // Should exit management mode
+    if (!HelixUtil.inManagementMode(cache.getPauseSignal(), cache.getLiveInstances(),
+        cache.getEnabledLiveInstances(), cache.getAllInstancesMessages())) {
+      LogUtil.logInfo(LOG, _eventId, "Exiting management mode pipeline for cluster " + clusterName);
       RebalanceUtil.enableManagementMode(clusterName, false);
-      throw new StageException("Exiting management mode pipeline for cluster " + clusterName);
     }
   }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java
index a4d4783..613ce2e 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java
@@ -28,6 +28,7 @@ import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Resource;
 import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
 import org.apache.helix.util.HelixUtil;
 import org.apache.helix.util.RebalanceUtil;
 import org.slf4j.Logger;
@@ -44,12 +45,7 @@ public class ResourceValidationStage extends AbstractBaseStage {
       throw new StageException("Missing attributes in event:" + event + ". Requires DataCache");
     }
 
-    // Check if cluster is still in management mode. Eg. there exists any frozen live instance.
-    if (HelixUtil.inManagementMode(cache)) {
-      // Trigger an immediate management mode pipeline.
-      RebalanceUtil.enableManagementMode(event.getClusterName(), true);
-      throw new StageException("Pipeline should not be run because cluster is in management mode");
-    }
+    processManagementMode(event, cache);
 
     Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
     if (resourceMap == null) {
@@ -91,6 +87,27 @@ public class ResourceValidationStage extends AbstractBaseStage {
     }
   }
 
+  private void processManagementMode(ClusterEvent event, BaseControllerDataProvider cache)
+      throws StageException {
+    // Set cluster status monitor for maintenance mode
+    ClusterStatusMonitor monitor = event.getAttribute(AttributeName.clusterStatusMonitor.name());
+    if (monitor != null) {
+      monitor.setMaintenance(cache.isMaintenanceModeEnabled());
+    }
+
+    // Check if cluster is still in management mode. Eg. there exists any frozen live instance.
+    if (HelixUtil.inManagementMode(cache.getPauseSignal(), cache.getLiveInstances(),
+        cache.getEnabledLiveInstances(), cache.getAllInstancesMessages())) {
+      // Trigger an immediate management mode pipeline.
+      LogUtil.logInfo(LOG, _eventId,
+          "Enabling management mode pipeline for cluster " + event.getClusterName());
+      RebalanceUtil.enableManagementMode(event.getClusterName(), true);
+      throw new StageException(
+          "Pipeline should not be run because cluster " + event.getClusterName()
+              + "is in management mode");
+    }
+  }
+
   /**
    * Check if the ideal state adheres to a rule
    * @param idealState the ideal state to check
diff --git a/helix-core/src/main/java/org/apache/helix/model/Message.java b/helix-core/src/main/java/org/apache/helix/model/Message.java
index b509506..6222df5 100644
--- a/helix-core/src/main/java/org/apache/helix/model/Message.java
+++ b/helix-core/src/main/java/org/apache/helix/model/Message.java
@@ -59,6 +59,7 @@ public class Message extends HelixProperty {
     NO_OP,
     PARTICIPANT_ERROR_REPORT,
     PARTICIPANT_SESSION_CHANGE,
+    PARTICIPANT_STATUS_CHANGE,
     CHAINED_MESSAGE, // this is a message subtype
     RELAYED_MESSAGE
   }
@@ -927,6 +928,10 @@ public class Message extends HelixProperty {
     return getTgtName().equalsIgnoreCase(InstanceType.CONTROLLER.name());
   }
 
+  public boolean isParticipantStatusChangeType() {
+    return MessageType.PARTICIPANT_STATUS_CHANGE.name().equalsIgnoreCase(getMsgType());
+  }
+
   /**
    * Get the {@link PropertyKey} for this message
    * @param keyBuilder PropertyKey Builder
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
index f916583..083daa3 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
@@ -415,7 +415,7 @@ public class BestPossibleExternalViewVerifier extends ZkHelixClusterVerifier {
    */
   private BestPossibleStateOutput calcBestPossState(ResourceControllerDataProvider cache, Set<String> resources)
       throws Exception {
-    ClusterEvent event = new ClusterEvent(ClusterEventType.StateVerifier);
+    ClusterEvent event = new ClusterEvent(_clusterName, ClusterEventType.StateVerifier);
     event.addAttribute(AttributeName.ControllerDataProvider.name(), cache);
 
     RebalanceUtil.runStage(event, new ResourceComputationStage());
diff --git a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
index 60c3aaa..f0ec1b4 100644
--- a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
@@ -21,6 +21,7 @@ package org.apache.helix.util;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -38,7 +39,6 @@ import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.PropertyType;
 import org.apache.helix.controller.common.PartitionStateMap;
-import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import org.apache.helix.controller.rebalancer.AbstractRebalancer;
 import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
@@ -59,6 +59,7 @@ import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Partition;
+import org.apache.helix.model.PauseSignal;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.StateModelDefinition;
@@ -519,13 +520,33 @@ public final class HelixUtil {
   }
 
   /**
-   * Checks whether or not the cluster is in management mode.
+   * Checks whether or not the cluster is in management mode. It checks:
+   * - pause signal
+   * - live instances: whether any live instance is not in normal status, eg. frozen.
+   * - messages: whether live instance has a participant status change message
    *
-   * @param cache
-   * @return
+   * @param pauseSignal pause signal
+   * @param liveInstanceMap map of live instances
+   * @param enabledLiveInstances set of enabled live instance names. They should be all included
+   *                             in the liveInstanceMap.
+   * @param instancesMessages a map of all instances' messages.
+   * @return true if cluster is in management mode; otherwise, false
    */
-  public static boolean inManagementMode(BaseControllerDataProvider cache) {
-    // TODO: implement the logic. Parameters can also change
-    return true;
+  public static boolean inManagementMode(PauseSignal pauseSignal,
+      Map<String, LiveInstance> liveInstanceMap, Set<String> enabledLiveInstances,
+      Map<String, Collection<Message>> instancesMessages) {
+    // Check pause signal and abnormal live instances (eg. in freeze mode)
+    // TODO: should check maintenance signal when moving maintenance to management pipeline
+    return pauseSignal != null || enabledLiveInstances.stream().anyMatch(
+        instance -> isInstanceInManagementMode(instance, liveInstanceMap, instancesMessages));
+  }
+
+  private static boolean isInstanceInManagementMode(String instance,
+      Map<String, LiveInstance> liveInstanceMap,
+      Map<String, Collection<Message>> instancesMessages) {
+    // Check live instance status and participant status change message
+    return LiveInstance.LiveInstanceStatus.PAUSED.equals(liveInstanceMap.get(instance).getStatus())
+        || (instancesMessages.getOrDefault(instance, Collections.emptyList()).stream()
+        .anyMatch(Message::isParticipantStatusChangeType));
   }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java b/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
index db2b76f..f74b98f 100644
--- a/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
@@ -159,8 +159,8 @@ public class RebalanceUtil {
           enabled);
       leaderController.setInManagementMode(enabled);
     } else {
-      LOG.error("Failed to switch management mode pipeline, enabled={}. "
-          + "Controller for cluster {} does not exist", clusterName, enabled);
+      throw new HelixException(String.format("Failed to switch management mode pipeline, "
+          + "enabled=%s. Controller for cluster %s does not exist", enabled, clusterName));
     }
 
     // Triggers an event to immediately run the pipeline