You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hz...@apache.org on 2021/06/22 18:22:59 UTC
[helix] branch cluster-pause-mode updated: Check cluster management
mode status (#1798)
This is an automated email from the ASF dual-hosted git repository.
hzlu pushed a commit to branch cluster-pause-mode
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/cluster-pause-mode by this push:
new c92e34d Check cluster management mode status (#1798)
c92e34d is described below
commit c92e34dfb511c10645d7dcddaa29d41b1884c53e
Author: Huizhi Lu <51...@users.noreply.github.com>
AuthorDate: Tue Jun 22 11:22:49 2021 -0700
Check cluster management mode status (#1798)
The controller needs to know the participants' freeze status so it can send freeze/unfreeze messages when entering/exiting freeze mode. The status check is done in the management mode stage.
This commit adds methods to check cluster management mode status, and update the status and history accordingly.
---
.../main/java/org/apache/helix/PropertyKey.java | 2 +-
.../java/org/apache/helix/PropertyPathBuilder.java | 1 +
.../pipeline/PipelineSwitchException.java | 29 +++++
.../controller/stages/ManagementModeStage.java | 134 +++++++++++++++++++
.../controller/stages/ResourceValidationStage.java | 6 +-
.../java/org/apache/helix/model/ClusterStatus.java | 5 +
.../org/apache/helix/model/ControllerHistory.java | 8 +-
.../controller/stages/TestManagementModeStage.java | 142 +++++++++++++++++++++
8 files changed, 322 insertions(+), 5 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/PropertyKey.java b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
index e6fe2e9..8b7eb33 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyKey.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
@@ -247,7 +247,7 @@ public class PropertyKey {
* @return {@link PropertyKey}
*/
public PropertyKey clusterStatus() {
- return new PropertyKey(PropertyType.STATUS, ClusterStatus.class, _clusterName);
+ // Two path keys select the STATUS level-2 template "/{clusterName}/STATUS/{clusterName}" added in PropertyPathBuilder.
+ return new PropertyKey(PropertyType.STATUS, ClusterStatus.class, _clusterName, _clusterName);
}
/**
diff --git a/helix-core/src/main/java/org/apache/helix/PropertyPathBuilder.java b/helix-core/src/main/java/org/apache/helix/PropertyPathBuilder.java
index 5e178a1..f5e13ab 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyPathBuilder.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyPathBuilder.java
@@ -105,6 +105,7 @@ public class PropertyPathBuilder {
addEntry(PropertyType.CUSTOMIZEDVIEW, 2, "/{clusterName}/CUSTOMIZEDVIEW/{customizedStateType}");
addEntry(PropertyType.CUSTOMIZEDVIEW, 3, "/{clusterName}/CUSTOMIZEDVIEW/{customizedStateType}/{resourceName}");
addEntry(PropertyType.STATUS, 1, "/{clusterName}/STATUS");
+ addEntry(PropertyType.STATUS, 2, "/{clusterName}/STATUS/{clusterName}");
addEntry(PropertyType.TARGETEXTERNALVIEW, 1, "/{clusterName}/TARGETEXTERNALVIEW");
addEntry(PropertyType.TARGETEXTERNALVIEW, 2,
diff --git a/helix-core/src/main/java/org/apache/helix/controller/pipeline/PipelineSwitchException.java b/helix-core/src/main/java/org/apache/helix/controller/pipeline/PipelineSwitchException.java
new file mode 100644
index 0000000..1584708
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/pipeline/PipelineSwitchException.java
@@ -0,0 +1,29 @@
+package org.apache.helix.controller.pipeline;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Thrown by a pipeline stage to abandon the controller pipeline that is currently running so a
+ * different pipeline can take over (for example, ResourceValidationStage throws it when the
+ * cluster has entered management mode).
+ */
+public class PipelineSwitchException extends StageException {
+  /**
+   * @param reason human-readable explanation of why the current pipeline is being abandoned
+   */
+  public PipelineSwitchException(String reason) {
+    super(reason);
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementModeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementModeStage.java
index 042aa14..94ff1d3 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementModeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementModeStage.java
@@ -19,11 +19,31 @@ package org.apache.helix.controller.stages;
* under the License.
*/
+import java.time.Instant;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyType;
+import org.apache.helix.api.status.ClusterManagementMode;
import org.apache.helix.controller.LogUtil;
import org.apache.helix.controller.dataproviders.ManagementControllerDataProvider;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.model.ClusterStatus;
+import org.apache.helix.model.ControllerHistory;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.LiveInstance.LiveInstanceStatus;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.model.PauseSignal;
import org.apache.helix.util.HelixUtil;
import org.apache.helix.util.RebalanceUtil;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,9 +58,23 @@ public class ManagementModeStage extends AbstractBaseStage {
// TODO: implement the stage
_eventId = event.getEventId();
String clusterName = event.getClusterName();
+ HelixManager manager = event.getAttribute(AttributeName.helixmanager.name());
+ if (manager == null) {
+ throw new StageException("HelixManager attribute value is null");
+ }
+
+ HelixDataAccessor accessor = manager.getHelixDataAccessor();
ManagementControllerDataProvider cache =
event.getAttribute(AttributeName.ControllerDataProvider.name());
+ ClusterManagementMode managementMode =
+ checkClusterFreezeStatus(cache.getEnabledLiveInstances(), cache.getLiveInstances(),
+ cache.getAllInstancesMessages(), cache.getPauseSignal());
+
+ recordClusterStatus(managementMode, accessor);
+ recordManagementModeHistory(managementMode, cache.getPauseSignal(), manager.getInstanceName(),
+ accessor);
+
// TODO: move to the last stage of management pipeline
checkInManagementMode(clusterName, cache);
}
@@ -53,4 +87,104 @@ public class ManagementModeStage extends AbstractBaseStage {
RebalanceUtil.enableManagementMode(clusterName, false);
}
}
+
+ // Checks cluster freeze, controller pause mode and status.
+ private ClusterManagementMode checkClusterFreezeStatus(
+ Set<String> enabledLiveInstances,
+ Map<String, LiveInstance> liveInstanceMap,
+ Map<String, Collection<Message>> allInstanceMessages,
+ PauseSignal pauseSignal) {
+ ClusterManagementMode.Type type;
+ ClusterManagementMode.Status status = ClusterManagementMode.Status.COMPLETED;
+ if (pauseSignal == null) {
+ // TODO: Should check maintenance mode after it's moved to management pipeline.
+ type = ClusterManagementMode.Type.NORMAL;
+ if (HelixUtil.inManagementMode(pauseSignal, liveInstanceMap, enabledLiveInstances,
+ allInstanceMessages)) {
+ status = ClusterManagementMode.Status.IN_PROGRESS;
+ }
+ } else if (pauseSignal.isClusterPause()) {
+ type = ClusterManagementMode.Type.CLUSTER_PAUSE;
+ if (!instancesFullyFrozen(enabledLiveInstances, liveInstanceMap, allInstanceMessages)) {
+ status = ClusterManagementMode.Status.IN_PROGRESS;
+ }
+ } else {
+ type = ClusterManagementMode.Type.CONTROLLER_PAUSE;
+ }
+
+ return new ClusterManagementMode(type, status);
+ }
+
+  /**
+   * Returns true only if every enabled live instance reports {@link LiveInstanceStatus#PAUSED}
+   * and has no pending {@link MessageType#PARTICIPANT_STATUS_CHANGE} message.
+   *
+   * <p>An enabled instance that has no entry in {@code liveInstanceMap} is treated as not frozen
+   * (the cluster stays IN_PROGRESS) instead of failing with a NullPointerException.
+   *
+   * @param enabledLiveInstances names of the enabled live instances to check
+   * @param liveInstanceMap live instance records keyed by instance name
+   * @param allInstanceMessages pending messages keyed by instance name; a missing entry means
+   *     no pending messages
+   * @return whether all enabled live instances are fully frozen
+   */
+  private boolean instancesFullyFrozen(Set<String> enabledLiveInstances,
+      Map<String, LiveInstance> liveInstanceMap,
+      Map<String, Collection<Message>> allInstanceMessages) {
+    return enabledLiveInstances.stream().allMatch(instance -> {
+      LiveInstance liveInstance = liveInstanceMap.get(instance);
+      // 1. The live instance exists and is frozen.
+      // 2. No pending participant status change message for it.
+      return liveInstance != null
+          && LiveInstanceStatus.PAUSED.equals(liveInstance.getStatus())
+          && !hasPendingMessage(allInstanceMessages.get(instance),
+              MessageType.PARTICIPANT_STATUS_CHANGE);
+    });
+  }
+
+  /**
+   * Tells whether any of the given messages has the given message type.
+   *
+   * @param messages messages to scan; {@code null} is treated as "no messages"
+   * @param type the message type to look for
+   * @return true if at least one message's type name equals {@code type.name()}
+   */
+  private boolean hasPendingMessage(Collection<Message> messages, MessageType type) {
+    if (messages == null) {
+      return false;
+    }
+    String wantedType = type.name();
+    return messages.stream().map(Message::getMsgType).anyMatch(wantedType::equals);
+  }
+
+  /**
+   * Writes the computed management mode into the cluster STATUS node, but only when it differs
+   * from what is already recorded there.
+   *
+   * @param mode the management mode computed for this pipeline run
+   * @param accessor data accessor used to read and update the cluster status
+   */
+  private void recordClusterStatus(ClusterManagementMode mode, HelixDataAccessor accessor) {
+    PropertyKey statusKey = accessor.keyBuilder().clusterStatus();
+    ClusterStatus stored = accessor.getProperty(statusKey);
+    ClusterStatus clusterStatus = (stored != null) ? stored : new ClusterStatus();
+
+    ClusterManagementMode.Type computedType = mode.getMode();
+    ClusterManagementMode.Status computedStatus = mode.getStatus();
+    ClusterManagementMode.Type recordedType = clusterStatus.getManagementMode();
+    ClusterManagementMode.Status recordedStatus = clusterStatus.getManagementModeStatus();
+
+    // Pending messages sent by users can make an already fully frozen cluster compute as
+    // IN_PROGRESS again; keep the recorded COMPLETED state to avoid confusion.
+    boolean recordedFullyFrozen = ClusterManagementMode.Type.CLUSTER_PAUSE.equals(recordedType)
+        && ClusterManagementMode.Status.COMPLETED.equals(recordedStatus);
+    boolean computedPauseInProgress = ClusterManagementMode.Type.CLUSTER_PAUSE.equals(computedType)
+        && ClusterManagementMode.Status.IN_PROGRESS.equals(computedStatus);
+    if (recordedFullyFrozen && computedPauseInProgress) {
+      LOG.info("Skip recording status mode={}, status={}, because cluster is fully frozen",
+          computedType, computedStatus);
+      return;
+    }
+
+    // Avoid a metadata-store write when nothing changed.
+    boolean unchanged =
+        computedType.equals(recordedType) && computedStatus.equals(recordedStatus);
+    if (unchanged) {
+      return;
+    }
+
+    clusterStatus.setManagementMode(computedType);
+    clusterStatus.setManagementModeStatus(computedStatus);
+    if (!accessor.updateProperty(statusKey, clusterStatus)) {
+      LOG.error("Failed to update cluster status {}", clusterStatus);
+    }
+  }
+
+ private void recordManagementModeHistory(ClusterManagementMode mode, PauseSignal pauseSignal,
+ String controllerName, HelixDataAccessor accessor) {
+ // Only record completed status
+ if (!ClusterManagementMode.Status.COMPLETED.equals(mode.getStatus())) {
+ return;
+ }
+
+ // Record a management mode history in controller history
+ String path = accessor.keyBuilder().controllerLeaderHistory().getPath();
+ long timestamp = Instant.now().toEpochMilli();
+ String fromHost = (pauseSignal == null ? null : pauseSignal.getFromHost());
+ String reason = (pauseSignal == null ? null : pauseSignal.getReason());
+
+ // Need the updater to avoid race condition with controller/maintenance history updates.
+ if (!accessor.getBaseDataAccessor().update(path, oldRecord -> {
+ if (oldRecord == null) {
+ oldRecord = new ZNRecord(PropertyType.HISTORY.toString());
+ }
+ return new ControllerHistory(oldRecord)
+ .updateManagementModeHistory(controllerName, mode, fromHost, timestamp, reason);
+ }, AccessOption.PERSISTENT)) {
+ LOG.error("Failed to write management mode history to ZK!");
+ }
+ }
}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java
index 613ce2e..696506a 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java
@@ -24,6 +24,7 @@ import java.util.Map;
import org.apache.helix.controller.LogUtil;
import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.PipelineSwitchException;
import org.apache.helix.controller.pipeline.StageException;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Resource;
@@ -102,9 +103,10 @@ public class ResourceValidationStage extends AbstractBaseStage {
LogUtil.logInfo(LOG, _eventId,
"Enabling management mode pipeline for cluster " + event.getClusterName());
RebalanceUtil.enableManagementMode(event.getClusterName(), true);
- throw new StageException(
+ // TODO: redesign to terminate and switch pipeline more peacefully
+ throw new PipelineSwitchException(
"Pipeline should not be run because cluster " + event.getClusterName()
- + "is in management mode");
+ + " is in management mode");
}
}
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterStatus.java b/helix-core/src/main/java/org/apache/helix/model/ClusterStatus.java
index 6ed354c..a405fe3 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterStatus.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterStatus.java
@@ -22,6 +22,7 @@ package org.apache.helix.model;
import org.apache.helix.HelixProperty;
import org.apache.helix.PropertyType;
import org.apache.helix.api.status.ClusterManagementMode;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
/**
* Represents the cluster status. It can have fields for
@@ -32,6 +33,10 @@ public class ClusterStatus extends HelixProperty {
super(PropertyType.STATUS.name());
}
+ /**
+ * Creates a ClusterStatus backed by the given record.
+ * NOTE(review): presumably needed so HelixDataAccessor#getProperty can build a ClusterStatus
+ * from a stored ZNRecord — confirm.
+ * @param record the underlying ZNRecord
+ */
+ public ClusterStatus(ZNRecord record) {
+ super(record);
+ }
+
public enum ClusterStatusProperty {
MANAGEMENT_MODE,
MANAGEMENT_MODE_STATUS
diff --git a/helix-core/src/main/java/org/apache/helix/model/ControllerHistory.java b/helix-core/src/main/java/org/apache/helix/model/ControllerHistory.java
index 4e418c3..47b0958 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ControllerHistory.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ControllerHistory.java
@@ -162,8 +162,12 @@ public class ControllerHistory extends HelixProperty {
historyEntry.put(ConfigProperty.TIME.name(), Instant.ofEpochMilli(time).toString());
historyEntry.put(ManagementModeConfigKey.MODE.name(), mode.getMode().name());
historyEntry.put(ManagementModeConfigKey.STATUS.name(), mode.getStatus().name());
- historyEntry.put(PauseSignal.PauseSignalProperty.FROM_HOST.name(), fromHost);
- historyEntry.put(PauseSignal.PauseSignalProperty.REASON.name(), reason);
+ if (fromHost != null) {
+ historyEntry.put(PauseSignal.PauseSignalProperty.FROM_HOST.name(), fromHost);
+ }
+ if (reason != null) {
+ historyEntry.put(PauseSignal.PauseSignalProperty.REASON.name(), reason);
+ }
return populateHistoryEntries(HistoryType.MANAGEMENT_MODE, historyEntry.toString());
}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestManagementModeStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestManagementModeStage.java
new file mode 100644
index 0000000..28ca524
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestManagementModeStage.java
@@ -0,0 +1,142 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+
+import org.apache.helix.HelixConstants;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.api.status.ClusterManagementMode;
+import org.apache.helix.api.status.ClusterManagementModeRequest;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.controller.dataproviders.ManagementControllerDataProvider;
+import org.apache.helix.controller.pipeline.Pipeline;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.ClusterStatus;
+import org.apache.helix.model.LiveInstance;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Verifies that {@link ManagementModeStage} records the cluster management mode and its status
+ * in the cluster STATUS node while the cluster is frozen and then unfrozen.
+ */
+public class TestManagementModeStage extends ZkTestBase {
+  HelixManager _manager;
+  HelixDataAccessor _accessor;
+  String _clusterName;
+
+  @BeforeClass
+  public void beforeClass() {
+    _clusterName = "CLUSTER_" + TestHelper.getTestClassName();
+    _accessor = new ZKHelixDataAccessor(_clusterName, new ZkBaseDataAccessor<>(_gZkClient));
+    _manager = new DummyClusterManager(_clusterName, _accessor);
+  }
+
+  @AfterClass
+  public void afterClass() {
+    deleteLiveInstances(_clusterName);
+    deleteCluster(_clusterName);
+  }
+
+  @Test
+  public void testClusterFreezeStatus() throws Exception {
+    // ideal state: node0 is MASTER, node1 is SLAVE
+    // replica=2 means 1 master and 1 slave
+    setupIdealState(_clusterName, new int[]{0, 1}, new String[]{"TestDB"}, 1, 2);
+    List<LiveInstance> liveInstances = setupLiveInstances(_clusterName, new int[]{0, 1});
+    setupStateModel(_clusterName);
+
+    ClusterEvent event = new ClusterEvent(_clusterName, ClusterEventType.Unknown);
+    ManagementControllerDataProvider cache = new ManagementControllerDataProvider(_clusterName,
+        Pipeline.Type.MANAGEMENT_MODE.name());
+    event.addAttribute(AttributeName.helixmanager.name(), _manager);
+    event.addAttribute(AttributeName.ControllerDataProvider.name(), cache);
+
+    // Freeze cluster
+    ClusterManagementModeRequest request = ClusterManagementModeRequest.newBuilder()
+        .withClusterName(_clusterName)
+        .withMode(ClusterManagementMode.Type.CLUSTER_PAUSE)
+        .withReason("test")
+        .build();
+    _gSetupTool.getClusterManagementTool().setClusterManagementMode(request);
+
+    Pipeline dataRefresh = new Pipeline();
+    dataRefresh.addStage(new ReadClusterDataStage());
+    runPipeline(event, dataRefresh, false);
+    ManagementModeStage managementModeStage = new ManagementModeStage();
+    managementModeStage.process(event);
+
+    // In frozen mode
+    ClusterStatus clusterStatus = _accessor.getProperty(_accessor.keyBuilder().clusterStatus());
+    Assert.assertEquals(clusterStatus.getManagementMode(), ClusterManagementMode.Type.CLUSTER_PAUSE);
+
+    // Mark one live instance as PAUSED
+    LiveInstance liveInstance = liveInstances.get(0);
+    liveInstance.setStatus(LiveInstance.LiveInstanceStatus.PAUSED);
+    PropertyKey liveInstanceKey =
+        _accessor.keyBuilder().liveInstance(liveInstance.getInstanceName());
+    _accessor.updateProperty(liveInstanceKey, liveInstance);
+    // Require cache refresh
+    cache.notifyDataChange(HelixConstants.ChangeType.LIVE_INSTANCE);
+
+    // Unfreeze cluster
+    request = ClusterManagementModeRequest.newBuilder()
+        .withClusterName(_clusterName)
+        .withMode(ClusterManagementMode.Type.NORMAL)
+        .withReason("test")
+        .build();
+    _gSetupTool.getClusterManagementTool().setClusterManagementMode(request);
+    runPipeline(event, dataRefresh, false);
+    managementModeStage.process(event);
+    clusterStatus = _accessor.getProperty(_accessor.keyBuilder().clusterStatus());
+
+    Assert.assertEquals(clusterStatus.getManagementMode(), ClusterManagementMode.Type.NORMAL);
+    // In progress because a live instance is still frozen
+    Assert.assertEquals(clusterStatus.getManagementModeStatus(),
+        ClusterManagementMode.Status.IN_PROGRESS);
+
+    // Remove the frozen status so the live instance is back to normal status
+    liveInstance = _accessor.getProperty(liveInstanceKey);
+    liveInstance.getRecord().getSimpleFields()
+        .remove(LiveInstance.LiveInstanceProperty.STATUS.name());
+    _accessor.setProperty(liveInstanceKey, liveInstance);
+    // Require cache refresh
+    cache.notifyDataChange(HelixConstants.ChangeType.LIVE_INSTANCE);
+    runPipeline(event, dataRefresh, false);
+    try {
+      managementModeStage.process(event);
+      // Without this, the test would silently pass if the expected exception were never thrown.
+      Assert.fail("Expected HelixException when switching off the management mode pipeline"
+          + " without a controller for the cluster");
+    } catch (HelixException expected) {
+      // Expected: the stage tries to switch the cluster out of the management mode pipeline,
+      // which fails because no controller is set up for this test cluster.
+      Assert.assertTrue(expected.getMessage()
+          .startsWith("Failed to switch management mode pipeline, enabled=false"));
+    }
+    clusterStatus = _accessor.getProperty(_accessor.keyBuilder().clusterStatus());
+
+    // Fully exited frozen mode
+    Assert.assertEquals(clusterStatus.getManagementMode(), ClusterManagementMode.Type.NORMAL);
+    Assert.assertEquals(clusterStatus.getManagementModeStatus(),
+        ClusterManagementMode.Status.COMPLETED);
+  }
+}