You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hz...@apache.org on 2021/06/22 23:27:15 UTC

[helix] 06/06: Check cluster management mode status (#1798)

This is an automated email from the ASF dual-hosted git repository.

hzlu pushed a commit to branch cluster-pause-mode
in repository https://gitbox.apache.org/repos/asf/helix.git

commit b8f2331e91a984c372e747b44aecd1cc2d23347c
Author: Huizhi Lu <51...@users.noreply.github.com>
AuthorDate: Tue Jun 22 11:22:49 2021 -0700

    Check cluster management mode status (#1798)
    
    The controller needs to know the participants' freeze status so it can send freeze/unfreeze messages when entering/exiting freeze mode. The status check is done in the management mode stage.
    
    This commit adds methods to check the cluster management mode status and to update the status and history accordingly.
---
 .../main/java/org/apache/helix/PropertyKey.java    |   2 +-
 .../java/org/apache/helix/PropertyPathBuilder.java |   1 +
 .../pipeline/PipelineSwitchException.java          |  29 +++++
 .../controller/stages/ManagementModeStage.java     | 134 +++++++++++++++++++
 .../controller/stages/ResourceValidationStage.java |   6 +-
 .../java/org/apache/helix/model/ClusterStatus.java |   5 +
 .../org/apache/helix/model/ControllerHistory.java  |   8 +-
 .../controller/stages/TestManagementModeStage.java | 142 +++++++++++++++++++++
 8 files changed, 322 insertions(+), 5 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/PropertyKey.java b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
index 254cb95..eb5d5c1 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyKey.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
@@ -247,7 +247,7 @@ public class PropertyKey {
      * @return {@link PropertyKey}
      */
     public PropertyKey clusterStatus() {
-      return new PropertyKey(PropertyType.STATUS, ClusterStatus.class, _clusterName);
+      return new PropertyKey(PropertyType.STATUS, ClusterStatus.class, _clusterName, _clusterName);
     }
 
     /**
diff --git a/helix-core/src/main/java/org/apache/helix/PropertyPathBuilder.java b/helix-core/src/main/java/org/apache/helix/PropertyPathBuilder.java
index 34efd29..5c63304 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyPathBuilder.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyPathBuilder.java
@@ -89,6 +89,7 @@ public class PropertyPathBuilder {
     addEntry(PropertyType.CUSTOMIZEDVIEW, 2, "/{clusterName}/CUSTOMIZEDVIEW/{customizedStateType}");
     addEntry(PropertyType.CUSTOMIZEDVIEW, 3, "/{clusterName}/CUSTOMIZEDVIEW/{customizedStateType}/{resourceName}");
     addEntry(PropertyType.STATUS, 1, "/{clusterName}/STATUS");
+    addEntry(PropertyType.STATUS, 2, "/{clusterName}/STATUS/{clusterName}");
 
     addEntry(PropertyType.TARGETEXTERNALVIEW, 1, "/{clusterName}/TARGETEXTERNALVIEW");
     addEntry(PropertyType.TARGETEXTERNALVIEW, 2,
diff --git a/helix-core/src/main/java/org/apache/helix/controller/pipeline/PipelineSwitchException.java b/helix-core/src/main/java/org/apache/helix/controller/pipeline/PipelineSwitchException.java
new file mode 100644
index 0000000..1584708
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/pipeline/PipelineSwitchException.java
@@ -0,0 +1,29 @@
+package org.apache.helix.controller.pipeline;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * A {@link StageException} thrown by a stage to exit the current pipeline and switch to another.
+ */
+public class PipelineSwitchException extends StageException {
+  public PipelineSwitchException(String message) {
+    super(message);
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementModeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementModeStage.java
index 042aa14..94ff1d3 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementModeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementModeStage.java
@@ -19,11 +19,31 @@ package org.apache.helix.controller.stages;
  * under the License.
  */
 
+import java.time.Instant;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyType;
+import org.apache.helix.api.status.ClusterManagementMode;
 import org.apache.helix.controller.LogUtil;
 import org.apache.helix.controller.dataproviders.ManagementControllerDataProvider;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.model.ClusterStatus;
+import org.apache.helix.model.ControllerHistory;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.LiveInstance.LiveInstanceStatus;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.model.PauseSignal;
 import org.apache.helix.util.HelixUtil;
 import org.apache.helix.util.RebalanceUtil;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,9 +58,23 @@ public class ManagementModeStage extends AbstractBaseStage {
     // TODO: implement the stage
     _eventId = event.getEventId();
     String clusterName = event.getClusterName();
+    HelixManager manager = event.getAttribute(AttributeName.helixmanager.name());
+    if (manager == null) {
+      throw new StageException("HelixManager attribute value is null");
+    }
+
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();
     ManagementControllerDataProvider cache =
         event.getAttribute(AttributeName.ControllerDataProvider.name());
 
+    ClusterManagementMode managementMode =
+        checkClusterFreezeStatus(cache.getEnabledLiveInstances(), cache.getLiveInstances(),
+            cache.getAllInstancesMessages(), cache.getPauseSignal());
+
+    recordClusterStatus(managementMode, accessor);
+    recordManagementModeHistory(managementMode, cache.getPauseSignal(), manager.getInstanceName(),
+        accessor);
+
     // TODO: move to the last stage of management pipeline
     checkInManagementMode(clusterName, cache);
   }
@@ -53,4 +87,104 @@ public class ManagementModeStage extends AbstractBaseStage {
       RebalanceUtil.enableManagementMode(clusterName, false);
     }
   }
+
+  // Derives the mode type from the pause signal and whether it is COMPLETED or IN_PROGRESS.
+  private ClusterManagementMode checkClusterFreezeStatus(
+      Set<String> enabledLiveInstances,
+      Map<String, LiveInstance> liveInstanceMap,
+      Map<String, Collection<Message>> allInstanceMessages,
+      PauseSignal pauseSignal) {
+    ClusterManagementMode.Type type;
+    ClusterManagementMode.Status status = ClusterManagementMode.Status.COMPLETED;
+    if (pauseSignal == null) {
+      // TODO: Should check maintenance mode after it's moved to management pipeline.
+      type = ClusterManagementMode.Type.NORMAL;
+      if (HelixUtil.inManagementMode(pauseSignal, liveInstanceMap, enabledLiveInstances,
+          allInstanceMessages)) {
+        status = ClusterManagementMode.Status.IN_PROGRESS;
+      }
+    } else if (pauseSignal.isClusterPause()) {
+      type = ClusterManagementMode.Type.CLUSTER_PAUSE;
+      if (!instancesFullyFrozen(enabledLiveInstances, liveInstanceMap, allInstanceMessages)) {
+        status = ClusterManagementMode.Status.IN_PROGRESS;
+      }
+    } else {
+      type = ClusterManagementMode.Type.CONTROLLER_PAUSE;
+    }
+
+    return new ClusterManagementMode(type, status);
+  }
+
+  private boolean instancesFullyFrozen(Set<String> enabledLiveInstances,
+      Map<String, LiveInstance> liveInstanceMap,
+      Map<String, Collection<Message>> allInstanceMessages) {
+    // True iff every enabled live instance is PAUSED (NOTE(review): NPE if not in the map)
+    // and has no pending PARTICIPANT_STATUS_CHANGE message.
+    return enabledLiveInstances.stream().noneMatch(
+        instance -> !LiveInstanceStatus.PAUSED.equals(liveInstanceMap.get(instance).getStatus())
+            || hasPendingMessage(allInstanceMessages.get(instance),
+            MessageType.PARTICIPANT_STATUS_CHANGE));
+  }
+
+  private boolean hasPendingMessage(Collection<Message> messages, MessageType type) {
+    return messages != null && messages.stream()
+        .anyMatch(message -> type.name().equals(message.getMsgType()));
+  }
+
+  private void recordClusterStatus(ClusterManagementMode mode, HelixDataAccessor accessor) {
+    // Load the persisted ClusterStatus, or start from an empty one if none exists yet.
+    PropertyKey statusPropertyKey = accessor.keyBuilder().clusterStatus();
+    ClusterStatus clusterStatus = accessor.getProperty(statusPropertyKey);
+    if (clusterStatus == null) {
+      clusterStatus = new ClusterStatus();
+    }
+
+    ClusterManagementMode.Type recordedType = clusterStatus.getManagementMode();
+    ClusterManagementMode.Status recordedStatus = clusterStatus.getManagementModeStatus();
+
+    // Pending user-sent messages can make a fully frozen cluster compute as IN_PROGRESS.
+    // Keep the recorded CLUSTER_PAUSE/COMPLETED status rather than regressing it.
+    if (ClusterManagementMode.Type.CLUSTER_PAUSE.equals(recordedType)
+        && ClusterManagementMode.Status.COMPLETED.equals(recordedStatus)
+        && ClusterManagementMode.Type.CLUSTER_PAUSE.equals(mode.getMode())
+        && ClusterManagementMode.Status.IN_PROGRESS.equals(mode.getStatus())) {
+      LOG.info("Skip recording status mode={}, status={}, because cluster is fully frozen",
+          mode.getMode(), mode.getStatus());
+      return;
+    }
+
+    if (!mode.getMode().equals(recordedType) || !mode.getStatus().equals(recordedStatus)) {
+      // Write only when the type or status differs from what is stored, to skip no-op writes.
+      clusterStatus.setManagementMode(mode.getMode());
+      clusterStatus.setManagementModeStatus(mode.getStatus());
+      if (!accessor.updateProperty(statusPropertyKey, clusterStatus)) {
+        LOG.error("Failed to update cluster status {}", clusterStatus);
+      }
+    }
+  }
+
+  private void recordManagementModeHistory(ClusterManagementMode mode, PauseSignal pauseSignal,
+      String controllerName, HelixDataAccessor accessor) {
+    // Only COMPLETED states are recorded; IN_PROGRESS states are skipped.
+    if (!ClusterManagementMode.Status.COMPLETED.equals(mode.getStatus())) {
+      return;
+    }
+
+    // Record in controller history. NOTE(review): no change check; repeats every COMPLETED run.
+    String path = accessor.keyBuilder().controllerLeaderHistory().getPath();
+    long timestamp = Instant.now().toEpochMilli();
+    String fromHost = (pauseSignal == null ? null : pauseSignal.getFromHost());
+    String reason = (pauseSignal == null ? null : pauseSignal.getReason());
+
+    // Need the updater to avoid race condition with controller/maintenance history updates.
+    if (!accessor.getBaseDataAccessor().update(path, oldRecord -> {
+      if (oldRecord == null) {
+        oldRecord = new ZNRecord(PropertyType.HISTORY.toString());
+      }
+      return new ControllerHistory(oldRecord)
+          .updateManagementModeHistory(controllerName, mode, fromHost, timestamp, reason);
+    }, AccessOption.PERSISTENT)) {
+      LOG.error("Failed to write management mode history to ZK!");
+    }
+  }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java
index 613ce2e..696506a 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import org.apache.helix.controller.LogUtil;
 import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.PipelineSwitchException;
 import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Resource;
@@ -102,9 +103,10 @@ public class ResourceValidationStage extends AbstractBaseStage {
       LogUtil.logInfo(LOG, _eventId,
           "Enabling management mode pipeline for cluster " + event.getClusterName());
       RebalanceUtil.enableManagementMode(event.getClusterName(), true);
-      throw new StageException(
+      // TODO: redesign to terminate and switch pipeline more peacefully
+      throw new PipelineSwitchException(
           "Pipeline should not be run because cluster " + event.getClusterName()
-              + "is in management mode");
+              + " is in management mode");
     }
   }
 
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterStatus.java b/helix-core/src/main/java/org/apache/helix/model/ClusterStatus.java
index 6ed354c..a405fe3 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterStatus.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterStatus.java
@@ -22,6 +22,7 @@ package org.apache.helix.model;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.PropertyType;
 import org.apache.helix.api.status.ClusterManagementMode;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
 
 /**
  * Represents the cluster status. It can have fields for
@@ -32,6 +33,10 @@ public class ClusterStatus extends HelixProperty {
     super(PropertyType.STATUS.name());
   }
 
+  public ClusterStatus(ZNRecord record) {
+    super(record);
+  }
+
   public enum ClusterStatusProperty {
     MANAGEMENT_MODE,
     MANAGEMENT_MODE_STATUS
diff --git a/helix-core/src/main/java/org/apache/helix/model/ControllerHistory.java b/helix-core/src/main/java/org/apache/helix/model/ControllerHistory.java
index 4e418c3..47b0958 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ControllerHistory.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ControllerHistory.java
@@ -162,8 +162,12 @@ public class ControllerHistory extends HelixProperty {
     historyEntry.put(ConfigProperty.TIME.name(), Instant.ofEpochMilli(time).toString());
     historyEntry.put(ManagementModeConfigKey.MODE.name(), mode.getMode().name());
     historyEntry.put(ManagementModeConfigKey.STATUS.name(), mode.getStatus().name());
-    historyEntry.put(PauseSignal.PauseSignalProperty.FROM_HOST.name(), fromHost);
-    historyEntry.put(PauseSignal.PauseSignalProperty.REASON.name(), reason);
+    if (fromHost != null) {
+      historyEntry.put(PauseSignal.PauseSignalProperty.FROM_HOST.name(), fromHost);
+    }
+    if (reason != null) {
+      historyEntry.put(PauseSignal.PauseSignalProperty.REASON.name(), reason);
+    }
 
     return populateHistoryEntries(HistoryType.MANAGEMENT_MODE, historyEntry.toString());
   }
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestManagementModeStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestManagementModeStage.java
new file mode 100644
index 0000000..28ca524
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestManagementModeStage.java
@@ -0,0 +1,142 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+
+import org.apache.helix.HelixConstants;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.api.status.ClusterManagementMode;
+import org.apache.helix.api.status.ClusterManagementModeRequest;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.controller.dataproviders.ManagementControllerDataProvider;
+import org.apache.helix.controller.pipeline.Pipeline;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.ClusterStatus;
+import org.apache.helix.model.LiveInstance;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestManagementModeStage extends ZkTestBase {
+  HelixManager _manager;
+  HelixDataAccessor _accessor;
+  String _clusterName;
+
+  @BeforeClass
+  public void beforeClass() {
+    _clusterName = "CLUSTER_" + TestHelper.getTestClassName();
+    _accessor = new ZKHelixDataAccessor(_clusterName, new ZkBaseDataAccessor<>(_gZkClient));
+    _manager = new DummyClusterManager(_clusterName, _accessor);
+  }
+
+  @AfterClass
+  public void afterClass() {
+    deleteLiveInstances(_clusterName);
+    deleteCluster(_clusterName);
+  }
+
+  @Test
+  public void testClusterFreezeStatus() throws Exception {
+    // ideal state: node0 is MASTER, node1 is SLAVE
+    // replica=2 means 1 master and 1 slave
+    setupIdealState(_clusterName, new int[]{0, 1}, new String[]{"TestDB"}, 1, 2);
+    List<LiveInstance> liveInstances = setupLiveInstances(_clusterName, new int[]{0, 1});
+    setupStateModel(_clusterName);
+
+    ClusterEvent event = new ClusterEvent(_clusterName, ClusterEventType.Unknown);
+    ManagementControllerDataProvider cache = new ManagementControllerDataProvider(_clusterName,
+        Pipeline.Type.MANAGEMENT_MODE.name());
+    event.addAttribute(AttributeName.helixmanager.name(), _manager);
+    event.addAttribute(AttributeName.ControllerDataProvider.name(), cache);
+
+    // Freeze cluster
+    ClusterManagementModeRequest request = ClusterManagementModeRequest.newBuilder()
+        .withClusterName(_clusterName)
+        .withMode(ClusterManagementMode.Type.CLUSTER_PAUSE)
+        .withReason("test")
+        .build();
+    _gSetupTool.getClusterManagementTool().setClusterManagementMode(request);
+
+    Pipeline dataRefresh = new Pipeline();
+    dataRefresh.addStage(new ReadClusterDataStage());
+    runPipeline(event, dataRefresh, false);
+    ManagementModeStage managementModeStage = new ManagementModeStage();
+    managementModeStage.process(event);
+
+    // Cluster status now records the CLUSTER_PAUSE management mode
+    ClusterStatus clusterStatus = _accessor.getProperty(_accessor.keyBuilder().clusterStatus());
+    Assert.assertEquals(clusterStatus.getManagementMode(), ClusterManagementMode.Type.CLUSTER_PAUSE);
+
+
+    // Mark one live instance as PAUSED to simulate a frozen participant
+    LiveInstance liveInstance = liveInstances.get(0);
+    liveInstance.setStatus(LiveInstance.LiveInstanceStatus.PAUSED);
+    PropertyKey liveInstanceKey =
+        _accessor.keyBuilder().liveInstance(liveInstance.getInstanceName());
+    _accessor.updateProperty(liveInstanceKey, liveInstance);
+    // Require cache refresh
+    cache.notifyDataChange(HelixConstants.ChangeType.LIVE_INSTANCE);
+
+    // Unfreeze cluster
+    request = ClusterManagementModeRequest.newBuilder()
+        .withClusterName(_clusterName)
+        .withMode(ClusterManagementMode.Type.NORMAL)
+        .withReason("test")
+        .build();
+    _gSetupTool.getClusterManagementTool().setClusterManagementMode(request);
+    runPipeline(event, dataRefresh, false);
+    managementModeStage.process(event);
+    clusterStatus = _accessor.getProperty(_accessor.keyBuilder().clusterStatus());
+
+    Assert.assertEquals(clusterStatus.getManagementMode(), ClusterManagementMode.Type.NORMAL);
+    // In progress because a live instance is still frozen
+    Assert.assertEquals(clusterStatus.getManagementModeStatus(),
+        ClusterManagementMode.Status.IN_PROGRESS);
+
+    // Remove the PAUSED status so the live instance is back to normal
+    liveInstance = _accessor.getProperty(liveInstanceKey);
+    liveInstance.getRecord().getSimpleFields()
+        .remove(LiveInstance.LiveInstanceProperty.STATUS.name());
+    _accessor.setProperty(liveInstanceKey, liveInstance);
+    // Require cache refresh
+    cache.notifyDataChange(HelixConstants.ChangeType.LIVE_INSTANCE);
+    runPipeline(event, dataRefresh, false);
+    try {
+      managementModeStage.process(event);
+    } catch (HelixException expected) {
+      // Expected: no controller is set up for this cluster, so the pipeline switch fails.
+      Assert.assertTrue(expected.getMessage()
+          .startsWith("Failed to switch management mode pipeline, enabled=false"));
+    }
+    clusterStatus = _accessor.getProperty(_accessor.keyBuilder().clusterStatus());
+
+    // Fully exited frozen mode
+    Assert.assertEquals(clusterStatus.getManagementMode(), ClusterManagementMode.Type.NORMAL);
+    Assert.assertEquals(clusterStatus.getManagementModeStatus(),
+        ClusterManagementMode.Status.COMPLETED);
+  }
+}