You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hz...@apache.org on 2021/06/12 08:00:37 UTC
[helix] branch cluster-pause-mode updated: Move pause and
maintenance handling out of controller (#1793)
This is an automated email from the ASF dual-hosted git repository.
hzlu pushed a commit to branch cluster-pause-mode
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/cluster-pause-mode by this push:
new 57180c4 Move pause and maintenance handling out of controller (#1793)
57180c4 is described below
commit 57180c420239d192b7559e4fb4b0d9b70c6f70f9
Author: Huizhi Lu <51...@users.noreply.github.com>
AuthorDate: Sat Jun 12 01:00:25 2021 -0700
Move pause and maintenance handling out of controller (#1793)
With management mode pipeline, the pause and maintenance signals handling logic should be moved out of the onControllerChange() and moved to the management mode pipeline.
This commit handles pause/maintenance signals enable/disable and update cluster status accordingly.
---
.../helix/controller/GenericHelixController.java | 80 +++-------------------
.../dataproviders/BaseControllerDataProvider.java | 21 +++++-
.../ManagementControllerDataProvider.java | 24 ++++++-
.../controller/stages/ManagementModeStage.java | 16 +++--
.../controller/stages/ResourceValidationStage.java | 29 ++++++--
.../main/java/org/apache/helix/model/Message.java | 5 ++
.../BestPossibleExternalViewVerifier.java | 2 +-
.../main/java/org/apache/helix/util/HelixUtil.java | 35 ++++++++--
.../java/org/apache/helix/util/RebalanceUtil.java | 4 +-
9 files changed, 119 insertions(+), 97 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 729d9ff..ccc00f5 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -105,9 +105,7 @@ import org.apache.helix.model.CustomizedStateConfig;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.MaintenanceSignal;
import org.apache.helix.model.Message;
-import org.apache.helix.model.PauseSignal;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.monitoring.mbeans.ClusterEventMonitor;
import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
@@ -179,13 +177,6 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
private long _continuousTaskRebalanceFailureCount = 0;
/**
- * The _paused flag is checked by function handleEvent(), while if the flag is set handleEvent()
- * will be no-op. Other event handling logic keeps the same when the flag is set.
- */
- private boolean _paused;
- private boolean _inMaintenanceMode;
-
- /**
* The executors that can periodically run the rebalancing pipeline. A
* SingleThreadScheduledExecutor will start if there is resource group that has the config to do
* periodically rebalance.
@@ -836,18 +827,16 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
// have this instanceof clauses
List<Pipeline> pipelines;
boolean isTaskFrameworkPipeline = false;
- Pipeline.Type pipelineType;
+ boolean isManagementPipeline = false;
if (dataProvider instanceof ResourceControllerDataProvider) {
pipelines = _registry.getPipelinesForEvent(event.getEventType());
- pipelineType = Pipeline.Type.DEFAULT;
} else if (dataProvider instanceof WorkflowControllerDataProvider) {
pipelines = _taskRegistry.getPipelinesForEvent(event.getEventType());
isTaskFrameworkPipeline = true;
- pipelineType = Pipeline.Type.TASK;
} else if (dataProvider instanceof ManagementControllerDataProvider) {
pipelines = _managementModeRegistry.getPipelinesForEvent(event.getEventType());
- pipelineType = Pipeline.Type.MANAGEMENT_MODE;
+ isManagementPipeline = true;
} else {
logger.warn(String
.format("No %s pipeline to run for event: %s::%s", dataProvider.getPipelineName(),
@@ -856,11 +845,10 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
}
// Should not run management mode and default/task pipelines at the same time.
- if ((_inManagementMode && !Pipeline.Type.MANAGEMENT_MODE.equals(pipelineType))
- || (!_inManagementMode && Pipeline.Type.MANAGEMENT_MODE.equals(pipelineType))) {
+ if (_inManagementMode != isManagementPipeline) {
logger.info("Should not run management mode and default/task pipelines at the same time. "
- + "cluster={}, inManagementMode={}, pipelineType={}. Ignoring the event: {}",
- manager.getClusterName(), _inManagementMode, pipelineType, event.getEventType());
+ + "cluster={}, inManagementMode={}, isManagementPipeline={}. Ignoring the event: {}",
+ manager.getClusterName(), _inManagementMode, isManagementPipeline, event.getEventType());
return;
}
@@ -880,6 +868,8 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
checkRebalancingTimer(manager, Collections.<IdealState>emptyList(), dataProvider.getClusterConfig());
}
if (_isMonitoring) {
+ _clusterStatusMonitor.setEnabled(!_inManagementMode);
+ _clusterStatusMonitor.setPaused(_inManagementMode);
event.addAttribute(AttributeName.clusterStatusMonitor.name(), _clusterStatusMonitor);
}
}
@@ -1334,25 +1324,8 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
}
if (controllerIsLeader) {
- HelixManager manager = changeContext.getManager();
- HelixDataAccessor accessor = manager.getHelixDataAccessor();
- Builder keyBuilder = accessor.keyBuilder();
-
- PauseSignal pauseSignal = accessor.getProperty(keyBuilder.pause());
- MaintenanceSignal maintenanceSignal = accessor.getProperty(keyBuilder.maintenance());
- boolean prevPaused = _paused;
- boolean prevInMaintenanceMode = _inMaintenanceMode;
- _paused = updateControllerState(pauseSignal, _paused);
- _inMaintenanceMode = updateControllerState(maintenanceSignal, _inMaintenanceMode);
- // TODO: remove triggerResumeEvent when moving pause/maintenance to management pipeline
- if (!triggerResumeEvent(changeContext, prevPaused, prevInMaintenanceMode)) {
- pushToEventQueues(ClusterEventType.ControllerChange, changeContext, Collections.emptyMap());
- }
-
enableClusterStatusMonitor(true);
- _clusterStatusMonitor.setEnabled(!_paused);
- _clusterStatusMonitor.setPaused(_paused);
- _clusterStatusMonitor.setMaintenance(_inMaintenanceMode);
+ pushToEventQueues(ClusterEventType.ControllerChange, changeContext, Collections.emptyMap());
} else {
enableClusterStatusMonitor(false);
// Note that onControllerChange is executed in parallel with the event processing thread. It
@@ -1542,43 +1515,6 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
}
}
- private boolean updateControllerState(PauseSignal signal, boolean statusFlag) {
- if (signal != null) {
- if (!statusFlag) {
- statusFlag = true;
- // This log is recorded for the first time entering PAUSE/MAINTENANCE mode
- logger.info(String.format("controller is now %s",
- (signal instanceof MaintenanceSignal) ? "in maintenance mode" : "paused"));
- }
- } else {
- statusFlag = false;
- }
- return statusFlag;
- }
-
- /**
- * Trigger a Resume Event if the cluster is back to activated.
- * @param changeContext
- * @param prevPaused the previous paused status.
- * @param prevInMaintenanceMode the previous in maintenance mode status.
- */
- private boolean triggerResumeEvent(NotificationContext changeContext, boolean prevPaused,
- boolean prevInMaintenanceMode) {
- /**
- * WARNING: the logic here is tricky.
- * 1. Only resume if not paused. So if the Maintenance mode is removed but the cluster is still
- * paused, the resume event should not be sent.
- * 2. Only send resume event if the status is changed back to active. So we don't send multiple
- * event unnecessarily.
- */
- if (!_paused && (prevPaused || (prevInMaintenanceMode && !_inMaintenanceMode))) {
- pushToEventQueues(ClusterEventType.Resume, changeContext, Collections.EMPTY_MAP);
- logger.info("controller is now resumed from paused/maintenance state");
- return true;
- }
- return false;
- }
-
// TODO: refactor this to use common/ClusterEventProcessor.
@Deprecated
private class ClusterEventProcessor extends Thread {
diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
index 6ce25e2..3c705f4 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
@@ -57,6 +57,7 @@ import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.MaintenanceSignal;
import org.apache.helix.model.Message;
import org.apache.helix.model.ParticipantHistory;
+import org.apache.helix.model.PauseSignal;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.task.TaskConstants;
@@ -87,6 +88,7 @@ public class BaseControllerDataProvider implements ControlContextProvider {
private boolean _updateInstanceOfflineTime = true;
private MaintenanceSignal _maintenanceSignal;
+ private PauseSignal _pauseSignal;
private boolean _isMaintenanceModeEnabled;
private boolean _hasMaintenanceSignalChanged;
private ExecutorService _asyncTasksThreadPool;
@@ -300,8 +302,9 @@ public class BaseControllerDataProvider implements ControlContextProvider {
}
}
- private void updateMaintenanceInfo(final HelixDataAccessor accessor) {
+ private void refreshManagementSignals(final HelixDataAccessor accessor) {
_maintenanceSignal = accessor.getProperty(accessor.keyBuilder().maintenance());
+ _pauseSignal = accessor.getProperty(accessor.keyBuilder().pause());
_isMaintenanceModeEnabled = _maintenanceSignal != null;
// The following flag is to guarantee that there's only one update per pineline run because we
// check for whether maintenance recovery could happen twice every pipeline
@@ -373,7 +376,7 @@ public class BaseControllerDataProvider implements ControlContextProvider {
refreshResourceConfig(accessor, refreshedTypes);
_stateModelDefinitionCache.refresh(accessor);
_clusterConstraintsCache.refresh(accessor);
- updateMaintenanceInfo(accessor);
+ refreshManagementSignals(accessor);
timeoutNodesDuringMaintenance(accessor, _clusterConfig, _isMaintenanceModeEnabled);
// TODO: once controller gets split, only one controller should update offline instance history
@@ -622,6 +625,16 @@ public class BaseControllerDataProvider implements ControlContextProvider {
}
/**
+ * Gets all messages for each instance.
+ *
+ * @return Map of {instanceName -> Collection of Message}.
+ */
+ public Map<String, Collection<Message>> getAllInstancesMessages() {
+ return getAllInstances().stream().collect(
+ Collectors.toMap(instance -> instance, instance -> getMessages(instance).values()));
+ }
+
+ /**
* This function is supposed to be only used by testing purpose for safety. For "get" usage,
* please use getStaleMessagesByInstance.
*/
@@ -968,6 +981,10 @@ public class BaseControllerDataProvider implements ControlContextProvider {
return _maintenanceSignal;
}
+ public PauseSignal getPauseSignal() {
+ return _pauseSignal;
+ }
+
protected StringBuilder genCacheContentStringBuilder() {
StringBuilder sb = new StringBuilder();
sb.append(String.format("liveInstaceMap: %s", _liveInstanceCache.getPropertyMap())).append("\n");
diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ManagementControllerDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ManagementControllerDataProvider.java
index d178ca5..fe940ff 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ManagementControllerDataProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ManagementControllerDataProvider.java
@@ -19,9 +19,27 @@ package org.apache.helix.controller.dataproviders;
* under the License.
*/
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.helix.HelixConstants;
+
+/**
+ * Data provider for controller management mode pipeline.
+ */
public class ManagementControllerDataProvider extends BaseControllerDataProvider {
- // TODO: implement this class to only refresh required event types
- public ManagementControllerDataProvider(String clusterName, String name) {
- super(clusterName, name);
+ // Only these types of properties are refreshed for the full refresh request.
+ private static final List<HelixConstants.ChangeType> FULL_REFRESH_PROPERTIES =
+ Arrays.asList(HelixConstants.ChangeType.LIVE_INSTANCE, HelixConstants.ChangeType.MESSAGE);
+
+ public ManagementControllerDataProvider(String clusterName, String pipelineName) {
+ super(clusterName, pipelineName);
+ }
+
+ @Override
+ public void requireFullRefresh() {
+ for (HelixConstants.ChangeType type : FULL_REFRESH_PROPERTIES) {
+ _propertyDataChangedMap.get(type).set(true);
+ }
}
}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementModeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementModeStage.java
index 512224d..042aa14 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementModeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementModeStage.java
@@ -19,9 +19,9 @@ package org.apache.helix.controller.stages;
* under the License.
*/
+import org.apache.helix.controller.LogUtil;
import org.apache.helix.controller.dataproviders.ManagementControllerDataProvider;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
-import org.apache.helix.controller.pipeline.StageException;
import org.apache.helix.util.HelixUtil;
import org.apache.helix.util.RebalanceUtil;
import org.slf4j.Logger;
@@ -36,13 +36,21 @@ public class ManagementModeStage extends AbstractBaseStage {
@Override
public void process(ClusterEvent event) throws Exception {
// TODO: implement the stage
+ _eventId = event.getEventId();
String clusterName = event.getClusterName();
ManagementControllerDataProvider cache =
event.getAttribute(AttributeName.ControllerDataProvider.name());
- if (!HelixUtil.inManagementMode(cache)) {
- LOG.info("Exiting management mode pipeline for cluster {}", clusterName);
+
+ // TODO: move to the last stage of management pipeline
+ checkInManagementMode(clusterName, cache);
+ }
+
+ private void checkInManagementMode(String clusterName, ManagementControllerDataProvider cache) {
+ // Should exit management mode
+ if (!HelixUtil.inManagementMode(cache.getPauseSignal(), cache.getLiveInstances(),
+ cache.getEnabledLiveInstances(), cache.getAllInstancesMessages())) {
+ LogUtil.logInfo(LOG, _eventId, "Exiting management mode pipeline for cluster " + clusterName);
RebalanceUtil.enableManagementMode(clusterName, false);
- throw new StageException("Exiting management mode pipeline for cluster " + clusterName);
}
}
}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java
index a4d4783..613ce2e 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java
@@ -28,6 +28,7 @@ import org.apache.helix.controller.pipeline.StageException;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Resource;
import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
import org.apache.helix.util.HelixUtil;
import org.apache.helix.util.RebalanceUtil;
import org.slf4j.Logger;
@@ -44,12 +45,7 @@ public class ResourceValidationStage extends AbstractBaseStage {
throw new StageException("Missing attributes in event:" + event + ". Requires DataCache");
}
- // Check if cluster is still in management mode. Eg. there exists any frozen live instance.
- if (HelixUtil.inManagementMode(cache)) {
- // Trigger an immediate management mode pipeline.
- RebalanceUtil.enableManagementMode(event.getClusterName(), true);
- throw new StageException("Pipeline should not be run because cluster is in management mode");
- }
+ processManagementMode(event, cache);
Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
if (resourceMap == null) {
@@ -91,6 +87,27 @@ public class ResourceValidationStage extends AbstractBaseStage {
}
}
+ private void processManagementMode(ClusterEvent event, BaseControllerDataProvider cache)
+ throws StageException {
+ // Set cluster status monitor for maintenance mode
+ ClusterStatusMonitor monitor = event.getAttribute(AttributeName.clusterStatusMonitor.name());
+ if (monitor != null) {
+ monitor.setMaintenance(cache.isMaintenanceModeEnabled());
+ }
+
+ // Check if cluster is still in management mode. Eg. there exists any frozen live instance.
+ if (HelixUtil.inManagementMode(cache.getPauseSignal(), cache.getLiveInstances(),
+ cache.getEnabledLiveInstances(), cache.getAllInstancesMessages())) {
+ // Trigger an immediate management mode pipeline.
+ LogUtil.logInfo(LOG, _eventId,
+ "Enabling management mode pipeline for cluster " + event.getClusterName());
+ RebalanceUtil.enableManagementMode(event.getClusterName(), true);
+ throw new StageException(
+ "Pipeline should not be run because cluster " + event.getClusterName()
+ + "is in management mode");
+ }
+ }
+
/**
* Check if the ideal state adheres to a rule
* @param idealState the ideal state to check
diff --git a/helix-core/src/main/java/org/apache/helix/model/Message.java b/helix-core/src/main/java/org/apache/helix/model/Message.java
index b509506..6222df5 100644
--- a/helix-core/src/main/java/org/apache/helix/model/Message.java
+++ b/helix-core/src/main/java/org/apache/helix/model/Message.java
@@ -59,6 +59,7 @@ public class Message extends HelixProperty {
NO_OP,
PARTICIPANT_ERROR_REPORT,
PARTICIPANT_SESSION_CHANGE,
+ PARTICIPANT_STATUS_CHANGE,
CHAINED_MESSAGE, // this is a message subtype
RELAYED_MESSAGE
}
@@ -927,6 +928,10 @@ public class Message extends HelixProperty {
return getTgtName().equalsIgnoreCase(InstanceType.CONTROLLER.name());
}
+ public boolean isParticipantStatusChangeType() {
+ return MessageType.PARTICIPANT_STATUS_CHANGE.name().equalsIgnoreCase(getMsgType());
+ }
+
/**
* Get the {@link PropertyKey} for this message
* @param keyBuilder PropertyKey Builder
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
index f916583..083daa3 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
@@ -415,7 +415,7 @@ public class BestPossibleExternalViewVerifier extends ZkHelixClusterVerifier {
*/
private BestPossibleStateOutput calcBestPossState(ResourceControllerDataProvider cache, Set<String> resources)
throws Exception {
- ClusterEvent event = new ClusterEvent(ClusterEventType.StateVerifier);
+ ClusterEvent event = new ClusterEvent(_clusterName, ClusterEventType.StateVerifier);
event.addAttribute(AttributeName.ControllerDataProvider.name(), cache);
RebalanceUtil.runStage(event, new ResourceComputationStage());
diff --git a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
index 60c3aaa..f0ec1b4 100644
--- a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
@@ -21,6 +21,7 @@ package org.apache.helix.util;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -38,7 +39,6 @@ import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.PropertyType;
import org.apache.helix.controller.common.PartitionStateMap;
-import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.rebalancer.AbstractRebalancer;
import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
@@ -59,6 +59,7 @@ import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
import org.apache.helix.model.Partition;
+import org.apache.helix.model.PauseSignal;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.StateModelDefinition;
@@ -519,13 +520,33 @@ public final class HelixUtil {
}
/**
- * Checks whether or not the cluster is in management mode.
+ * Checks whether or not the cluster is in management mode. It checks:
+ * - pause signal
+ * - live instances: whether any live instance is not in normal status, eg. frozen.
+ * - messages: whether live instance has a participant status change message
*
- * @param cache
- * @return
+ * @param pauseSignal pause signal
+ * @param liveInstanceMap map of live instances
+ * @param enabledLiveInstances set of enabled live instance names. They should be all included
+ * in the liveInstanceMap.
+ * @param instancesMessages a map of all instances' messages.
+ * @return true if cluster is in management mode; otherwise, false
*/
- public static boolean inManagementMode(BaseControllerDataProvider cache) {
- // TODO: implement the logic. Parameters can also change
- return true;
+ public static boolean inManagementMode(PauseSignal pauseSignal,
+ Map<String, LiveInstance> liveInstanceMap, Set<String> enabledLiveInstances,
+ Map<String, Collection<Message>> instancesMessages) {
+ // Check pause signal and abnormal live instances (eg. in freeze mode)
+ // TODO: should check maintenance signal when moving maintenance to management pipeline
+ return pauseSignal != null || enabledLiveInstances.stream().anyMatch(
+ instance -> isInstanceInManagementMode(instance, liveInstanceMap, instancesMessages));
+ }
+
+ private static boolean isInstanceInManagementMode(String instance,
+ Map<String, LiveInstance> liveInstanceMap,
+ Map<String, Collection<Message>> instancesMessages) {
+ // Check live instance status and participant status change message
+ return LiveInstance.LiveInstanceStatus.PAUSED.equals(liveInstanceMap.get(instance).getStatus())
+ || (instancesMessages.getOrDefault(instance, Collections.emptyList()).stream()
+ .anyMatch(Message::isParticipantStatusChangeType));
}
}
diff --git a/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java b/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
index db2b76f..f74b98f 100644
--- a/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
@@ -159,8 +159,8 @@ public class RebalanceUtil {
enabled);
leaderController.setInManagementMode(enabled);
} else {
- LOG.error("Failed to switch management mode pipeline, enabled={}. "
- + "Controller for cluster {} does not exist", clusterName, enabled);
+ throw new HelixException(String.format("Failed to switch management mode pipeline, "
+ + "enabled=%s. Controller for cluster %s does not exist", enabled, clusterName));
}
// Triggers an event to immediately run the pipeline