You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@helix.apache.org by GitBox <gi...@apache.org> on 2021/06/18 21:10:47 UTC

[GitHub] [helix] huizhilu commented on a change in pull request #1798: Check cluster management mode status

huizhilu commented on a change in pull request #1798:
URL: https://github.com/apache/helix/pull/1798#discussion_r653943978



##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/ManagementModeStage.java
##########
@@ -53,4 +88,87 @@ private void checkInManagementMode(String clusterName, ManagementControllerDataP
       RebalanceUtil.enableManagementMode(clusterName, false);
     }
   }
+
+  // Checks cluster freeze, controller pause mode and status.
+  private ClusterManagementMode checkClusterFreezeStatus(
+      Set<String> enabledLiveInstances,
+      Map<String, LiveInstance> liveInstanceMap,
+      Map<String, Collection<Message>> allInstanceMessages,
+      PauseSignal pauseSignal) {
+    ClusterManagementMode.Type type;
+    ClusterManagementMode.Status status = ClusterManagementMode.Status.COMPLETED;
+    if (pauseSignal == null) {
+      // TODO: Should check maintenance mode after it's moved to management pipeline.
+      type = ClusterManagementMode.Type.NORMAL;
+      if (HelixUtil.inManagementMode(pauseSignal, liveInstanceMap, enabledLiveInstances,
+          allInstanceMessages)) {
+        status = ClusterManagementMode.Status.IN_PROGRESS;
+      }
+    } else if (pauseSignal.isClusterPause()) {
+      type = ClusterManagementMode.Type.CLUSTER_PAUSE;
+      if (!instancesFullyFrozen(enabledLiveInstances, liveInstanceMap, allInstanceMessages)) {
+        status = ClusterManagementMode.Status.IN_PROGRESS;
+      }
+    } else {
+      type = ClusterManagementMode.Type.CONTROLLER_PAUSE;
+    }
+
+    return new ClusterManagementMode(type, status);
+  }
+
+  private boolean instancesFullyFrozen(Set<String> enabledLiveInstances,
+      Map<String, LiveInstance> liveInstanceMap,
+      Map<String, Collection<Message>> allInstanceMessages) {
+    // 1. All live instances are frozen
+    // 2. No pending participant status change message.
+    return enabledLiveInstances.stream().noneMatch(
+        instance -> !LiveInstanceStatus.PAUSED.equals(liveInstanceMap.get(instance).getStatus())
+            || hasPendingMessage(
+            allInstanceMessages.getOrDefault(instance, Collections.emptyList()),
+            MessageType.PARTICIPANT_STATUS_CHANGE));
+  }
+
+  private boolean hasPendingMessage(Collection<Message> messages, MessageType type) {
+    return messages.stream().anyMatch(message -> type.name().equals(message.getMsgType()));
+  }
+
+  private void recordManagementModeStatus(ClusterManagementMode mode, HelixDataAccessor accessor) {
+    // update cluster status
+    PropertyKey statusPropertyKey = accessor.keyBuilder().clusterStatus();
+    ClusterStatus clusterStatus = accessor.getProperty(statusPropertyKey);
+    if (clusterStatus == null) {
+      clusterStatus = new ClusterStatus();
+    }
+    if (!mode.getMode().equals(clusterStatus.getManagementMode())
+        || !mode.getStatus().equals(clusterStatus.getManagementModeStatus())) {
+      // status is different, need to write to metastore
+      clusterStatus.setManagementMode(mode.getMode());
+      clusterStatus.setManagementModeStatus(mode.getStatus());
+      if (!accessor.updateProperty(statusPropertyKey, clusterStatus)) {
+        LOG.error("Failed to update cluster status {}", clusterStatus);
+      }
+    }
+  }
+
+  private void recordManagementModeHistory(ClusterManagementMode mode, PauseSignal pauseSignal,
+      String controllerName, HelixDataAccessor accessor) {
+    // Only record completed status
+    if (!ClusterManagementMode.Status.COMPLETED.equals(mode.getStatus())) {
+      return;
+    }
+
+    // Record a management mode history in controller history
+    String path = accessor.keyBuilder().controllerLeaderHistory().getPath();
+    if (!accessor.getBaseDataAccessor().update(path, oldRecord -> {
+      if (oldRecord == null) {
+        oldRecord = new ZNRecord(PropertyType.HISTORY.toString());
+      }
+      String fromHost = (pauseSignal == null ? null : pauseSignal.getFromHost());
+      String reason = (pauseSignal == null ? null : pauseSignal.getReason());
+      return new ControllerHistory(oldRecord).updateManagementModeHistory(controllerName, mode,

Review comment:
       The old record object is of type `ZNRecord` (as defined by ZkBaseDataAccessor), which doesn't have the method `updateManagementModeHistory()`.

##########
File path: helix-core/src/main/java/org/apache/helix/PropertyPathBuilder.java
##########
@@ -105,6 +105,9 @@
     addEntry(PropertyType.CUSTOMIZEDVIEW, 2, "/{clusterName}/CUSTOMIZEDVIEW/{customizedStateType}");
     addEntry(PropertyType.CUSTOMIZEDVIEW, 3, "/{clusterName}/CUSTOMIZEDVIEW/{customizedStateType}/{resourceName}");
     addEntry(PropertyType.STATUS, 1, "/{clusterName}/STATUS");
+    addEntry(PropertyType.STATUS, 2, "/{clusterName}/STATUS/{scope}");
+    addEntry(PropertyType.STATUS, 3, "/{clusterName}/STATUS/{scope}/{scopeKey}");

Review comment:
       This design was chosen for extensibility and alignment with the rest of the design. It matches the other Helix paths, such as the `CONFIGS` paths: `/{clusterName}/CONFIGS/CLUSTER/{clusterName}` and `/{clusterName}/CONFIGS/PARTICIPANT/{instanceName}`.
   In the future, if we have other use cases, such as cloud/participant/task, it will be easier to extend.

##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/ManagementModeStage.java
##########
@@ -38,9 +59,23 @@ public void process(ClusterEvent event) throws Exception {
     // TODO: implement the stage
     _eventId = event.getEventId();
     String clusterName = event.getClusterName();
+    HelixManager manager = event.getAttribute(AttributeName.helixmanager.name());
+    if (manager == null) {
+      throw new StageException("HelixManager attribute value is null");

Review comment:
       We plan to introduce a different exception, such as a `PipelineSwitchException` that extends `StageException`.

##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/ManagementModeStage.java
##########
@@ -53,4 +88,87 @@ private void checkInManagementMode(String clusterName, ManagementControllerDataP
       RebalanceUtil.enableManagementMode(clusterName, false);
     }
   }
+
+  // Checks cluster freeze, controller pause mode and status.
+  private ClusterManagementMode checkClusterFreezeStatus(

Review comment:
       We only pause enabled live instances. If a disabled instance becomes enabled and live, it will receive pause messages from the controller and be paused. 

##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/ManagementModeStage.java
##########
@@ -53,4 +88,87 @@ private void checkInManagementMode(String clusterName, ManagementControllerDataP
       RebalanceUtil.enableManagementMode(clusterName, false);
     }
   }
+
+  // Checks cluster freeze, controller pause mode and status.
+  private ClusterManagementMode checkClusterFreezeStatus(
+      Set<String> enabledLiveInstances,
+      Map<String, LiveInstance> liveInstanceMap,
+      Map<String, Collection<Message>> allInstanceMessages,
+      PauseSignal pauseSignal) {
+    ClusterManagementMode.Type type;
+    ClusterManagementMode.Status status = ClusterManagementMode.Status.COMPLETED;
+    if (pauseSignal == null) {
+      // TODO: Should check maintenance mode after it's moved to management pipeline.
+      type = ClusterManagementMode.Type.NORMAL;
+      if (HelixUtil.inManagementMode(pauseSignal, liveInstanceMap, enabledLiveInstances,
+          allInstanceMessages)) {
+        status = ClusterManagementMode.Status.IN_PROGRESS;
+      }
+    } else if (pauseSignal.isClusterPause()) {
+      type = ClusterManagementMode.Type.CLUSTER_PAUSE;
+      if (!instancesFullyFrozen(enabledLiveInstances, liveInstanceMap, allInstanceMessages)) {
+        status = ClusterManagementMode.Status.IN_PROGRESS;
+      }
+    } else {
+      type = ClusterManagementMode.Type.CONTROLLER_PAUSE;
+    }
+
+    return new ClusterManagementMode(type, status);
+  }
+
+  private boolean instancesFullyFrozen(Set<String> enabledLiveInstances,
+      Map<String, LiveInstance> liveInstanceMap,
+      Map<String, Collection<Message>> allInstanceMessages) {
+    // 1. All live instances are frozen
+    // 2. No pending participant status change message.
+    return enabledLiveInstances.stream().noneMatch(
+        instance -> !LiveInstanceStatus.PAUSED.equals(liveInstanceMap.get(instance).getStatus())
+            || hasPendingMessage(
+            allInstanceMessages.getOrDefault(instance, Collections.emptyList()),
+            MessageType.PARTICIPANT_STATUS_CHANGE));
+  }
+
+  private boolean hasPendingMessage(Collection<Message> messages, MessageType type) {
+    return messages.stream().anyMatch(message -> type.name().equals(message.getMsgType()));
+  }
+
+  private void recordManagementModeStatus(ClusterManagementMode mode, HelixDataAccessor accessor) {
+    // update cluster status
+    PropertyKey statusPropertyKey = accessor.keyBuilder().clusterStatus();
+    ClusterStatus clusterStatus = accessor.getProperty(statusPropertyKey);
+    if (clusterStatus == null) {
+      clusterStatus = new ClusterStatus();
+    }
+    if (!mode.getMode().equals(clusterStatus.getManagementMode())
+        || !mode.getStatus().equals(clusterStatus.getManagementModeStatus())) {
+      // status is different, need to write to metastore
+      clusterStatus.setManagementMode(mode.getMode());
+      clusterStatus.setManagementModeStatus(mode.getStatus());
+      if (!accessor.updateProperty(statusPropertyKey, clusterStatus)) {
+        LOG.error("Failed to update cluster status {}", clusterStatus);
+      }
+    }
+  }
+
+  private void recordManagementModeHistory(ClusterManagementMode mode, PauseSignal pauseSignal,
+      String controllerName, HelixDataAccessor accessor) {
+    // Only record completed status
+    if (!ClusterManagementMode.Status.COMPLETED.equals(mode.getStatus())) {
+      return;
+    }
+
+    // Record a management mode history in controller history
+    String path = accessor.keyBuilder().controllerLeaderHistory().getPath();
+    if (!accessor.getBaseDataAccessor().update(path, oldRecord -> {

Review comment:
       We need the updater, because the HISTORY znode can be updated by multiple threads: the controller, maintenance mode, and management mode update threads all write to the same history znode. We need the version check to avoid a race condition and make sure the write is correct.
   

##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/ManagementModeStage.java
##########
@@ -53,4 +88,87 @@ private void checkInManagementMode(String clusterName, ManagementControllerDataP
       RebalanceUtil.enableManagementMode(clusterName, false);
     }
   }
+
+  // Checks cluster freeze, controller pause mode and status.
+  private ClusterManagementMode checkClusterFreezeStatus(
+      Set<String> enabledLiveInstances,
+      Map<String, LiveInstance> liveInstanceMap,
+      Map<String, Collection<Message>> allInstanceMessages,
+      PauseSignal pauseSignal) {
+    ClusterManagementMode.Type type;
+    ClusterManagementMode.Status status = ClusterManagementMode.Status.COMPLETED;
+    if (pauseSignal == null) {
+      // TODO: Should check maintenance mode after it's moved to management pipeline.
+      type = ClusterManagementMode.Type.NORMAL;
+      if (HelixUtil.inManagementMode(pauseSignal, liveInstanceMap, enabledLiveInstances,
+          allInstanceMessages)) {
+        status = ClusterManagementMode.Status.IN_PROGRESS;
+      }
+    } else if (pauseSignal.isClusterPause()) {
+      type = ClusterManagementMode.Type.CLUSTER_PAUSE;
+      if (!instancesFullyFrozen(enabledLiveInstances, liveInstanceMap, allInstanceMessages)) {
+        status = ClusterManagementMode.Status.IN_PROGRESS;
+      }
+    } else {
+      type = ClusterManagementMode.Type.CONTROLLER_PAUSE;
+    }
+
+    return new ClusterManagementMode(type, status);
+  }
+
+  private boolean instancesFullyFrozen(Set<String> enabledLiveInstances,
+      Map<String, LiveInstance> liveInstanceMap,
+      Map<String, Collection<Message>> allInstanceMessages) {
+    // 1. All live instances are frozen
+    // 2. No pending participant status change message.
+    return enabledLiveInstances.stream().noneMatch(
+        instance -> !LiveInstanceStatus.PAUSED.equals(liveInstanceMap.get(instance).getStatus())
+            || hasPendingMessage(
+            allInstanceMessages.getOrDefault(instance, Collections.emptyList()),
+            MessageType.PARTICIPANT_STATUS_CHANGE));
+  }
+
+  private boolean hasPendingMessage(Collection<Message> messages, MessageType type) {
+    return messages.stream().anyMatch(message -> type.name().equals(message.getMsgType()));
+  }
+
+  private void recordManagementModeStatus(ClusterManagementMode mode, HelixDataAccessor accessor) {
+    // update cluster status
+    PropertyKey statusPropertyKey = accessor.keyBuilder().clusterStatus();
+    ClusterStatus clusterStatus = accessor.getProperty(statusPropertyKey);

Review comment:
       For HISTORY, I don't see it being updated that way (by flushing a cache to ZK). The history is always updated through a DataAccessor.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org