Posted to commits@helix.apache.org by hz...@apache.org on 2021/06/22 23:27:09 UTC

[helix] branch cluster-pause-mode updated (c92e34d -> b8f2331)

This is an automated email from the ASF dual-hosted git repository.

hzlu pushed a change to branch cluster-pause-mode
in repository https://gitbox.apache.org/repos/asf/helix.git.


    omit c92e34d  Check cluster management mode status (#1798)
    omit 7f9fe63  Add message util to create messages (#1796)
    omit 57180c4  Move pause and maintenance handling out of controller (#1793)
    omit 8b5ca30  Add management mode pipeline registry and switch logic (#1769)
    omit 0a4f23a  Add model to record history and status of management mode (#1771)
    omit f980f46  Add java api for enable/disable cluster pause mode (#1740)
     add 481d548  Fix the unstable test TestWagedNodeSwap. (#1738)
     add c68b029  Properly clean up TaskCurrentState sessions (#1736)
     add b12e025  Use 1 min sliding window size for Helix rest metrics (#1749)
     add faa0cd3  Remove a potential deadlock when shutting down a RoutingTableProvider. (#1751)
     add 9b0d085  Automatically create issues for the failed tests. (#1757)
     add 4c1c8cc  Add 1.0.2 release note and web
     add 1a746c5  Add 1.0.2 in pom file
     add 3a494a4  Add concurrent control to DistClusterControllerStateModel._controller access to avoid NPE. (#1753)
     add bab58b2  Change instance status in Helix UI to be colorblind-friendly (#1759)
     add 3d0f443  [maven-release-plugin] prepare release helix-1.0.2
     add 1c6ad50  [maven-release-plugin] prepare for next development iteration
     add 4573b3b  Revert "[maven-release-plugin] prepare for next development iteration"
     add 97b43be  Revert "[maven-release-plugin] prepare release helix-1.0.2"
     add 10211db  [maven-release-plugin] prepare release helix-1.0.2
     add a0b648b  Revert "[maven-release-plugin] prepare release helix-1.0.2"
     add 2e67f74  [maven-release-plugin] prepare for next development iteration
     add fd06a68  Update Helix-CI.yml
     add 94f3f81  Revert "[maven-release-plugin] prepare for next development iteration"
     add 9214bb8  [maven-release-plugin] prepare release helix-1.0.2
     add 70f8afd  [maven-release-plugin] prepare for next development iteration
     add 7c60bed  Update Helix-CI.yml
     add bbb0554  Remove task requested state (#1723)
     add c04f355  [Replica Level Throttle] Add per replica rebalance type compute logic (#1703)
     add 35dd2e7  Change throttling logic to per message (#1714)
     add c819bf7  Apply per replica level throttling logic to Recovery/Load Rebalance and PendingMessage #1719
     add 3a737ef  Applying per replica logic for entire stage #1724
     add f49986e  [Replica Level Throttle] Make Pipeline in a correct order and fixes tests (#1750)
     add fbd901a  Add test cases for replica level throttling (#1754)
     add 17c52da  Refactor/clean up code without logic change (#1760)
     add c93a7e3  Add a Waged rebalancer util api that do not need raw zk address (#1756)
     add 7f05db2  Remove unused field in ZKHelixDataAccessor - code clean with no logic change (#1764)
     add 1a37d5c  Get rid of non-official ZK error code to avoid NPEs. (#1778)
     add 16117fe  Gracefully handle interruptions in the Helix logic. (#1779)
     add a3b6a4d  Enforce id field in Helix rest to update resource config (#1672)
     add 294fb23  Add cloud set/remove to cluster setup (#1783)
     add fa83198  New REST api partitionAssignment -- return potential assignment given cluster change (#1747)
     add 95f4ed3  Change partition status in Helix UI to be colorblind-friendly (#1785)
     add 7dccbd7  Update release note
     add ff44fa5  Update the version in website
     add cebc221  Fix import code style in PropertyPathBuilder. (#1790)
     add 146adff  Fix MaintenanceRecoveryStage Hanging (#1792)
     add 93efb48  Add a badge to track flaky tests. (#1791)
     add 8911f7e  Support currentState format for partitionAssignment (#1787)
     add d9e9dca  Add failure message for TestClusterAccessor  (#1794)
     new 5eec5a6  Add java api for enable/disable cluster pause mode (#1740)
     new 7c1baee  Add model to record history and status of management mode (#1771)
     new 82513fa  Add management mode pipeline registry and switch logic (#1769)
     new a6d8d19  Move pause and maintenance handling out of controller (#1793)
     new 25117d9  Add message util to create messages (#1796)
     new b8f2331  Check cluster management mode status (#1798)

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (c92e34d)
            \
             N -- N -- N   refs/heads/cluster-pause-mode (b8f2331)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .github/scripts/printTestResult.sh                 |   1 +
 .github/workflows/Helix-CI.yml                     |  92 ++-
 README.md                                          |   1 +
 helix-admin-webapp/pom.xml                         |   2 +-
 helix-agent/pom.xml                                |   2 +-
 helix-common/pom.xml                               |   2 +-
 helix-core/helix-core-1.0.2-SNAPSHOT.ivy           |   2 +-
 helix-core/pom.xml                                 |   2 +-
 .../main/java/org/apache/helix/PropertyKey.java    |  10 +
 .../java/org/apache/helix/PropertyPathBuilder.java |  50 +-
 .../helix/controller/GenericHelixController.java   |  15 +-
 .../helix/controller/common/PartitionStateMap.java |   9 +-
 .../pipeline/AbstractAsyncBaseStage.java           |  24 +-
 .../rebalancer/waged/ReadOnlyWagedRebalancer.java  |  18 +-
 .../stages/IntermediateStateCalcStage.java         | 785 +++++++++------------
 .../controller/stages/MessageGenerationPhase.java  |  14 +-
 .../helix/controller/stages/MessageOutput.java     |   4 +
 .../resource/ResourceMessageGenerationPhase.java   |  37 -
 .../stages/task/TaskMessageGenerationPhase.java    |  38 -
 .../helix/manager/zk/CallbackEventExecutor.java    |   2 +-
 .../helix/manager/zk/ParticipantManager.java       |  17 +-
 .../helix/manager/zk/ZKHelixDataAccessor.java      |  13 +-
 .../apache/helix/manager/zk/ZKHelixManager.java    |   2 +-
 .../helix/manager/zk/ZkBucketDataAccessor.java     |  42 +-
 .../helix/manager/zk/ZkCacheBaseDataAccessor.java  |   5 +-
 .../handling/HelixStateTransitionHandler.java      | 150 ++--
 .../messaging/handling/HelixTaskExecutor.java      |   1 -
 .../helix/monitoring/mbeans/ResourceMonitor.java   |  58 +-
 .../DistClusterControllerStateModel.java           |  47 +-
 .../helix/participant/statemachine/StateModel.java |   4 +-
 .../helix/spectator/RoutingTableProvider.java      |  71 +-
 .../java/org/apache/helix/task/TaskRunner.java     |  61 +-
 .../java/org/apache/helix/task/TaskStateModel.java |   2 +-
 .../java/org/apache/helix/tools/ClusterSetup.java  |  65 ++
 .../BestPossibleExternalViewVerifier.java          |   3 +-
 .../main/java/org/apache/helix/util/HelixUtil.java |  53 +-
 .../java/org/apache/helix/common/ZkTestBase.java   |   9 +-
 .../helix/controller/stages/BaseStageTest.java     |   9 +
 .../controller/stages/TestAsyncBaseStage.java      |  82 +++
 .../stages/TestCancellationMessageGeneration.java  |  14 +-
 .../stages/TestIntermediateStateCalcStage.java     |  60 +-
 .../controller/stages/TestRebalancePipeline.java   |  27 +-
 .../controller/stages/TestRecoveryLoadBalance.java | 225 ------
 .../stages/TestReplicaLevelThrottling.java         | 263 +++++++
 .../stages/TestStateTransitionPriority.java        |  95 ++-
 .../TestNoThrottleDisabledPartitions.java          |  38 +-
 .../integration/TestZkCallbackHandlerLeak.java     |  10 +-
 .../controller/TestClusterMaintenanceMode.java     |  55 +-
 .../controller/TestRedundantDroppedMessage.java    |  11 +-
 .../TestCrushAutoRebalanceNonRack.java             |   3 +-
 .../WagedRebalancer/TestWagedNodeSwap.java         |  24 +-
 .../helix/integration/task/TestStopWorkflow.java   |   2 +-
 .../apache/helix/manager/zk/TestHandleSession.java |  43 ++
 .../messaging/p2pMessage/TestP2PMessages.java      |   6 +-
 .../TestP2PMessagesAvoidDuplicatedMessage.java     |  17 +-
 .../p2pMessage/TestP2PStateTransitionMessages.java |  16 +-
 .../TestP2PWithStateCancellationMessage.java       |  16 +-
 .../apache/helix/mock/MockZkHelixDataAccessor.java |   2 +-
 .../TestClusterStatusMonitorLifecycle.java         |  13 +-
 .../monitoring/mbeans/TestRebalancerMetrics.java   |  18 +-
 .../org/apache/helix/tools/TestClusterSetup.java   |  30 +
 .../test/resources/TestPartitionLevelPriority.json |   2 +-
 .../TestRecoveryLoadBalance.MasterSlave.json       | 367 ----------
 .../TestRecoveryLoadBalance.OnlineOffline.json     | 206 ------
 .../TestReplicaLevelThrottling.MultiTopStates.json | 281 ++++++++
 .../TestReplicaLevelThrottling.SingleTopState.json | 280 ++++++++
 .../instance-list/instance-list.component.html     |   8 +-
 .../instance-list/instance-list.component.scss     |   4 +-
 .../partition-list/partition-list.component.html   |   3 +-
 .../partition-list/partition-list.component.scss   |   4 +-
 .../shared/state-label/state-label.component.scss  |   4 +-
 helix-lock/pom.xml                                 |   2 +-
 helix-rest/helix-rest-1.0.2-SNAPSHOT.ivy           |   4 +-
 helix-rest/pom.xml                                 |   4 +-
 .../apache/helix/rest/server/HelixRestServer.java  |  13 +-
 .../apache/helix/rest/server/ServerContext.java    |  22 +-
 .../resources/helix/AbstractHelixResource.java     |   5 +
 .../server/resources/helix/ClusterAccessor.java    |   2 +-
 .../server/resources/helix/ResourceAccessor.java   |   5 +
 .../helix/ResourceAssignmentOptimizerAccessor.java | 373 ++++++++++
 .../helix/rest/server/AbstractTestClass.java       |   9 +-
 .../helix/rest/server/TestClusterAccessor.java     |  18 +-
 .../helix/rest/server/TestResourceAccessor.java    |  23 +
 .../TestResourceAssignmentOptimizerAccessor.java   | 237 +++++++
 .../rest/server/service/TestInstanceService.java   |   5 +-
 metadata-store-directory-common/pom.xml            |   2 +-
 metrics-common/pom.xml                             |   2 +-
 pom.xml                                            |   8 +-
 recipes/distributed-lock-manager/pom.xml           |   2 +-
 recipes/pom.xml                                    |   2 +-
 recipes/rabbitmq-consumer-group/pom.xml            |   2 +-
 recipes/rsync-replicated-file-system/pom.xml       |   2 +-
 recipes/service-discovery/pom.xml                  |   2 +-
 recipes/task-execution/pom.xml                     |   2 +-
 website/0.6.1-incubating/pom.xml                   |   2 +-
 website/0.6.2-incubating/pom.xml                   |   2 +-
 website/0.6.3/pom.xml                              |   2 +-
 website/0.6.4/pom.xml                              |   2 +-
 website/0.6.5/pom.xml                              |   2 +-
 website/0.6.6/pom.xml                              |   2 +-
 website/0.6.7/pom.xml                              |   2 +-
 website/0.6.8/pom.xml                              |   2 +-
 website/0.6.9/pom.xml                              |   2 +-
 website/0.7.0-incubating/pom.xml                   |   2 +-
 website/0.7.1/pom.xml                              |   2 +-
 website/0.8.0/pom.xml                              |   2 +-
 website/0.8.1/pom.xml                              |   2 +-
 website/0.8.2/pom.xml                              |   2 +-
 website/0.8.3/pom.xml                              |   2 +-
 website/0.8.4/pom.xml                              |   2 +-
 website/0.9.0/pom.xml                              |   2 +-
 website/0.9.1/pom.xml                              |   2 +-
 website/0.9.4/pom.xml                              |   2 +-
 website/0.9.7/pom.xml                              |   2 +-
 website/0.9.8/pom.xml                              |   2 +-
 website/0.9.9/pom.xml                              |   2 +-
 website/1.0.0/pom.xml                              |   2 +-
 website/1.0.1/pom.xml                              |   2 +-
 website/{0.6.5 => 1.0.2}/pom.xml                   |   6 +-
 .../{ => 1.0.2}/src/site/apt/privacy-policy.apt    |   0
 .../src/site/apt/releasenotes/release-1.0.2.apt    | 101 +++
 .../{0.9.0 => 1.0.2}/src/site/markdown/Building.md |   4 +-
 .../{1.0.1 => 1.0.2}/src/site/markdown/Features.md |   0
 .../{1.0.1 => 1.0.2}/src/site/markdown/Metrics.md  |   0
 .../src/site/markdown/Quickstart.md                |   4 +-
 .../{1.0.1 => 1.0.2}/src/site/markdown/Tutorial.md |   0
 .../src/site/markdown/auto_exit_maintenance.md     |   0
 .../src/site/markdown/design_crushed.md            |   0
 .../{0.8.1 => 1.0.2}/src/site/markdown/index.md    |   6 +-
 .../src/site/markdown/quota_scheduling.md          |   0
 .../src/site/markdown/recipes/lock_manager.md      |   2 +-
 .../markdown/recipes/rabbitmq_consumer_group.md    |   2 +-
 .../recipes/rsync_replicated_file_store.md         |   2 +-
 .../src/site/markdown/recipes/service_discovery.md |   2 +-
 .../site/markdown/recipes/task_dag_execution.md    |   2 +-
 .../src/site/markdown/tutorial_admin.md            |   6 +-
 .../src/site/markdown/tutorial_agent.md            |   0
 .../src/site/markdown/tutorial_cloud_support.md    |   0
 .../src/site/markdown/tutorial_controller.md       |   0
 .../src/site/markdown/tutorial_customized_view.md  |   0
 .../src/site/markdown/tutorial_distributed_lock.md |   0
 .../src/site/markdown/tutorial_health.md           |   0
 .../src/site/markdown/tutorial_messaging.md        |   2 +-
 .../src/site/markdown/tutorial_participant.md      |   0
 .../src/site/markdown/tutorial_propstore.md        |   2 +-
 .../src/site/markdown/tutorial_rebalance.md        |   0
 .../src/site/markdown/tutorial_rest_service.md     |   4 +-
 .../src/site/markdown/tutorial_spectator.md        |   0
 .../src/site/markdown/tutorial_state.md            |   0
 .../src/site/markdown/tutorial_task_framework.md   |   0
 .../src/site/markdown/tutorial_task_throttling.md  |   0
 .../src/site/markdown/tutorial_throttling.md       |   0
 .../src/site/markdown/tutorial_ui.md               |   2 +-
 .../site/markdown/tutorial_user_content_store.md   |   0
 .../site/markdown/tutorial_user_def_rebalancer.md  |   0
 .../src/site/markdown/tutorial_yaml.md             |   0
 .../{1.0.1 => 1.0.2}/src/site/resources/.htaccess  |   0
 .../src/site/resources/JobExample.json             |   0
 .../src/site/resources/WorkflowExample.json        |   0
 .../resources/css/bootstrap-responsive.min.css     |   0
 .../src/site/resources/css/bootstrap.min.css       |   0
 .../{ => 1.0.2}/src/site/resources/download.cgi    |   0
 .../images/CustomizedViewSystemArchitecture.jpeg   | Bin
 .../images/HelixPriorityLockWorkflow.jpeg          | Bin
 .../src/site/resources/images/PFS-Generic.png      | Bin
 .../images/ParticipantAutoRegistrationLogic.png    | Bin
 .../src/site/resources/images/RSYNC_BASED_PFS.png  | Bin
 .../site/resources/images/TaskFrameworkLayers.png  | Bin
 .../src/site/resources/images/UIScreenshot.png     | Bin
 .../src/site/resources/images/UIScreenshot2.png    | Bin
 .../resources/images/auto-exit-maintenance.jpg     | Bin
 .../images/design/crushed/after-using-crushed.png  | Bin
 .../images/design/crushed/before-using-crush.png   | Bin
 .../resources/images/design/crushed/classes.png    | Bin
 .../images/design/crushed/crushed-master-dist.png  | Bin
 .../design/crushed/crushed-partition-dist.png      | Bin
 .../images/design/crushed/cursh-master-dist.png    | Bin
 .../images/design/crushed/cursh-partition-dist.png | Bin
 .../crushed/example-cluster-master-dist-after.png  | Bin
 .../design/crushed/example-cluster-master-dist.png | Bin
 .../crushed/example-cluster-partition-dist.png     | Bin
 .../crushed/example-movement-on-expansion.png      | Bin
 .../design/crushed/node-down-master-move.png       | Bin
 .../design/crushed/node-down-partition-move.png    | Bin
 .../images/design/crushed/performance.png          | Bin
 .../images/quota_InstanceCapacityManager.jpeg      | Bin
 .../src/site/resources/images/quota_intro.png      | Bin
 .../src/site/resources/js/bootstrap.min.js         |   0
 .../src/site/resources/js/jquery.min.js            |   0
 website/{1.0.0 => 1.0.2}/src/site/site.xml         |   8 +-
 .../{0.9.0 => 1.0.2}/src/site/xdoc/download.xml.vm |   2 +-
 website/{1.0.1 => 1.0.2}/src/test/conf/testng.xml  |   0
 website/pom.xml                                    |   3 +-
 zookeeper-api/pom.xml                              |   2 +-
 .../apache/helix/zookeeper/zkclient/ZkClient.java  |  22 +-
 .../zkclient/callback/ZkAsyncCallbacks.java        |  14 +-
 .../zookeeper/impl/client/TestRawZkClient.java     |   4 +-
 .../impl/client/TestZkClientAsyncRetry.java        |  23 +-
 198 files changed, 2989 insertions(+), 1981 deletions(-)
 delete mode 100644 helix-core/src/main/java/org/apache/helix/controller/stages/resource/ResourceMessageGenerationPhase.java
 delete mode 100644 helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskMessageGenerationPhase.java
 create mode 100644 helix-core/src/test/java/org/apache/helix/controller/stages/TestAsyncBaseStage.java
 delete mode 100644 helix-core/src/test/java/org/apache/helix/controller/stages/TestRecoveryLoadBalance.java
 create mode 100644 helix-core/src/test/java/org/apache/helix/controller/stages/TestReplicaLevelThrottling.java
 delete mode 100644 helix-core/src/test/resources/TestRecoveryLoadBalance.MasterSlave.json
 delete mode 100644 helix-core/src/test/resources/TestRecoveryLoadBalance.OnlineOffline.json
 create mode 100644 helix-core/src/test/resources/TestReplicaLevelThrottling.MultiTopStates.json
 create mode 100644 helix-core/src/test/resources/TestReplicaLevelThrottling.SingleTopState.json
 create mode 100644 helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAssignmentOptimizerAccessor.java
 create mode 100644 helix-rest/src/test/java/org/apache/helix/rest/server/TestResourceAssignmentOptimizerAccessor.java
 copy website/{0.6.5 => 1.0.2}/pom.xml (92%)
 copy website/{ => 1.0.2}/src/site/apt/privacy-policy.apt (100%)
 create mode 100644 website/1.0.2/src/site/apt/releasenotes/release-1.0.2.apt
 copy website/{0.9.0 => 1.0.2}/src/site/markdown/Building.md (95%)
 copy website/{1.0.1 => 1.0.2}/src/site/markdown/Features.md (100%)
 copy website/{1.0.1 => 1.0.2}/src/site/markdown/Metrics.md (100%)
 copy website/{0.6.6 => 1.0.2}/src/site/markdown/Quickstart.md (99%)
 copy website/{1.0.1 => 1.0.2}/src/site/markdown/Tutorial.md (100%)
 copy website/{1.0.1 => 1.0.2}/src/site/markdown/auto_exit_maintenance.md (100%)
 copy website/{1.0.1 => 1.0.2}/src/site/markdown/design_crushed.md (100%)
 copy website/{0.8.1 => 1.0.2}/src/site/markdown/index.md (89%)
 copy website/{1.0.1 => 1.0.2}/src/site/markdown/quota_scheduling.md (100%)
 copy website/{0.6.3 => 1.0.2}/src/site/markdown/recipes/lock_manager.md (99%)
 copy website/{0.6.5 => 1.0.2}/src/site/markdown/recipes/rabbitmq_consumer_group.md (99%)
 copy website/{0.6.5 => 1.0.2}/src/site/markdown/recipes/rsync_replicated_file_store.md (99%)
 copy website/{0.6.4 => 1.0.2}/src/site/markdown/recipes/service_discovery.md (99%)
 copy website/{0.6.4 => 1.0.2}/src/site/markdown/recipes/task_dag_execution.md (99%)
 copy website/{0.6.3 => 1.0.2}/src/site/markdown/tutorial_admin.md (99%)
 copy website/{1.0.1 => 1.0.2}/src/site/markdown/tutorial_agent.md (100%)
 copy website/{1.0.1 => 1.0.2}/src/site/markdown/tutorial_cloud_support.md (100%)
 copy website/{1.0.1 => 1.0.2}/src/site/markdown/tutorial_controller.md (100%)
 copy website/{1.0.1 => 1.0.2}/src/site/markdown/tutorial_customized_view.md (100%)
 copy website/{1.0.1 => 1.0.2}/src/site/markdown/tutorial_distributed_lock.md (100%)
 copy website/{1.0.1 => 1.0.2}/src/site/markdown/tutorial_health.md (100%)
 copy website/{0.6.6 => 1.0.2}/src/site/markdown/tutorial_messaging.md (98%)
 copy website/{1.0.1 => 1.0.2}/src/site/markdown/tutorial_participant.md (100%)
 copy website/{0.6.3 => 1.0.2}/src/site/markdown/tutorial_propstore.md (95%)
 copy website/{1.0.1 => 1.0.2}/src/site/markdown/tutorial_rebalance.md (100%)
 copy website/{0.8.4 => 1.0.2}/src/site/markdown/tutorial_rest_service.md (99%)
 copy website/{1.0.1 => 1.0.2}/src/site/markdown/tutorial_spectator.md (100%)
 copy website/{1.0.1 => 1.0.2}/src/site/markdown/tutorial_state.md (100%)
 copy website/{1.0.1 => 1.0.2}/src/site/markdown/tutorial_task_framework.md (100%)
 copy website/{1.0.1 => 1.0.2}/src/site/markdown/tutorial_task_throttling.md (100%)
 copy website/{1.0.1 => 1.0.2}/src/site/markdown/tutorial_throttling.md (100%)
 copy website/{0.8.3 => 1.0.2}/src/site/markdown/tutorial_ui.md (99%)
 copy website/{1.0.1 => 1.0.2}/src/site/markdown/tutorial_user_content_store.md (100%)
 copy website/{1.0.1 => 1.0.2}/src/site/markdown/tutorial_user_def_rebalancer.md (100%)
 copy website/{1.0.1 => 1.0.2}/src/site/markdown/tutorial_yaml.md (100%)
 copy website/{1.0.1 => 1.0.2}/src/site/resources/.htaccess (100%)
 copy website/{1.0.1 => 1.0.2}/src/site/resources/JobExample.json (100%)
 copy website/{1.0.1 => 1.0.2}/src/site/resources/WorkflowExample.json (100%)
 copy website/{ => 1.0.2}/src/site/resources/css/bootstrap-responsive.min.css (100%)
 copy website/{1.0.1 => 1.0.2}/src/site/resources/css/bootstrap.min.css (100%)
 copy website/{ => 1.0.2}/src/site/resources/download.cgi (100%)
 copy website/{1.0.1 => 1.0.2}/src/site/resources/images/CustomizedViewSystemArchitecture.jpeg (100%)
 copy website/{1.0.1 => 1.0.2}/src/site/resources/images/HelixPriorityLockWorkflow.jpeg (100%)
 copy website/{1.0.1 => 1.0.2}/src/site/resources/images/PFS-Generic.png (100%)
 copy website/{1.0.1 => 1.0.2}/src/site/resources/images/ParticipantAutoRegistrationLogic.png (100%)
 copy website/{1.0.1 => 1.0.2}/src/site/resources/images/RSYNC_BASED_PFS.png (100%)
 copy website/{1.0.1 => 1.0.2}/src/site/resources/images/TaskFrameworkLayers.png (100%)
 copy website/{1.0.1 => 1.0.2}/src/site/resources/images/UIScreenshot.png (100%)
 copy website/{1.0.1 => 1.0.2}/src/site/resources/images/UIScreenshot2.png (100%)
 copy website/{1.0.1 => 1.0.2}/src/site/resources/images/auto-exit-maintenance.jpg (100%)
 copy website/{1.0.1 => 1.0.2}/src/site/resources/images/design/crushed/after-using-crushed.png (100%)
 copy website/{1.0.1 => 1.0.2}/src/site/resources/images/design/crushed/before-using-crush.png (100%)
 copy website/{1.0.1 => 1.0.2}/src/site/resources/images/design/crushed/classes.png (100%)
 copy website/{1.0.1 => 1.0.2}/src/site/resources/images/design/crushed/crushed-master-dist.png (100%)
 copy website/{1.0.1 => 1.0.2}/src/site/resources/images/design/crushed/crushed-partition-dist.png (100%)
 copy website/{1.0.1 => 1.0.2}/src/site/resources/images/design/crushed/cursh-master-dist.png (100%)
 copy website/{1.0.1 => 1.0.2}/src/site/resources/images/design/crushed/cursh-partition-dist.png (100%)
 copy website/{1.0.1 => 1.0.2}/src/site/resources/images/design/crushed/example-cluster-master-dist-after.png (100%)
 copy website/{1.0.1 => 1.0.2}/src/site/resources/images/design/crushed/example-cluster-master-dist.png (100%)
 copy website/{1.0.1 => 1.0.2}/src/site/resources/images/design/crushed/example-cluster-partition-dist.png (100%)
 copy website/{1.0.1 => 1.0.2}/src/site/resources/images/design/crushed/example-movement-on-expansion.png (100%)
 copy website/{1.0.1 => 1.0.2}/src/site/resources/images/design/crushed/node-down-master-move.png (100%)
 copy website/{1.0.1 => 1.0.2}/src/site/resources/images/design/crushed/node-down-partition-move.png (100%)
 copy website/{1.0.1 => 1.0.2}/src/site/resources/images/design/crushed/performance.png (100%)
 copy website/{1.0.1 => 1.0.2}/src/site/resources/images/quota_InstanceCapacityManager.jpeg (100%)
 copy website/{1.0.1 => 1.0.2}/src/site/resources/images/quota_intro.png (100%)
 copy website/{ => 1.0.2}/src/site/resources/js/bootstrap.min.js (100%)
 copy website/{ => 1.0.2}/src/site/resources/js/jquery.min.js (100%)
 copy website/{1.0.0 => 1.0.2}/src/site/site.xml (96%)
 copy website/{0.9.0 => 1.0.2}/src/site/xdoc/download.xml.vm (99%)
 copy website/{1.0.1 => 1.0.2}/src/test/conf/testng.xml (100%)

[helix] 06/06: Check cluster management mode status (#1798)

This is an automated email from the ASF dual-hosted git repository.

hzlu pushed a commit to branch cluster-pause-mode
in repository https://gitbox.apache.org/repos/asf/helix.git

commit b8f2331e91a984c372e747b44aecd1cc2d23347c
Author: Huizhi Lu <51...@users.noreply.github.com>
AuthorDate: Tue Jun 22 11:22:49 2021 -0700

    Check cluster management mode status (#1798)
    
    The controller needs to know the participants' freeze status so that it can send freeze/unfreeze messages when entering/exiting freeze mode. The status check is done in the management mode stage.
    
    This commit adds methods to check the cluster management mode status and to update the status and history accordingly.
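The status classification described above can be sketched as a small decision function. This is a hypothetical, self-contained model, not Helix code. `Signal` stands in for `PauseSignal`. Per-instance booleans stand in for the `LiveInstance` status and for pending `PARTICIPANT_STATUS_CHANGE` messages. The rule used when there is no pause signal is an assumption: the real branch delegates to `HelixUtil.inManagementMode`, whose exact checks are not part of this diff.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical model of the freeze-status check in ManagementModeStage.
// Types and inputs are simplified stand-ins, not Helix classes.
public class FreezeStatusSketch {
  enum Type { NORMAL, CLUSTER_PAUSE, CONTROLLER_PAUSE }
  enum Status { IN_PROGRESS, COMPLETED }

  // Stand-in for PauseSignal; a null Signal means no pause is requested.
  static final class Signal {
    final boolean clusterPause;
    Signal(boolean clusterPause) { this.clusterPause = clusterPause; }
  }

  static final class Mode {
    final Type type;
    final Status status;
    Mode(Type type, Status status) { this.type = type; this.status = status; }
    @Override public String toString() { return type + "/" + status; }
  }

  // Frozen means every enabled live instance reports paused and has no
  // pending participant-status-change message.
  static boolean fullyFrozen(Set<String> enabledLive, Map<String, Boolean> paused,
      Map<String, Boolean> pendingStatusChange) {
    return enabledLive.stream().allMatch(i -> paused.getOrDefault(i, false)
        && !pendingStatusChange.getOrDefault(i, false));
  }

  static Mode check(Signal signal, Set<String> enabledLive, Map<String, Boolean> paused,
      Map<String, Boolean> pendingStatusChange) {
    if (signal == null) {
      // Assumption: unfreezing is still in progress while any instance is
      // paused or has a pending status-change message.
      boolean unfreezing = enabledLive.stream().anyMatch(i -> paused.getOrDefault(i, false)
          || pendingStatusChange.getOrDefault(i, false));
      return new Mode(Type.NORMAL, unfreezing ? Status.IN_PROGRESS : Status.COMPLETED);
    }
    if (signal.clusterPause) {
      boolean frozen = fullyFrozen(enabledLive, paused, pendingStatusChange);
      return new Mode(Type.CLUSTER_PAUSE, frozen ? Status.COMPLETED : Status.IN_PROGRESS);
    }
    // Controller-only pause: the status is left at COMPLETED.
    return new Mode(Type.CONTROLLER_PAUSE, Status.COMPLETED);
  }

  public static void main(String[] args) {
    Set<String> live = Set.of("host1", "host2");
    // host2 has not reported PAUSED yet -> CLUSTER_PAUSE/IN_PROGRESS
    System.out.println(check(new Signal(true), live,
        Map.of("host1", true, "host2", false), Map.of()));
    // both hosts paused, no pending messages -> CLUSTER_PAUSE/COMPLETED
    System.out.println(check(new Signal(true), live,
        Map.of("host1", true, "host2", true), Map.of()));
    // no pause signal and nothing paused -> NORMAL/COMPLETED
    System.out.println(check(null, live, Map.of(), Map.of()));
  }
}
```

The "fully frozen" test is written with `allMatch(paused && !pending)`. This is logically equivalent to the `noneMatch(!paused || pending)` form used in the diff, which may read more directly as "every instance has finished freezing".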
---
 .../main/java/org/apache/helix/PropertyKey.java    |   2 +-
 .../java/org/apache/helix/PropertyPathBuilder.java |   1 +
 .../pipeline/PipelineSwitchException.java          |  29 +++++
 .../controller/stages/ManagementModeStage.java     | 134 +++++++++++++++++++
 .../controller/stages/ResourceValidationStage.java |   6 +-
 .../java/org/apache/helix/model/ClusterStatus.java |   5 +
 .../org/apache/helix/model/ControllerHistory.java  |   8 +-
 .../controller/stages/TestManagementModeStage.java | 142 +++++++++++++++++++++
 8 files changed, 322 insertions(+), 5 deletions(-)
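The patch adds `PipelineSwitchException`, a `StageException` subclass whose Javadoc says it is "used to exit the current controller pipeline and switch to another pipeline". The following is a minimal sketch of that control-flow pattern under stated assumptions. The `Stage` interface, the `run` helper and the fallback pipeline are hypothetical illustrations, not Helix's actual pipeline runner. `StageException` is redefined locally so that the example compiles on its own.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; these types only mirror the shape of the Helix classes.
public class PipelineSwitchSketch {
  static class StageException extends Exception {
    StageException(String message) { super(message); }
  }

  // Same shape as the new exception: a StageException that signals a switch.
  static class PipelineSwitchException extends StageException {
    PipelineSwitchException(String message) { super(message); }
  }

  interface Stage {
    void process(List<String> log) throws StageException;
  }

  // Runs stages in order. A PipelineSwitchException aborts the remaining
  // stages and hands control to the fallback pipeline; any other
  // StageException still propagates to the caller.
  static void run(List<Stage> pipeline, List<Stage> fallback, List<String> log)
      throws StageException {
    try {
      for (Stage stage : pipeline) {
        stage.process(log);
      }
    } catch (PipelineSwitchException e) {
      log.add("switch: " + e.getMessage());
      for (Stage stage : fallback) {
        stage.process(log);
      }
    }
  }

  public static void main(String[] args) throws StageException {
    List<String> log = new ArrayList<>();
    List<Stage> defaultPipeline = List.<Stage>of(
        l -> l.add("resource validation"),
        l -> { throw new PipelineSwitchException("cluster is in management mode"); },
        l -> l.add("rebalance")); // never reached
    List<Stage> managementPipeline = List.<Stage>of(l -> l.add("management mode stage"));
    run(defaultPipeline, managementPipeline, log);
    // prints [resource validation, switch: cluster is in management mode, management mode stage]
    System.out.println(log);
  }
}
```

Signalling the switch with an exception lets any stage stop the rest of the pipeline without each later stage checking a flag. The trade-off is that the switch does not appear in the stages' normal return path.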

diff --git a/helix-core/src/main/java/org/apache/helix/PropertyKey.java b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
index 254cb95..eb5d5c1 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyKey.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
@@ -247,7 +247,7 @@ public class PropertyKey {
      * @return {@link PropertyKey}
      */
     public PropertyKey clusterStatus() {
-      return new PropertyKey(PropertyType.STATUS, ClusterStatus.class, _clusterName);
+      return new PropertyKey(PropertyType.STATUS, ClusterStatus.class, _clusterName, _clusterName);
     }
 
     /**
diff --git a/helix-core/src/main/java/org/apache/helix/PropertyPathBuilder.java b/helix-core/src/main/java/org/apache/helix/PropertyPathBuilder.java
index 34efd29..5c63304 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyPathBuilder.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyPathBuilder.java
@@ -89,6 +89,7 @@ public class PropertyPathBuilder {
     addEntry(PropertyType.CUSTOMIZEDVIEW, 2, "/{clusterName}/CUSTOMIZEDVIEW/{customizedStateType}");
     addEntry(PropertyType.CUSTOMIZEDVIEW, 3, "/{clusterName}/CUSTOMIZEDVIEW/{customizedStateType}/{resourceName}");
     addEntry(PropertyType.STATUS, 1, "/{clusterName}/STATUS");
+    addEntry(PropertyType.STATUS, 2, "/{clusterName}/STATUS/{clusterName}");
 
     addEntry(PropertyType.TARGETEXTERNALVIEW, 1, "/{clusterName}/TARGETEXTERNALVIEW");
     addEntry(PropertyType.TARGETEXTERNALVIEW, 2,
diff --git a/helix-core/src/main/java/org/apache/helix/controller/pipeline/PipelineSwitchException.java b/helix-core/src/main/java/org/apache/helix/controller/pipeline/PipelineSwitchException.java
new file mode 100644
index 0000000..1584708
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/pipeline/PipelineSwitchException.java
@@ -0,0 +1,29 @@
+package org.apache.helix.controller.pipeline;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Used to exit the current controller pipeline and switch to another pipeline.
+ */
+public class PipelineSwitchException extends StageException {
+  public PipelineSwitchException(String message) {
+    super(message);
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementModeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementModeStage.java
index 042aa14..94ff1d3 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementModeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementModeStage.java
@@ -19,11 +19,31 @@ package org.apache.helix.controller.stages;
  * under the License.
  */
 
+import java.time.Instant;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyType;
+import org.apache.helix.api.status.ClusterManagementMode;
 import org.apache.helix.controller.LogUtil;
 import org.apache.helix.controller.dataproviders.ManagementControllerDataProvider;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.model.ClusterStatus;
+import org.apache.helix.model.ControllerHistory;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.LiveInstance.LiveInstanceStatus;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.model.PauseSignal;
 import org.apache.helix.util.HelixUtil;
 import org.apache.helix.util.RebalanceUtil;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,9 +58,23 @@ public class ManagementModeStage extends AbstractBaseStage {
     // TODO: implement the stage
     _eventId = event.getEventId();
     String clusterName = event.getClusterName();
+    HelixManager manager = event.getAttribute(AttributeName.helixmanager.name());
+    if (manager == null) {
+      throw new StageException("HelixManager attribute value is null");
+    }
+
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();
     ManagementControllerDataProvider cache =
         event.getAttribute(AttributeName.ControllerDataProvider.name());
 
+    ClusterManagementMode managementMode =
+        checkClusterFreezeStatus(cache.getEnabledLiveInstances(), cache.getLiveInstances(),
+            cache.getAllInstancesMessages(), cache.getPauseSignal());
+
+    recordClusterStatus(managementMode, accessor);
+    recordManagementModeHistory(managementMode, cache.getPauseSignal(), manager.getInstanceName(),
+        accessor);
+
     // TODO: move to the last stage of management pipeline
     checkInManagementMode(clusterName, cache);
   }
@@ -53,4 +87,104 @@ public class ManagementModeStage extends AbstractBaseStage {
       RebalanceUtil.enableManagementMode(clusterName, false);
     }
   }
+
+  // Checks cluster freeze, controller pause mode and status.
+  private ClusterManagementMode checkClusterFreezeStatus(
+      Set<String> enabledLiveInstances,
+      Map<String, LiveInstance> liveInstanceMap,
+      Map<String, Collection<Message>> allInstanceMessages,
+      PauseSignal pauseSignal) {
+    ClusterManagementMode.Type type;
+    ClusterManagementMode.Status status = ClusterManagementMode.Status.COMPLETED;
+    if (pauseSignal == null) {
+      // TODO: Should check maintenance mode after it's moved to management pipeline.
+      type = ClusterManagementMode.Type.NORMAL;
+      if (HelixUtil.inManagementMode(pauseSignal, liveInstanceMap, enabledLiveInstances,
+          allInstanceMessages)) {
+        status = ClusterManagementMode.Status.IN_PROGRESS;
+      }
+    } else if (pauseSignal.isClusterPause()) {
+      type = ClusterManagementMode.Type.CLUSTER_PAUSE;
+      if (!instancesFullyFrozen(enabledLiveInstances, liveInstanceMap, allInstanceMessages)) {
+        status = ClusterManagementMode.Status.IN_PROGRESS;
+      }
+    } else {
+      type = ClusterManagementMode.Type.CONTROLLER_PAUSE;
+    }
+
+    return new ClusterManagementMode(type, status);
+  }
+
+  private boolean instancesFullyFrozen(Set<String> enabledLiveInstances,
+      Map<String, LiveInstance> liveInstanceMap,
+      Map<String, Collection<Message>> allInstanceMessages) {
+    // 1. All live instances are frozen
+    // 2. No pending participant status change message.
+    return enabledLiveInstances.stream().noneMatch(
+        instance -> !LiveInstanceStatus.PAUSED.equals(liveInstanceMap.get(instance).getStatus())
+            || hasPendingMessage(allInstanceMessages.get(instance),
+            MessageType.PARTICIPANT_STATUS_CHANGE));
+  }
+
+  private boolean hasPendingMessage(Collection<Message> messages, MessageType type) {
+    return messages != null && messages.stream()
+        .anyMatch(message -> type.name().equals(message.getMsgType()));
+  }
+
+  private void recordClusterStatus(ClusterManagementMode mode, HelixDataAccessor accessor) {
+    // update cluster status
+    PropertyKey statusPropertyKey = accessor.keyBuilder().clusterStatus();
+    ClusterStatus clusterStatus = accessor.getProperty(statusPropertyKey);
+    if (clusterStatus == null) {
+      clusterStatus = new ClusterStatus();
+    }
+
+    ClusterManagementMode.Type recordedType = clusterStatus.getManagementMode();
+    ClusterManagementMode.Status recordedStatus = clusterStatus.getManagementModeStatus();
+
+    // If there is any pending message sent by users, status could be computed as in progress.
+    // Skip recording status change to avoid confusion after cluster is already fully frozen.
+    if (ClusterManagementMode.Type.CLUSTER_PAUSE.equals(recordedType)
+        && ClusterManagementMode.Status.COMPLETED.equals(recordedStatus)
+        && ClusterManagementMode.Type.CLUSTER_PAUSE.equals(mode.getMode())
+        && ClusterManagementMode.Status.IN_PROGRESS.equals(mode.getStatus())) {
+      LOG.info("Skip recording status mode={}, status={}, because cluster is fully frozen",
+          mode.getMode(), mode.getStatus());
+      return;
+    }
+
+    if (!mode.getMode().equals(recordedType) || !mode.getStatus().equals(recordedStatus)) {
+      // Only update the status when it differs from what the metadata store holds
+      clusterStatus.setManagementMode(mode.getMode());
+      clusterStatus.setManagementModeStatus(mode.getStatus());
+      if (!accessor.updateProperty(statusPropertyKey, clusterStatus)) {
+        LOG.error("Failed to update cluster status {}", clusterStatus);
+      }
+    }
+  }
+
+  private void recordManagementModeHistory(ClusterManagementMode mode, PauseSignal pauseSignal,
+      String controllerName, HelixDataAccessor accessor) {
+    // Only record completed status
+    if (!ClusterManagementMode.Status.COMPLETED.equals(mode.getStatus())) {
+      return;
+    }
+
+    // Record a management mode history in controller history
+    String path = accessor.keyBuilder().controllerLeaderHistory().getPath();
+    long timestamp = Instant.now().toEpochMilli();
+    String fromHost = (pauseSignal == null ? null : pauseSignal.getFromHost());
+    String reason = (pauseSignal == null ? null : pauseSignal.getReason());
+
+    // Need the updater to avoid race condition with controller/maintenance history updates.
+    if (!accessor.getBaseDataAccessor().update(path, oldRecord -> {
+      if (oldRecord == null) {
+        oldRecord = new ZNRecord(PropertyType.HISTORY.toString());
+      }
+      return new ControllerHistory(oldRecord)
+          .updateManagementModeHistory(controllerName, mode, fromHost, timestamp, reason);
+    }, AccessOption.PERSISTENT)) {
+      LOG.error("Failed to write management mode history to ZK!");
+    }
+  }
 }
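The decisions made by `checkClusterFreezeStatus`, `instancesFullyFrozen` and `recordClusterStatus` above can be modeled as a small standalone class. This is a sketch under stated assumptions, not Helix code. The nested `Type` and `Status` enums mirror `ClusterManagementMode`. The pause signal is reduced to a nullable `Boolean`: `null` means no signal, otherwise the value is `isClusterPause()`. Per-instance state is reduced to plain maps. The `stillInManagementMode` flag stands in for the result of `HelixUtil.inManagementMode(...)`, whose body this diff does not show. All class and method names are hypothetical.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.Map;

/** Hypothetical, simplified model of the mode/status decisions in ManagementModeStage. */
public class ManagementModeSketch {
  // Stand-ins for ClusterManagementMode.Type and ClusterManagementMode.Status.
  enum Type { NORMAL, CLUSTER_PAUSE, CONTROLLER_PAUSE }

  enum Status { IN_PROGRESS, COMPLETED }

  static final class Mode {
    final Type type;
    final Status status;

    Mode(Type type, Status status) {
      this.type = type;
      this.status = status;
    }
  }

  /**
   * @param clusterPause null when there is no pause signal, otherwise PauseSignal.isClusterPause()
   * @param stillInManagementMode stand-in for the HelixUtil.inManagementMode(...) result
   * @param pausedByInstance enabled live instance name -> whether its status is PAUSED
   * @param pendingMsgTypes instance name -> types of its pending messages
   */
  static Mode computeMode(Boolean clusterPause, boolean stillInManagementMode,
      Map<String, Boolean> pausedByInstance, Map<String, Collection<String>> pendingMsgTypes) {
    if (clusterPause == null) {
      // No pause signal: NORMAL, but IN_PROGRESS while the cluster is still leaving management mode.
      return new Mode(Type.NORMAL, stillInManagementMode ? Status.IN_PROGRESS : Status.COMPLETED);
    }
    if (clusterPause) {
      // A cluster freeze is COMPLETED only once every enabled live instance is frozen.
      return new Mode(Type.CLUSTER_PAUSE,
          fullyFrozen(pausedByInstance, pendingMsgTypes) ? Status.COMPLETED : Status.IN_PROGRESS);
    }
    // Controller-only pause: the stage performs no further check and reports COMPLETED.
    return new Mode(Type.CONTROLLER_PAUSE, Status.COMPLETED);
  }

  // Every enabled live instance is PAUSED and has no pending PARTICIPANT_STATUS_CHANGE message.
  static boolean fullyFrozen(Map<String, Boolean> pausedByInstance,
      Map<String, Collection<String>> pendingMsgTypes) {
    return pausedByInstance.entrySet().stream()
        .allMatch(e -> Boolean.TRUE.equals(e.getValue())
            && !pendingMsgTypes.getOrDefault(e.getKey(), Collections.emptyList())
                .contains("PARTICIPANT_STATUS_CHANGE"));
  }

  // Mirrors recordClusterStatus: write only on change, and never overwrite a recorded
  // CLUSTER_PAUSE/COMPLETED with a transient CLUSTER_PAUSE/IN_PROGRESS.
  static boolean shouldRecord(Mode recorded, Mode computed) {
    if (recorded == null) {
      return true;
    }
    boolean frozenRecorded =
        recorded.type == Type.CLUSTER_PAUSE && recorded.status == Status.COMPLETED;
    boolean transientInProgress =
        computed.type == Type.CLUSTER_PAUSE && computed.status == Status.IN_PROGRESS;
    if (frozenRecorded && transientInProgress) {
      return false;
    }
    return recorded.type != computed.type || recorded.status != computed.status;
  }
}
```

The sketch writes the frozen check as `allMatch(paused && !pending)`. By De Morgan's law this is equivalent to the diff's `noneMatch(!paused || pending)`, and it may read more directly as "every instance is frozen".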
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java
index 613ce2e..696506a 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import org.apache.helix.controller.LogUtil;
 import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.PipelineSwitchException;
 import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Resource;
@@ -102,9 +103,10 @@ public class ResourceValidationStage extends AbstractBaseStage {
       LogUtil.logInfo(LOG, _eventId,
           "Enabling management mode pipeline for cluster " + event.getClusterName());
       RebalanceUtil.enableManagementMode(event.getClusterName(), true);
-      throw new StageException(
+      // TODO: redesign to terminate and switch pipeline more peacefully
+      throw new PipelineSwitchException(
           "Pipeline should not be run because cluster " + event.getClusterName()
-              + "is in management mode");
+              + " is in management mode");
     }
   }
 
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterStatus.java b/helix-core/src/main/java/org/apache/helix/model/ClusterStatus.java
index 6ed354c..a405fe3 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterStatus.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterStatus.java
@@ -22,6 +22,7 @@ package org.apache.helix.model;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.PropertyType;
 import org.apache.helix.api.status.ClusterManagementMode;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
 
 /**
  * Represents the cluster status. It can have fields for
@@ -32,6 +33,10 @@ public class ClusterStatus extends HelixProperty {
     super(PropertyType.STATUS.name());
   }
 
+  public ClusterStatus(ZNRecord record) {
+    super(record);
+  }
+
   public enum ClusterStatusProperty {
     MANAGEMENT_MODE,
     MANAGEMENT_MODE_STATUS
diff --git a/helix-core/src/main/java/org/apache/helix/model/ControllerHistory.java b/helix-core/src/main/java/org/apache/helix/model/ControllerHistory.java
index 4e418c3..47b0958 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ControllerHistory.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ControllerHistory.java
@@ -162,8 +162,12 @@ public class ControllerHistory extends HelixProperty {
     historyEntry.put(ConfigProperty.TIME.name(), Instant.ofEpochMilli(time).toString());
     historyEntry.put(ManagementModeConfigKey.MODE.name(), mode.getMode().name());
     historyEntry.put(ManagementModeConfigKey.STATUS.name(), mode.getStatus().name());
-    historyEntry.put(PauseSignal.PauseSignalProperty.FROM_HOST.name(), fromHost);
-    historyEntry.put(PauseSignal.PauseSignalProperty.REASON.name(), reason);
+    if (fromHost != null) {
+      historyEntry.put(PauseSignal.PauseSignalProperty.FROM_HOST.name(), fromHost);
+    }
+    if (reason != null) {
+      historyEntry.put(PauseSignal.PauseSignalProperty.REASON.name(), reason);
+    }
 
     return populateHistoryEntries(HistoryType.MANAGEMENT_MODE, historyEntry.toString());
   }
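The null checks matter because `recordManagementModeHistory` passes `null` for `fromHost` and `reason` whenever there is no pause signal. If `historyEntry` is a `java.util.Map`, as the `put` and `toString()` calls suggest, a null value would be rendered as the literal text `null` in the stored entry. The following is a hypothetical stand-in, not the Helix method. The class name, method name and the use of a `TreeMap` for stable ordering are assumptions; only the key names come from the diff.

```java
import java.time.Instant;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical stand-in for the entry built by ControllerHistory.updateManagementModeHistory. */
public class HistoryEntrySketch {
  static Map<String, String> buildEntry(String mode, String status, String fromHost, long time,
      String reason) {
    // TreeMap keeps keys sorted so that toString() output is stable.
    Map<String, String> entry = new TreeMap<>();
    entry.put("TIME", Instant.ofEpochMilli(time).toString());
    entry.put("MODE", mode);
    entry.put("STATUS", status);
    // Optional fields are omitted rather than stored as null, which toString() would print as "null".
    if (fromHost != null) {
      entry.put("FROM_HOST", fromHost);
    }
    if (reason != null) {
      entry.put("REASON", reason);
    }
    return entry;
  }
}
```

For example, `buildEntry("NORMAL", "COMPLETED", null, 0L, null).toString()` returns `{MODE=NORMAL, STATUS=COMPLETED, TIME=1970-01-01T00:00:00Z}`, with no `FROM_HOST=null` or `REASON=null` noise.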
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestManagementModeStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestManagementModeStage.java
new file mode 100644
index 0000000..28ca524
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestManagementModeStage.java
@@ -0,0 +1,142 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+
+import org.apache.helix.HelixConstants;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.api.status.ClusterManagementMode;
+import org.apache.helix.api.status.ClusterManagementModeRequest;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.controller.dataproviders.ManagementControllerDataProvider;
+import org.apache.helix.controller.pipeline.Pipeline;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.ClusterStatus;
+import org.apache.helix.model.LiveInstance;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestManagementModeStage extends ZkTestBase {
+  HelixManager _manager;
+  HelixDataAccessor _accessor;
+  String _clusterName;
+
+  @BeforeClass
+  public void beforeClass() {
+    _clusterName = "CLUSTER_" + TestHelper.getTestClassName();
+    _accessor = new ZKHelixDataAccessor(_clusterName, new ZkBaseDataAccessor<>(_gZkClient));
+    _manager = new DummyClusterManager(_clusterName, _accessor);
+  }
+
+  @AfterClass
+  public void afterClass() {
+    deleteLiveInstances(_clusterName);
+    deleteCluster(_clusterName);
+  }
+
+  @Test
+  public void testClusterFreezeStatus() throws Exception {
+    // ideal state: node0 is MASTER, node1 is SLAVE
+    // replica=2 means 1 master and 1 slave
+    setupIdealState(_clusterName, new int[]{0, 1}, new String[]{"TestDB"}, 1, 2);
+    List<LiveInstance> liveInstances = setupLiveInstances(_clusterName, new int[]{0, 1});
+    setupStateModel(_clusterName);
+
+    ClusterEvent event = new ClusterEvent(_clusterName, ClusterEventType.Unknown);
+    ManagementControllerDataProvider cache = new ManagementControllerDataProvider(_clusterName,
+        Pipeline.Type.MANAGEMENT_MODE.name());
+    event.addAttribute(AttributeName.helixmanager.name(), _manager);
+    event.addAttribute(AttributeName.ControllerDataProvider.name(), cache);
+
+    // Freeze cluster
+    ClusterManagementModeRequest request = ClusterManagementModeRequest.newBuilder()
+        .withClusterName(_clusterName)
+        .withMode(ClusterManagementMode.Type.CLUSTER_PAUSE)
+        .withReason("test")
+        .build();
+    _gSetupTool.getClusterManagementTool().setClusterManagementMode(request);
+
+    Pipeline dataRefresh = new Pipeline();
+    dataRefresh.addStage(new ReadClusterDataStage());
+    runPipeline(event, dataRefresh, false);
+    ManagementModeStage managementModeStage = new ManagementModeStage();
+    managementModeStage.process(event);
+
+    // In frozen mode
+    ClusterStatus clusterStatus = _accessor.getProperty(_accessor.keyBuilder().clusterStatus());
+    Assert.assertEquals(clusterStatus.getManagementMode(), ClusterManagementMode.Type.CLUSTER_PAUSE);
+
+
+    // Mark a live instance as PAUSED
+    LiveInstance liveInstance = liveInstances.get(0);
+    liveInstance.setStatus(LiveInstance.LiveInstanceStatus.PAUSED);
+    PropertyKey liveInstanceKey =
+        _accessor.keyBuilder().liveInstance(liveInstance.getInstanceName());
+    _accessor.updateProperty(liveInstanceKey, liveInstance);
+    // Require cache refresh
+    cache.notifyDataChange(HelixConstants.ChangeType.LIVE_INSTANCE);
+
+    // Unfreeze cluster
+    request = ClusterManagementModeRequest.newBuilder()
+        .withClusterName(_clusterName)
+        .withMode(ClusterManagementMode.Type.NORMAL)
+        .withReason("test")
+        .build();
+    _gSetupTool.getClusterManagementTool().setClusterManagementMode(request);
+    runPipeline(event, dataRefresh, false);
+    managementModeStage.process(event);
+    clusterStatus = _accessor.getProperty(_accessor.keyBuilder().clusterStatus());
+
+    Assert.assertEquals(clusterStatus.getManagementMode(), ClusterManagementMode.Type.NORMAL);
+    // In progress because a live instance is still frozen
+    Assert.assertEquals(clusterStatus.getManagementModeStatus(),
+        ClusterManagementMode.Status.IN_PROGRESS);
+
+    // Remove the frozen status so the live instance returns to normal status
+    liveInstance = _accessor.getProperty(liveInstanceKey);
+    liveInstance.getRecord().getSimpleFields()
+        .remove(LiveInstance.LiveInstanceProperty.STATUS.name());
+    _accessor.setProperty(liveInstanceKey, liveInstance);
+    // Require cache refresh
+    cache.notifyDataChange(HelixConstants.ChangeType.LIVE_INSTANCE);
+    runPipeline(event, dataRefresh, false);
+    try {
+      managementModeStage.process(event);
+    } catch (HelixException expected) {
+      // Expected: no controller is set up for this cluster in the test, so the pipeline switch fails.
+      Assert.assertTrue(expected.getMessage()
+          .startsWith("Failed to switch management mode pipeline, enabled=false"));
+    }
+    clusterStatus = _accessor.getProperty(_accessor.keyBuilder().clusterStatus());
+
+    // Fully exited frozen mode
+    Assert.assertEquals(clusterStatus.getManagementMode(), ClusterManagementMode.Type.NORMAL);
+    Assert.assertEquals(clusterStatus.getManagementModeStatus(),
+        ClusterManagementMode.Status.COMPLETED);
+  }
+}

[helix] 03/06: Add management mode pipeline registry and switch logic (#1769)

Posted by hz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hzlu pushed a commit to branch cluster-pause-mode
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 82513fa90ccc4e0612726a8853df80988d3cc9dc
Author: Huizhi Lu <51...@users.noreply.github.com>
AuthorDate: Wed Jun 9 21:01:30 2021 -0700

    Add management mode pipeline registry and switch logic (#1769)
    
    The Management Mode Pipeline will help check the cluster status and determine whether the default pipelines can run.
    One use case is helping the controller decide when it can exit cluster freeze mode.
    
    This commit adds the management mode pipeline and the logic to switch to and from the default resource/task pipelines.
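    The switch described above comes down to two rules in GenericHelixController:
    - pushToEventQueues sends an event only to the management mode queue while _inManagementMode is set.
    - The event handler drops any event whose pipeline type does not match the current mode.

    A minimal single-threaded model of those two rules follows. The in-memory deques stand in for
    ClusterEventBlockingQueue, and plain strings stand in for ClusterEvent IDs. Both substitutions,
    and all class and method names, are assumptions made for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical model of routing between the management mode and default/task pipelines. */
public class PipelineSwitchSketch {
  enum PipelineType { DEFAULT, TASK, MANAGEMENT_MODE }

  // Marked volatile in this sketch in case the flag is written and read on different threads;
  // the diff itself declares a plain boolean field.
  private volatile boolean inManagementMode;

  // Single-threaded stand-ins for ClusterEventBlockingQueue.
  final Deque<String> defaultQueue = new ArrayDeque<>();
  final Deque<String> taskQueue = new ArrayDeque<>();
  final Deque<String> managementQueue = new ArrayDeque<>();

  void setInManagementMode(boolean enabled) {
    inManagementMode = enabled;
  }

  // Mirrors pushToEventQueues: in management mode only the management queue receives the event.
  void push(String uid) {
    if (inManagementMode) {
      managementQueue.add(uid + "_" + PipelineType.MANAGEMENT_MODE.name());
      return;
    }
    defaultQueue.add(uid); // default-queue event ID simplified to the raw uid
    taskQueue.add(uid + "_" + PipelineType.TASK.name());
  }

  // Mirrors the guard in the event handler: run only when the pipeline type matches the mode.
  boolean shouldRun(PipelineType type) {
    return inManagementMode == (type == PipelineType.MANAGEMENT_MODE);
  }
}
```

    The guard is the diff's two-clause skip condition,
    (inManagementMode && type != MANAGEMENT_MODE) || (!inManagementMode && type == MANAGEMENT_MODE),
    negated and folded into a single equality.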
---
 .../helix/controller/GenericHelixController.java   | 151 ++++++++++++++++-----
 .../ManagementControllerDataProvider.java}         |  28 +---
 .../apache/helix/controller/pipeline/Pipeline.java |   9 +-
 .../helix/controller/stages/ClusterEventType.java  |   1 +
 .../controller/stages/ManagementModeStage.java     |  48 +++++++
 .../controller/stages/ResourceValidationStage.java |  10 ++
 .../main/java/org/apache/helix/util/HelixUtil.java |  12 ++
 .../java/org/apache/helix/util/RebalanceUtil.java  |  22 +++
 8 files changed, 223 insertions(+), 58 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 11cd9ee..7da6ef0 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -63,6 +63,7 @@ import org.apache.helix.api.listeners.TaskCurrentStateChangeListener;
 import org.apache.helix.common.ClusterEventBlockingQueue;
 import org.apache.helix.common.DedupEventProcessor;
 import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;
+import org.apache.helix.controller.dataproviders.ManagementControllerDataProvider;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
 import org.apache.helix.controller.pipeline.AsyncWorkerType;
@@ -81,7 +82,7 @@ import org.apache.helix.controller.stages.CustomizedViewAggregationStage;
 import org.apache.helix.controller.stages.ExternalViewComputeStage;
 import org.apache.helix.controller.stages.IntermediateStateCalcStage;
 import org.apache.helix.controller.stages.MaintenanceRecoveryStage;
-import org.apache.helix.controller.stages.MessageGenerationPhase;
+import org.apache.helix.controller.stages.ManagementModeStage;
 import org.apache.helix.controller.stages.MessageSelectionStage;
 import org.apache.helix.controller.stages.MessageThrottleStage;
 import org.apache.helix.controller.stages.PersistAssignmentStage;
@@ -141,6 +142,7 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
   private static final int ASYNC_TASKS_THREADPOOL_SIZE = 10;
   private final PipelineRegistry _registry;
   private final PipelineRegistry _taskRegistry;
+  private final PipelineRegistry _managementModeRegistry;
 
   final AtomicReference<Map<String, LiveInstance>> _lastSeenInstances;
   final AtomicReference<Map<String, LiveInstance>> _lastSeenSessions;
@@ -163,6 +165,11 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
   private final ClusterEventBlockingQueue _taskEventQueue;
   private final ClusterEventProcessor _taskEventThread;
 
+  // Controller will switch to run management mode pipeline when set to true.
+  private boolean _inManagementMode;
+  private final ClusterEventBlockingQueue _managementModeEventQueue;
+  private final ClusterEventProcessor _managementModeEventThread;
+
   private final Map<AsyncWorkerType, DedupEventProcessor<String, Runnable>> _asyncFIFOWorkerPool;
 
   private long _continuousRebalanceFailureCount = 0;
@@ -197,6 +204,7 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
    */
   private final ResourceControllerDataProvider _resourceControlDataProvider;
   private final WorkflowControllerDataProvider _workflowControlDataProvider;
+  private final ManagementControllerDataProvider _managementControllerDataProvider;
   private final ScheduledExecutorService _asyncTasksThreadPool;
 
   /**
@@ -285,13 +293,21 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
 
   public GenericHelixController(String clusterName) {
     this(createDefaultRegistry(Pipeline.Type.DEFAULT.name()),
-        createTaskRegistry(Pipeline.Type.TASK.name()), clusterName,
+        createTaskRegistry(Pipeline.Type.TASK.name()),
+        createManagementModeRegistry(Pipeline.Type.MANAGEMENT_MODE.name()), clusterName,
         Sets.newHashSet(Pipeline.Type.TASK, Pipeline.Type.DEFAULT));
   }
 
   public GenericHelixController(String clusterName, Set<Pipeline.Type> enabledPipelins) {
     this(createDefaultRegistry(Pipeline.Type.DEFAULT.name()),
-        createTaskRegistry(Pipeline.Type.TASK.name()), clusterName, enabledPipelins);
+        createTaskRegistry(Pipeline.Type.TASK.name()),
+        createManagementModeRegistry(Pipeline.Type.MANAGEMENT_MODE.name()),
+        clusterName,
+        enabledPipelins);
+  }
+
+  public void setInManagementMode(boolean enabled) {
+    _inManagementMode = enabled;
   }
 
   class RebalanceTask extends TimerTask {
@@ -566,6 +582,8 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
       registry
           .register(ClusterEventType.OnDemandRebalance, dataRefresh, autoExitMaintenancePipeline,
               dataPreprocess, externalViewPipeline, rebalancePipeline);
+      registry.register(ClusterEventType.ControllerChange, dataRefresh, autoExitMaintenancePipeline,
+          dataPreprocess, externalViewPipeline, rebalancePipeline);
       // TODO: We now include rebalance pipeline in customized state change for correctness.
       // However, it is not efficient, and we should improve this by splitting the pipeline or
       // controller roles to multiple hosts.
@@ -625,22 +643,49 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
           rebalancePipeline);
       registry.register(ClusterEventType.OnDemandRebalance, dataRefresh, dataPreprocess,
           rebalancePipeline);
+      registry.register(ClusterEventType.ControllerChange, dataRefresh, dataPreprocess,
+          rebalancePipeline);
+      return registry;
+    }
+  }
+
+  private static PipelineRegistry createManagementModeRegistry(String pipelineName) {
+    logger.info("Creating management mode registry");
+    synchronized (GenericHelixController.class) {
+      // cluster data cache refresh
+      Pipeline dataRefresh = new Pipeline(pipelineName);
+      dataRefresh.addStage(new ReadClusterDataStage());
+
+      // cluster management mode process
+      Pipeline managementMode = new Pipeline(pipelineName);
+      managementMode.addStage(new ManagementModeStage());
+
+      PipelineRegistry registry = new PipelineRegistry();
+      Arrays.asList(
+          ClusterEventType.ControllerChange,
+          ClusterEventType.LiveInstanceChange,
+          ClusterEventType.MessageChange,
+          ClusterEventType.OnDemandRebalance,
+          ClusterEventType.PeriodicalRebalance
+      ).forEach(type -> registry.register(type, dataRefresh, managementMode));
+
       return registry;
     }
   }
 
   // TODO: refactor the constructor as providing both registry but only enabling one looks confusing
   public GenericHelixController(PipelineRegistry registry, PipelineRegistry taskRegistry) {
-    this(registry, taskRegistry, null, Sets.newHashSet(
-        Pipeline.Type.TASK, Pipeline.Type.DEFAULT));
+    this(registry, taskRegistry, createManagementModeRegistry(Pipeline.Type.MANAGEMENT_MODE.name()),
+        null, Sets.newHashSet(Pipeline.Type.TASK, Pipeline.Type.DEFAULT));
   }
 
   private GenericHelixController(PipelineRegistry registry, PipelineRegistry taskRegistry,
-      final String clusterName, Set<Pipeline.Type> enabledPipelineTypes) {
-    _paused = false;
+      PipelineRegistry managementModeRegistry, final String clusterName,
+      Set<Pipeline.Type> enabledPipelineTypes) {
     _enabledPipelineTypes = enabledPipelineTypes;
     _registry = registry;
     _taskRegistry = taskRegistry;
+    _managementModeRegistry = managementModeRegistry;
     _lastSeenInstances = new AtomicReference<>();
     _lastSeenSessions = new AtomicReference<>();
     _lastSeenCustomizedStateTypesMapRef = new AtomicReference<>();
@@ -660,6 +705,7 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
     _onDemandRebalanceTimer =
         new Timer("GenericHelixController_" + _clusterName + "_onDemand_Timer", true);
 
+    // TODO: refactor to simplify the similar initialization code for the 3 pipelines below
     // initialize pipelines at the end so we have everything else prepared
     if (_enabledPipelineTypes.contains(Pipeline.Type.DEFAULT)) {
       logger.info("Initializing {} pipeline", Pipeline.Type.DEFAULT.name());
@@ -689,6 +735,16 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
       _taskEventThread = null;
     }
 
+    logger.info("Initializing {} pipeline", Pipeline.Type.MANAGEMENT_MODE.name());
+    _managementControllerDataProvider =
+        new ManagementControllerDataProvider(clusterName, Pipeline.Type.MANAGEMENT_MODE.name());
+    _managementModeEventQueue = new ClusterEventBlockingQueue();
+    _managementModeEventThread =
+        new ClusterEventProcessor(_managementControllerDataProvider, _managementModeEventQueue,
+            Pipeline.Type.MANAGEMENT_MODE.name() + "-" + clusterName);
+    initPipeline(_managementModeEventThread, _managementControllerDataProvider);
+    logger.info("Initialized {} pipeline", Pipeline.Type.MANAGEMENT_MODE.name());
+
     addController(this);
   }
 
@@ -776,12 +832,36 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
 
     _helixManager = manager;
 
-    // TODO If init controller with paused = true, it may not take effect immediately
-    // _paused is default false. If any events come before controllerChangeEvent, the controller
-    // will be excuting in un-paused mode. Which might not be the config in ZK.
-    if (_paused) {
-      logger.info("Cluster " + manager.getClusterName() + " is paused. Ignoring the event:" + event
-          .getEventType());
+    // Prepare ClusterEvent
+    // TODO (harry): this is a temporary workaround - after controller is separated we should not
+    // have this instanceof clauses
+    List<Pipeline> pipelines;
+    boolean isTaskFrameworkPipeline = false;
+    Pipeline.Type pipelineType;
+
+    if (dataProvider instanceof ResourceControllerDataProvider) {
+      pipelines = _registry.getPipelinesForEvent(event.getEventType());
+      pipelineType = Pipeline.Type.DEFAULT;
+    } else if (dataProvider instanceof WorkflowControllerDataProvider) {
+      pipelines = _taskRegistry.getPipelinesForEvent(event.getEventType());
+      isTaskFrameworkPipeline = true;
+      pipelineType = Pipeline.Type.TASK;
+    } else if (dataProvider instanceof ManagementControllerDataProvider) {
+      pipelines = _managementModeRegistry.getPipelinesForEvent(event.getEventType());
+      pipelineType = Pipeline.Type.MANAGEMENT_MODE;
+    } else {
+      logger.warn(String
+          .format("No %s pipeline to run for event: %s::%s", dataProvider.getPipelineName(),
+              event.getEventType(), event.getEventId()));
+      return;
+    }
+
+    // Should not run management mode and default/task pipelines at the same time.
+    if ((_inManagementMode && !Pipeline.Type.MANAGEMENT_MODE.equals(pipelineType))
+        || (!_inManagementMode && Pipeline.Type.MANAGEMENT_MODE.equals(pipelineType))) {
+      logger.info("Should not run management mode and default/task pipelines at the same time. "
+              + "cluster={}, inManagementMode={}, pipelineType={}. Ignoring the event: {}",
+          manager.getClusterName(), _inManagementMode, pipelineType, event.getEventType());
       return;
     }
 
@@ -808,26 +888,6 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
 
     dataProvider.setClusterEventId(event.getEventId());
     event.addAttribute(AttributeName.LastRebalanceFinishTimeStamp.name(), _lastPipelineEndTimestamp);
-
-    // Prepare ClusterEvent
-    // TODO (harry): this is a temporal workaround - after controller is separated we should not
-    // have this instanceof clauses
-    List<Pipeline> pipelines;
-    boolean isTaskFrameworkPipeline = false;
-
-    if (dataProvider instanceof ResourceControllerDataProvider) {
-      pipelines = _registry
-          .getPipelinesForEvent(event.getEventType());
-    } else if (dataProvider instanceof WorkflowControllerDataProvider) {
-      pipelines = _taskRegistry
-          .getPipelinesForEvent(event.getEventType());
-      isTaskFrameworkPipeline = true;
-    } else {
-      logger.warn(String
-          .format("No %s pipeline to run for event: %s::%s", dataProvider.getPipelineName(),
-              event.getEventType(), event.getEventId()));
-      return;
-    }
     event.addAttribute(AttributeName.ControllerDataProvider.name(), dataProvider);
 
     logger.info("START: Invoking {} controller pipeline for cluster: {}. Event type: {}, ID: {}. "
@@ -1202,6 +1262,10 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
     if (_workflowControlDataProvider != null) {
       _workflowControlDataProvider.notifyDataChange(type, path);
     }
+
+    if (_managementControllerDataProvider != null) {
+      _managementControllerDataProvider.notifyDataChange(type, path);
+    }
   }
 
   private void requestDataProvidersFullRefresh() {
@@ -1212,6 +1276,10 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
     if (_workflowControlDataProvider != null) {
       _workflowControlDataProvider.requireFullRefresh();
     }
+
+    if (_managementControllerDataProvider != null) {
+      _managementControllerDataProvider.requireFullRefresh();
+    }
   }
 
   private void pushToEventQueues(ClusterEventType eventType, NotificationContext changeContext,
@@ -1228,6 +1296,14 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
     for (Map.Entry<String, Object> attr : eventAttributes.entrySet()) {
       event.addAttribute(attr.getKey(), attr.getValue());
     }
+
+    // In management mode, route the event only to the management mode pipeline.
+    if (_inManagementMode) {
+      event.setEventId(uid + "_" + Pipeline.Type.MANAGEMENT_MODE.name());
+      enqueueEvent(_managementModeEventQueue, event);
+      return;
+    }
+
     enqueueEvent(_eventQueue, event);
     enqueueEvent(_taskEventQueue,
         event.clone(String.format("%s_%s", uid, Pipeline.Type.TASK.name())));
@@ -1269,7 +1345,10 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
       boolean prevInMaintenanceMode = _inMaintenanceMode;
       _paused = updateControllerState(pauseSignal, _paused);
       _inMaintenanceMode = updateControllerState(maintenanceSignal, _inMaintenanceMode);
-      triggerResumeEvent(changeContext, prevPaused, prevInMaintenanceMode);
+      // TODO: remove triggerResumeEvent when moving pause/maintenance to management pipeline
+      if (!triggerResumeEvent(changeContext, prevPaused, prevInMaintenanceMode)) {
+        pushToEventQueues(ClusterEventType.ControllerChange, changeContext, Collections.emptyMap());
+      }
 
       enableClusterStatusMonitor(true);
       _clusterStatusMonitor.setEnabled(!_paused);
@@ -1484,7 +1563,7 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
    * @param prevPaused the previous paused status.
    * @param prevInMaintenanceMode the previous in maintenance mode status.
    */
-  private void triggerResumeEvent(NotificationContext changeContext, boolean prevPaused,
+  private boolean triggerResumeEvent(NotificationContext changeContext, boolean prevPaused,
       boolean prevInMaintenanceMode) {
     /**
      * WARNING: the logic here is tricky.
@@ -1496,7 +1575,9 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
     if (!_paused && (prevPaused || (prevInMaintenanceMode && !_inMaintenanceMode))) {
       pushToEventQueues(ClusterEventType.Resume, changeContext, Collections.EMPTY_MAP);
       logger.info("controller is now resumed from paused/maintenance state");
+      return true;
     }
+    return false;
   }
 
   // TODO: refactor this to use common/ClusterEventProcessor.
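`triggerResumeEvent` now returns whether it enqueued a `Resume` event. As a result, every controller-state notification produces exactly one event: `Resume` when the controller leaves pause or maintenance, and `ControllerChange` otherwise. This commit registers `ControllerChange` in the default, task and management mode registries. The choice can be sketched as a pure function. The class and method names are hypothetical; the boolean condition is copied from `triggerResumeEvent`.

```java
/** Hypothetical pure-function view of the event chosen after a controller state change. */
public class ControllerChangeSketch {
  enum EventType { Resume, ControllerChange }

  static EventType eventFor(boolean prevPaused, boolean paused,
      boolean prevInMaintenance, boolean inMaintenance) {
    // Same condition as triggerResumeEvent: not paused now, and either it was paused before
    // or it has just left maintenance mode.
    if (!paused && (prevPaused || (prevInMaintenance && !inMaintenance))) {
      return EventType.Resume;
    }
    // Otherwise the caller falls back to pushing a ControllerChange event.
    return EventType.ControllerChange;
  }
}
```

Entering pause, or any other controller change that is not a resume, therefore still triggers a pipeline run through the newly registered `ControllerChange` handlers instead of being dropped.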
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ManagementControllerDataProvider.java
similarity index 62%
copy from helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java
copy to helix-core/src/main/java/org/apache/helix/controller/dataproviders/ManagementControllerDataProvider.java
index cd0ce60..d178ca5 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ManagementControllerDataProvider.java
@@ -1,4 +1,4 @@
-package org.apache.helix.controller.stages;
+package org.apache.helix.controller.dataproviders;
 
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -19,25 +19,9 @@ package org.apache.helix.controller.stages;
  * under the License.
  */
 
-public enum ClusterEventType {
-  IdealStateChange,
-  CurrentStateChange,
-  TaskCurrentStateChange,
-  CustomizedStateChange,
-  ConfigChange,
-  ClusterConfigChange,
-  ResourceConfigChange,
-  InstanceConfigChange,
-  CustomizeStateConfigChange,
-  LiveInstanceChange,
-  MessageChange,
-  ExternalViewChange,
-  CustomizedViewChange,
-  TargetExternalViewChange,
-  Resume,
-  PeriodicalRebalance,
-  OnDemandRebalance,
-  RetryRebalance,
-  StateVerifier,
-  Unknown
+public class ManagementControllerDataProvider extends BaseControllerDataProvider {
+  // TODO: implement this class to only refresh required event types
+  public ManagementControllerDataProvider(String clusterName, String name) {
+    super(clusterName, name);
+  }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/pipeline/Pipeline.java b/helix-core/src/main/java/org/apache/helix/controller/pipeline/Pipeline.java
index ecf42da..4196e23 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/pipeline/Pipeline.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/pipeline/Pipeline.java
@@ -35,7 +35,14 @@ public class Pipeline {
 
   public enum Type {
     DEFAULT,
-    TASK
+    TASK,
+
+    /**
+     * A pipeline used to manage the cluster when it is in admin management mode:
+     * cluster freeze mode, controller pause mode, etc. Used by Helix internally,
+     * not meant for external Helix users.
+     */
+    MANAGEMENT_MODE
   }
 
   public Pipeline() {
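The new `MANAGEMENT_MODE` pipeline type pairs with the dispatch change at the top of this section: while the controller is in management mode, an event goes only to the management-mode queue. Otherwise it goes to the default and task queues. Below is a minimal sketch of that routing. It assumes a hypothetical `EventRouterSketch` with `Deque<String>` queues holding event IDs. The real controller enqueues `ClusterEvent` objects, and the default-queue ID format here is an assumption.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Local copy of the pipeline types from the diff.
enum PipelineType { DEFAULT, TASK, MANAGEMENT_MODE }

// Hypothetical router; each queue holds event IDs of the form "<uid>_<PIPELINE_TYPE>".
class EventRouterSketch {
  final Deque<String> defaultQueue = new ArrayDeque<>();
  final Deque<String> taskQueue = new ArrayDeque<>();
  final Deque<String> managementModeQueue = new ArrayDeque<>();
  volatile boolean inManagementMode;

  void pushToEventQueues(String uid) {
    if (inManagementMode) {
      // Only the management-mode pipeline runs; default and task pipelines are skipped.
      managementModeQueue.add(uid + "_" + PipelineType.MANAGEMENT_MODE.name());
      return;
    }
    defaultQueue.add(uid + "_" + PipelineType.DEFAULT.name());
    taskQueue.add(String.format("%s_%s", uid, PipelineType.TASK.name()));
  }
}
```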
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java
index cd0ce60..65f6bb4 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java
@@ -37,6 +37,7 @@ public enum ClusterEventType {
   Resume,
   PeriodicalRebalance,
   OnDemandRebalance,
+  ControllerChange,
   RetryRebalance,
   StateVerifier,
   Unknown
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementModeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementModeStage.java
new file mode 100644
index 0000000..512224d
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementModeStage.java
@@ -0,0 +1,48 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.controller.dataproviders.ManagementControllerDataProvider;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.util.HelixUtil;
+import org.apache.helix.util.RebalanceUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Checks whether the cluster is in management mode.
+ */
+public class ManagementModeStage extends AbstractBaseStage {
+  private static final Logger LOG = LoggerFactory.getLogger(ManagementModeStage.class);
+
+  @Override
+  public void process(ClusterEvent event) throws Exception {
+    // TODO: implement the stage
+    String clusterName = event.getClusterName();
+    ManagementControllerDataProvider cache =
+        event.getAttribute(AttributeName.ControllerDataProvider.name());
+    if (!HelixUtil.inManagementMode(cache)) {
+      LOG.info("Exiting management mode pipeline for cluster {}", clusterName);
+      RebalanceUtil.enableManagementMode(clusterName, false);
+      throw new StageException("Exiting management mode pipeline for cluster " + clusterName);
+    }
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java
index 075c40f..a4d4783 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java
@@ -28,6 +28,8 @@ import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Resource;
 import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.util.HelixUtil;
+import org.apache.helix.util.RebalanceUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,6 +43,14 @@ public class ResourceValidationStage extends AbstractBaseStage {
     if (cache == null) {
       throw new StageException("Missing attributes in event:" + event + ". Requires DataCache");
     }
+
+    // Check if the cluster is still in management mode, e.g. any live instance is still frozen.
+    if (HelixUtil.inManagementMode(cache)) {
+      // Trigger an immediate management mode pipeline.
+      RebalanceUtil.enableManagementMode(event.getClusterName(), true);
+      throw new StageException("Pipeline should not be run because cluster is in management mode");
+    }
+
     Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
     if (resourceMap == null) {
       throw new StageException("Resources must be computed prior to validation!");
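Together, `ResourceValidationStage` and `ManagementModeStage` form a two-way switch. The default pipeline aborts and turns management mode on when the cluster needs it. The management-mode pipeline aborts and turns it off when the cluster no longer needs it. Below is a minimal sketch of that handshake. It assumes a hypothetical `PipelineSwitchSketch`, uses a `BooleanSupplier` in place of `HelixUtil.inManagementMode(cache)`, and uses `StageAbortedException` in place of `StageException`.

```java
import java.util.function.BooleanSupplier;

// Hypothetical stand-in for StageException.
class StageAbortedException extends Exception {
  StageAbortedException(String message) {
    super(message);
  }
}

class PipelineSwitchSketch {
  boolean managementModeEnabled; // stands in for leaderController.setInManagementMode(...)
  int onDemandRuns;              // counts stand-ins for scheduleOnDemandPipeline(clusterName, 0L)
  private final BooleanSupplier clusterNeedsManagementMode;

  PipelineSwitchSketch(BooleanSupplier clusterNeedsManagementMode) {
    this.clusterNeedsManagementMode = clusterNeedsManagementMode;
  }

  private void enableManagementMode(boolean enabled) {
    managementModeEnabled = enabled;
    onDemandRuns++; // trigger an immediate pipeline run
  }

  // Check done by the default pipeline (ResourceValidationStage).
  void validateResources() throws StageAbortedException {
    if (clusterNeedsManagementMode.getAsBoolean()) {
      enableManagementMode(true);
      throw new StageAbortedException("Pipeline should not be run because cluster is in management mode");
    }
  }

  // Check done by the management-mode pipeline (ManagementModeStage).
  void checkManagementMode() throws StageAbortedException {
    if (!clusterNeedsManagementMode.getAsBoolean()) {
      enableManagementMode(false);
      throw new StageAbortedException("Exiting management mode pipeline");
    }
  }
}
```

Each side flips the flag and schedules an immediate run before aborting. The next run is therefore routed to the other pipeline, instead of continuing one whose precondition no longer holds.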
diff --git a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
index ce22c5f..3151716 100644
--- a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
@@ -38,6 +38,7 @@ import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.PropertyType;
 import org.apache.helix.controller.common.PartitionStateMap;
+import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import org.apache.helix.controller.rebalancer.AbstractRebalancer;
 import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
@@ -547,4 +548,15 @@ public final class HelixUtil {
     instanceConfig.setInstanceEnabled(true);
     return instanceConfig;
   }
+
+  /**
+   * Checks whether or not the cluster is in management mode.
+   *
+   * @param cache the controller data cache to inspect
+   * @return true if the cluster is in management mode
+   */
+  public static boolean inManagementMode(BaseControllerDataProvider cache) {
+    // TODO: implement the logic. Parameters can also change
+    return true;
+  }
 }
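`inManagementMode` is still a stub that always returns `true`. As written, every default-pipeline run on this branch would therefore stop at `ResourceValidationStage`. The sketch below shows one plausible check, based on the comment in `ResourceValidationStage` (a frozen live instance keeps the cluster in management mode) and on the `PauseSignal` and `LiveInstance` `STATUS` fields added in this series. It is only an assumption about how the TODO might be filled in. `ManagementModeCheckSketch` and its string-valued instance statuses are hypothetical simplifications, not Helix types.

```java
import java.util.Collection;

// Hypothetical, simplified view of the cluster data a real check would read from the cache.
final class ManagementModeCheckSketch {
  private ManagementModeCheckSketch() {}

  /**
   * @param clusterPauseSignalSet whether a PauseSignal with CLUSTER_PAUSE=true exists
   * @param liveInstanceStatuses the STATUS field of each live instance (null if unset)
   * @return true if the cluster should stay in management mode
   */
  static boolean inManagementMode(boolean clusterPauseSignalSet,
      Collection<String> liveInstanceStatuses) {
    if (clusterPauseSignalSet) {
      return true;
    }
    // Even after the pause signal is removed, stay in management mode until no
    // live instance reports the PAUSED (frozen) status.
    for (String status : liveInstanceStatuses) {
      if ("PAUSED".equals(status)) {
        return true;
      }
    }
    return false;
  }
}
```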
diff --git a/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java b/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
index 91ec406..db2b76f 100644
--- a/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
@@ -145,6 +145,28 @@ public class RebalanceUtil {
     return result;
   }
 
+  /**
+   * Enables or disables the management mode pipeline on the cluster's leader controller.
+   *
+   * @param clusterName target cluster name
+   * @param enabled whether to enable or disable the management mode pipeline
+   */
+  public static void enableManagementMode(String clusterName, boolean enabled) {
+    GenericHelixController leaderController =
+        GenericHelixController.getLeaderController(clusterName);
+    if (leaderController != null) {
+      LOG.info("Switching management mode pipeline for cluster={}, enabled={}", clusterName,
+          enabled);
+      leaderController.setInManagementMode(enabled);
+    } else {
+      LOG.error("Failed to switch management mode pipeline, enabled={}. "
+          + "Controller for cluster {} does not exist", enabled, clusterName);
+    }
+
+    // Triggers an event to immediately run the pipeline
+    scheduleOnDemandPipeline(clusterName, 0L);
+  }
+
   public static void scheduleOnDemandPipeline(String clusterName, long delay) {
     scheduleOnDemandPipeline(clusterName, delay, true);
   }
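`enableManagementMode` looks up the leader controller for the cluster and sets its management-mode flag, or logs an error if no leader is registered. Below is a minimal sketch of that lookup. It assumes a hypothetical static `ConcurrentHashMap` registry in place of `GenericHelixController.getLeaderController`. It also returns a boolean so the outcome is observable; the real method returns `void`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for GenericHelixController's per-cluster leader registry.
class LeaderRegistrySketch {
  static final class Controller {
    volatile boolean inManagementMode;
  }

  private static final Map<String, Controller> LEADERS = new ConcurrentHashMap<>();

  static void registerLeader(String clusterName, Controller controller) {
    LEADERS.put(clusterName, controller);
  }

  /** Returns true if a leader was found and switched, false if none is registered. */
  static boolean enableManagementMode(String clusterName, boolean enabled) {
    // The real method also calls scheduleOnDemandPipeline(clusterName, 0L)
    // in both branches; that part is omitted here.
    Controller leader = LEADERS.get(clusterName);
    if (leader == null) {
      System.err.printf("Failed to switch management mode pipeline, enabled=%s. "
          + "Controller for cluster %s does not exist%n", enabled, clusterName);
      return false;
    }
    leader.inManagementMode = enabled;
    return true;
  }
}
```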

[helix] 01/06: Add java api for enable/disable cluster pause mode (#1740)

Posted by hz...@apache.org.

hzlu pushed a commit to branch cluster-pause-mode
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 5eec5a6a376ba7abdb5850b9249dd3bf76110186
Author: Huizhi Lu <51...@users.noreply.github.com>
AuthorDate: Wed May 19 15:45:11 2021 -0700

    Add java api for enable/disable cluster pause mode (#1740)
    
    The cluster pause mode feature is being added. This commit adds a Java API to set cluster pause mode: void setClusterManagementMode(ClusterManagementModeRequest request);
---
 .../src/main/java/org/apache/helix/HelixAdmin.java |  14 +++
 .../api/exceptions/HelixConflictException.java     |  33 +++++++
 .../helix/api/status/ClusterManagementMode.java    |  70 ++++++++++++++
 .../api/status/ClusterManagementModeRequest.java   | 102 +++++++++++++++++++++
 .../org/apache/helix/manager/zk/ZKHelixAdmin.java  |  80 +++++++++++++++-
 .../java/org/apache/helix/model/LiveInstance.java  |  28 +++++-
 .../java/org/apache/helix/model/PauseSignal.java   |  42 ++++++++-
 .../apache/helix/manager/zk/TestZkHelixAdmin.java  |  70 ++++++++++++++
 .../java/org/apache/helix/mock/MockHelixAdmin.java |   6 ++
 .../org/apache/helix/model/TestLiveInstance.java   |   8 ++
 10 files changed, 449 insertions(+), 4 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
index 3c456bb..d403571 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
@@ -37,6 +37,8 @@ import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.MaintenanceSignal;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.api.status.ClusterManagementMode;
+import org.apache.helix.api.status.ClusterManagementModeRequest;
 
 /*
  * Helix cluster management
@@ -368,6 +370,18 @@ public interface HelixAdmin {
   boolean isInMaintenanceMode(String clusterName);
 
   /**
+   * Requests to put a cluster into a management mode
+   * {@link ClusterManagementMode.Type}. When this method returns, the signal has been
+   * successfully sent, but the cluster may not have fully entered the mode yet, because
+   * the cluster can take some time to complete the request.
+   * <p>
+   * To check the cluster management mode status, call {@link #getClusterManagementMode(String)}.
+   *
+   * @param request request to set the cluster management mode. {@link ClusterManagementModeRequest}
+   */
+  void setClusterManagementMode(ClusterManagementModeRequest request);
+
+  /**
    * Reset a list of partitions in error state for an instance
    * The partitions are assumed to be in error state and reset will bring them from error
    * to initial state. An error to initial state transition is required for reset.
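Because `setClusterManagementMode` only sends the signal, a caller that needs the cluster to be fully in the target mode has to poll the status separately. The Javadoc points to `getClusterManagementMode(String)` for this, although that method is not part of this commit. Below is a minimal polling sketch. It assumes a hypothetical `ModeWaitSketch` whose `Supplier` plays the role of that status call, with local copies of the `Type` and `Status` enums so it compiles on its own.

```java
import java.time.Duration;
import java.util.function.Supplier;

// Hypothetical polling helper for the asynchronous management-mode request.
final class ModeWaitSketch {
  // Local copies of ClusterManagementMode.Type / Status.
  enum Type { NORMAL, CLUSTER_PAUSE, CONTROLLER_PAUSE, MAINTENANCE }
  enum Status { IN_PROGRESS, COMPLETED }

  /** Simplified stand-in for ClusterManagementMode (target mode plus progress status). */
  static final class ModeSnapshot {
    final Type mode;
    final Status status;

    ModeSnapshot(Type mode, Status status) {
      this.mode = mode;
      this.status = status;
    }
  }

  private ModeWaitSketch() {}

  /**
   * Polls statusSupplier until it reports (target, COMPLETED), the timeout elapses,
   * or the thread is interrupted. Returns true only in the first case.
   */
  static boolean waitForMode(Supplier<ModeSnapshot> statusSupplier, Type target,
      Duration timeout, Duration pollInterval) {
    long deadline = System.nanoTime() + timeout.toNanos();
    while (true) {
      ModeSnapshot current = statusSupplier.get();
      if (current != null && current.mode == target && current.status == Status.COMPLETED) {
        return true;
      }
      if (System.nanoTime() - deadline >= 0) {
        return false;
      }
      try {
        Thread.sleep(pollInterval.toMillis());
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        return false;
      }
    }
  }
}
```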
diff --git a/helix-core/src/main/java/org/apache/helix/api/exceptions/HelixConflictException.java b/helix-core/src/main/java/org/apache/helix/api/exceptions/HelixConflictException.java
new file mode 100644
index 0000000..c626155
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/exceptions/HelixConflictException.java
@@ -0,0 +1,33 @@
+package org.apache.helix.api.exceptions;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixException;
+
+/**
+ * Occurs when a conflict with a previous successful write is detected. This generally occurs when
+ * a write request conflicts with the existing data. E.g. if a cluster is already in cluster
+ * pause mode, a request to enable maintenance mode is a conflict.
+ */
+public class HelixConflictException extends HelixException {
+  public HelixConflictException(String message) {
+    super(message);
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/api/status/ClusterManagementMode.java b/helix-core/src/main/java/org/apache/helix/api/status/ClusterManagementMode.java
new file mode 100644
index 0000000..d7a1637
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/status/ClusterManagementMode.java
@@ -0,0 +1,70 @@
+package org.apache.helix.api.status;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Represents the management mode of the cluster:
+ * 1. the type of mode it targets;
+ * 2. its current progress status.
+ */
+public class ClusterManagementMode {
+    /** Represents the type of management mode the cluster targets. */
+    public enum Type {
+        /** Cluster is not in any pause or maintenance mode */
+        NORMAL,
+
+        /**
+         * Puts a cluster into pause mode, which will pause controller and participants.
+         * This can be used to retain the cluster state.
+         */
+        CLUSTER_PAUSE,
+
+        /** Pause controller only, but not participants. */
+        CONTROLLER_PAUSE,
+
+        /** Put cluster into maintenance mode. */
+        MAINTENANCE
+    }
+
+    /** Current status of the cluster mode */
+    public enum Status {
+        /** Cluster is transitioning to the target {@link Type} of mode */
+        IN_PROGRESS,
+
+        /** Cluster is fully stable in the target {@link Type} of mode */
+        COMPLETED
+    }
+
+    private final Type mode;
+    private final Status status;
+
+    public ClusterManagementMode(Type mode, Status status) {
+        this.mode = mode;
+        this.status = status;
+    }
+
+    public Status getStatus() {
+        return status;
+    }
+
+    public Type getMode() {
+        return mode;
+    }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/api/status/ClusterManagementModeRequest.java b/helix-core/src/main/java/org/apache/helix/api/status/ClusterManagementModeRequest.java
new file mode 100644
index 0000000..dd2fe58
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/status/ClusterManagementModeRequest.java
@@ -0,0 +1,102 @@
+package org.apache.helix.api.status;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Represents a request to set the cluster management mode {@link ClusterManagementMode}
+ */
+public class ClusterManagementModeRequest {
+  private final ClusterManagementMode.Type _mode;
+  private final String _clusterName;
+  private final String _reason;
+  private final boolean _cancelPendingST;
+
+  public ClusterManagementMode.Type getMode() {
+    return _mode;
+  }
+
+  public String getClusterName() {
+    return _clusterName;
+  }
+
+  public String getReason() {
+    return _reason;
+  }
+
+  public boolean isCancelPendingST() {
+    return _cancelPendingST;
+  }
+
+  public ClusterManagementModeRequest(Builder builder) {
+    _mode = builder.mode;
+    _clusterName = builder.clusterName;
+    _reason = builder.reason;
+    _cancelPendingST = builder.cancelPendingST;
+  }
+
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  public static final class Builder {
+    private ClusterManagementMode.Type mode;
+    private String clusterName;
+    private String reason = "";
+    private boolean cancelPendingST;
+
+    public Builder withMode(ClusterManagementMode.Type mode) {
+      this.mode = mode;
+      return this;
+    }
+
+    public Builder withClusterName(String clusterName) {
+      this.clusterName = clusterName;
+      return this;
+    }
+
+    public Builder withReason(String reason) {
+      this.reason = reason;
+      return this;
+    }
+
+    /**
+     * If mode is not CLUSTER_PAUSE, this should not be set to true.
+     *
+     * @param cancelPendingST whether or not to cancel pending state transitions in CLUSTER_PAUSE mode.
+     * @return {@link Builder}
+     */
+    public Builder withCancelPendingST(boolean cancelPendingST) {
+      this.cancelPendingST = cancelPendingST;
+      return this;
+    }
+
+    public ClusterManagementModeRequest build() {
+      validate();
+      return new ClusterManagementModeRequest(this);
+    }
+
+    private void validate() {
+      Preconditions.checkNotNull(mode, "Mode not set");
+      Preconditions.checkNotNull(clusterName, "Cluster name not set");
+    }
+  }
+}
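The builder's Javadoc says `cancelPendingST` should only be set for `CLUSTER_PAUSE`, but `validate()` only checks that the mode and cluster name are set. Below is a minimal sketch of a builder that also enforces that rule. It uses `java.util.Objects.requireNonNull` in place of Guava's `Preconditions.checkNotNull` (both throw `NullPointerException`). `RequestSketch` and the extra check are assumptions, not part of the commit.

```java
import java.util.Objects;

final class RequestSketch {
  enum Mode { NORMAL, CLUSTER_PAUSE, CONTROLLER_PAUSE, MAINTENANCE }

  final Mode mode;
  final String clusterName;
  final String reason;
  final boolean cancelPendingST;

  // Private, so the only way to get an instance is through the validating build().
  private RequestSketch(Builder b) {
    mode = b.mode;
    clusterName = b.clusterName;
    reason = b.reason;
    cancelPendingST = b.cancelPendingST;
  }

  static Builder newBuilder() {
    return new Builder();
  }

  static final class Builder {
    private Mode mode;
    private String clusterName;
    private String reason = "";
    private boolean cancelPendingST;

    Builder withMode(Mode mode) { this.mode = mode; return this; }
    Builder withClusterName(String clusterName) { this.clusterName = clusterName; return this; }
    Builder withReason(String reason) { this.reason = reason; return this; }
    Builder withCancelPendingST(boolean cancel) { this.cancelPendingST = cancel; return this; }

    RequestSketch build() {
      // Same null checks as the original validate().
      Objects.requireNonNull(mode, "Mode not set");
      Objects.requireNonNull(clusterName, "Cluster name not set");
      // Extra check (assumption): cancelling pending STs only applies to CLUSTER_PAUSE.
      if (cancelPendingST && mode != Mode.CLUSTER_PAUSE) {
        throw new IllegalArgumentException("cancelPendingST is only supported for CLUSTER_PAUSE");
      }
      return new RequestSketch(this);
    }
  }
}
```

In the commit, the `ClusterManagementModeRequest(Builder)` constructor is public, so callers can bypass `build()` and its validation. Making it private, as in the sketch, closes that path.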
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index 62840b6..2545139 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -25,6 +25,7 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -51,9 +52,9 @@ import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyPathBuilder;
 import org.apache.helix.PropertyType;
 import org.apache.helix.SystemPropertyKeys;
+import org.apache.helix.api.exceptions.HelixConflictException;
+import org.apache.helix.api.status.ClusterManagementMode;
 import org.apache.helix.api.topology.ClusterTopology;
-import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
-import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
 import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
 import org.apache.helix.controller.rebalancer.util.WagedValidationUtil;
 import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
@@ -80,6 +81,7 @@ import org.apache.helix.model.ParticipantHistory;
 import org.apache.helix.model.PauseSignal;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.api.status.ClusterManagementModeRequest;
 import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
 import org.apache.helix.tools.DefaultIdealStateCalculator;
 import org.apache.helix.util.HelixUtil;
@@ -93,6 +95,7 @@ import org.apache.helix.zookeeper.impl.client.FederatedZkClient;
 import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
 import org.apache.helix.zookeeper.routing.RoutingDataManager;
 import org.apache.helix.zookeeper.zkclient.DataUpdater;
+import org.apache.helix.zookeeper.zkclient.NetworkUtil;
 import org.apache.helix.zookeeper.zkclient.exception.ZkException;
 import org.apache.helix.zookeeper.zkclient.exception.ZkNoNodeException;
 import org.apache.zookeeper.KeeperException;
@@ -498,6 +501,79 @@ public class ZKHelixAdmin implements HelixAdmin {
   }
 
   @Override
+  public void setClusterManagementMode(ClusterManagementModeRequest request) {
+    ClusterManagementMode.Type mode = request.getMode();
+    String clusterName = request.getClusterName();
+    String reason = request.getReason();
+
+    // TODO: support other modes
+    switch (mode) {
+      case CLUSTER_PAUSE:
+        enableClusterPauseMode(clusterName, request.isCancelPendingST(), reason);
+        break;
+      case NORMAL:
+        // If the cluster is in another mode, check which mode it is in and call the matching API.
+        // If all mode configs are stored in one znode, a single generic method would suffice.
+        disableClusterPauseMode(clusterName);
+        break;
+      default:
+        throw new IllegalArgumentException("ClusterManagementMode " + mode + " is not supported");
+    }
+  }
+
+  private void enableClusterPauseMode(String clusterName, boolean cancelPendingST, String reason) {
+    String hostname = NetworkUtil.getLocalhostName();
+    logger.info(
+        "Enable cluster pause mode for cluster: {}. CancelPendingST: {}. Reason: {}. From Host: {}",
+        clusterName, cancelPendingST, reason, hostname);
+
+    BaseDataAccessor<ZNRecord> baseDataAccessor = new ZkBaseDataAccessor<>(_zkClient);
+    HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseDataAccessor);
+
+    if (baseDataAccessor.exists(accessor.keyBuilder().pause().getPath(), AccessOption.PERSISTENT)) {
+      throw new HelixConflictException(clusterName + " pause signal already exists");
+    }
+    if (baseDataAccessor.exists(accessor.keyBuilder().maintenance().getPath(), AccessOption.PERSISTENT)) {
+      throw new HelixConflictException(clusterName + " maintenance signal already exists");
+    }
+
+    // check whether cancellation is enabled
+    ClusterConfig config = accessor.getProperty(accessor.keyBuilder().clusterConfig());
+    if (cancelPendingST && (config == null || !config.isStateTransitionCancelEnabled())) {
+      throw new HelixConflictException(
+          "State transition cancellation not enabled in " + clusterName);
+    }
+
+    PauseSignal pauseSignal = new PauseSignal();
+    pauseSignal.setClusterPause(true);
+    pauseSignal.setCancelPendingST(cancelPendingST);
+    pauseSignal.setFromHost(hostname);
+    pauseSignal.setTriggerTime(Instant.now().toEpochMilli());
+    if (reason != null && !reason.isEmpty()) {
+      pauseSignal.setReason(reason);
+    }
+    // TODO: merge management status signal into one znode to avoid race condition
+    if (!accessor.createPause(pauseSignal)) {
+      throw new HelixException("Failed to create pause signal");
+    }
+  }
+
+  private void disableClusterPauseMode(String clusterName) {
+    logger.info("Disable cluster pause mode for cluster: {}", clusterName);
+    HelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<>(_zkClient));
+    PropertyKey pausePropertyKey = accessor.keyBuilder().pause();
+    PauseSignal pauseSignal = accessor.getProperty(pausePropertyKey);
+    if (pauseSignal == null || !pauseSignal.isClusterPause()) {
+      throw new HelixException("Cluster pause mode is not enabled for cluster " + clusterName);
+    }
+
+    if (!accessor.removeProperty(pausePropertyKey)) {
+      throw new HelixException("Failed to disable cluster pause mode for cluster: " + clusterName);
+    }
+  }
+
+  @Override
   @Deprecated
   public void enableMaintenanceMode(String clusterName, boolean enabled, String reason) {
     manuallyEnableMaintenanceMode(clusterName, enabled, reason, null);
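The enable/disable flow in `ZKHelixAdmin` runs in a fixed order. It rejects the request if a pause or maintenance signal already exists. It also rejects it if pending-ST cancellation is requested but not enabled. Only then does it create the pause signal. Disabling removes the signal only if it is a cluster pause. Below is a minimal in-memory sketch of that flow for a single cluster. It assumes a hypothetical `PauseModeAdminSketch` whose map keys `"pause"` and `"maintenance"` stand in for the ZooKeeper znodes, and a boolean field in place of `ClusterConfig.isStateTransitionCancelEnabled()`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for the ZooKeeper-backed flow (single cluster).
class PauseModeAdminSketch {
  static class ConflictException extends RuntimeException {
    ConflictException(String message) {
      super(message);
    }
  }

  final Map<String, Map<String, String>> store = new HashMap<>();
  boolean stateTransitionCancelEnabled;

  void enableClusterPauseMode(String clusterName, boolean cancelPendingST, String reason) {
    if (store.containsKey("pause")) {
      throw new ConflictException(clusterName + " pause signal already exists");
    }
    if (store.containsKey("maintenance")) {
      throw new ConflictException(clusterName + " maintenance signal already exists");
    }
    if (cancelPendingST && !stateTransitionCancelEnabled) {
      throw new ConflictException("State transition cancellation not enabled in " + clusterName);
    }
    Map<String, String> signal = new HashMap<>();
    signal.put("CLUSTER_PAUSE", "true");
    signal.put("CANCEL_PENDING_ST", String.valueOf(cancelPendingST));
    if (reason != null && !reason.isEmpty()) {
      signal.put("REASON", reason);
    }
    // putIfAbsent mimics createPause failing when another writer created the node first.
    if (store.putIfAbsent("pause", signal) != null) {
      throw new IllegalStateException("Failed to create pause signal");
    }
  }

  void disableClusterPauseMode(String clusterName) {
    Map<String, String> signal = store.get("pause");
    if (signal == null || !Boolean.parseBoolean(signal.get("CLUSTER_PAUSE"))) {
      throw new IllegalStateException("Cluster pause mode is not enabled for cluster " + clusterName);
    }
    store.remove("pause");
  }
}
```

As the TODO in the commit notes, the existence checks and the create are separate ZooKeeper operations. A concurrent maintenance-mode request could still land between them. Storing all management-mode state in one znode would let a single conditional write cover both.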
diff --git a/helix-core/src/main/java/org/apache/helix/model/LiveInstance.java b/helix-core/src/main/java/org/apache/helix/model/LiveInstance.java
index 0adccdd..7a9671d 100644
--- a/helix-core/src/main/java/org/apache/helix/model/LiveInstance.java
+++ b/helix-core/src/main/java/org/apache/helix/model/LiveInstance.java
@@ -40,7 +40,17 @@ public class LiveInstance extends HelixProperty {
     LIVE_INSTANCE,
     ZKPROPERTYTRANSFERURL,
     RESOURCE_CAPACITY,
-    CURRENT_TASK_THREAD_POOL_SIZE
+    CURRENT_TASK_THREAD_POOL_SIZE,
+
+    /** Represents the status of the live instance, e.g. PAUSED */
+    STATUS
+  }
+
+  /**
+   * Possible values for the {@link LiveInstanceProperty#STATUS} field
+   */
+  public enum LiveInstanceStatus {
+    PAUSED
   }
 
   /**
@@ -131,6 +141,22 @@ public class LiveInstance extends HelixProperty {
   }
 
   /**
+   * Gets the live instance's status. Returns null if the status field is not set.
+   */
+  public LiveInstanceStatus getStatus() {
+    return _record.getEnumField(LiveInstanceProperty.STATUS.name(), LiveInstanceStatus.class, null);
+  }
+
+  /**
+   * Sets the status in the record's simple fields.
+   *
+   * @param status status value
+   */
+  public void setStatus(LiveInstanceStatus status) {
+    _record.setEnumField(LiveInstanceProperty.STATUS.name(), status);
+  }
+
+  /**
    * Get an identifier that represents the instance and where it is located
    * @return identifier, e.g. process_id@host
    */
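`getStatus` reads the `STATUS` simple field through `ZNRecord.getEnumField` with a `null` default, so an unset field yields `null`. The sketch below models that round trip with a plain `Map<String, String>`. It assumes the value is stored as the enum constant's `name()`, and that an unrecognized string also falls back to the default. Check `ZNRecord` for its exact behavior. `SimpleFieldsSketch` is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical simple-field store mimicking how an enum value might be persisted as a string.
class SimpleFieldsSketch {
  enum LiveInstanceStatus { PAUSED }

  private final Map<String, String> simpleFields = new HashMap<>();

  <T extends Enum<T>> void setEnumField(String key, T value) {
    if (value == null) {
      simpleFields.remove(key); // this sketch treats null as "unset"
    } else {
      simpleFields.put(key, value.name());
    }
  }

  <T extends Enum<T>> T getEnumField(String key, Class<T> enumType, T defaultValue) {
    String raw = simpleFields.get(key);
    if (raw == null) {
      return defaultValue;
    }
    try {
      return Enum.valueOf(enumType, raw);
    } catch (IllegalArgumentException e) {
      // Unknown value (e.g. written by a newer version): fall back to the default.
      return defaultValue;
    }
  }

  void setRaw(String key, String raw) {
    simpleFields.put(key, raw);
  }
}
```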
diff --git a/helix-core/src/main/java/org/apache/helix/model/PauseSignal.java b/helix-core/src/main/java/org/apache/helix/model/PauseSignal.java
index e68345d..bcfd17d 100644
--- a/helix-core/src/main/java/org/apache/helix/model/PauseSignal.java
+++ b/helix-core/src/main/java/org/apache/helix/model/PauseSignal.java
@@ -19,6 +19,8 @@ package org.apache.helix.model;
  * under the License.
  */
 
+import java.time.Instant;
+
 import org.apache.helix.HelixProperty;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 
@@ -26,9 +28,18 @@ import org.apache.helix.zookeeper.datamodel.ZNRecord;
  * Represents a pause in the cluster
  */
 public class PauseSignal extends HelixProperty {
+  private static final String DEFAULT_PAUSE_ID = "pause";
 
   public enum PauseSignalProperty {
-    REASON
+    REASON,
+    CLUSTER_PAUSE,
+    FROM_HOST,
+    CANCEL_PENDING_ST,
+    TRIGGER_TIME
+  }
+
+  public PauseSignal() {
+    this(DEFAULT_PAUSE_ID);
   }
 
   /**
@@ -63,4 +74,33 @@ public class PauseSignal extends HelixProperty {
   public boolean isValid() {
     return true;
   }
+
+  public void setClusterPause(boolean pause) {
+    _record.setBooleanField(PauseSignalProperty.CLUSTER_PAUSE.name(), pause);
+  }
+
+  public boolean isClusterPause() {
+    return _record.getBooleanField(PauseSignalProperty.CLUSTER_PAUSE.name(), false);
+  }
+
+  public void setFromHost(String host) {
+    _record.setSimpleField(PauseSignalProperty.FROM_HOST.name(), host);
+  }
+
+  public String getFromHost() {
+    return _record.getSimpleField(PauseSignalProperty.FROM_HOST.name());
+  }
+
+  public void setCancelPendingST(boolean cancel) {
+    _record.setBooleanField(PauseSignalProperty.CANCEL_PENDING_ST.name(), cancel);
+  }
+
+  public boolean getCancelPendingST() {
+    return _record.getBooleanField(PauseSignalProperty.CANCEL_PENDING_ST.name(), false);
+  }
+
+  public void setTriggerTime(long time) {
+    _record.setSimpleField(PauseSignalProperty.TRIGGER_TIME.name(),
+        Instant.ofEpochMilli(time).toString());
+  }
 }
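`setTriggerTime` stores its epoch-millis argument as an ISO-8601 string (for example `2021-05-19T22:45:11Z`), and this commit adds no matching getter. Below is a minimal sketch of a hypothetical `getTriggerTime` that parses the string back with `Instant.parse`. A plain map stands in for the ZNRecord simple fields.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

// Hypothetical getter to pair with setTriggerTime; the map stands in for the ZNRecord.
class TriggerTimeSketch {
  private final Map<String, String> simpleFields = new HashMap<>();

  void setTriggerTime(long epochMillis) {
    // Same encoding as PauseSignal.setTriggerTime: ISO-8601 via Instant.toString().
    simpleFields.put("TRIGGER_TIME", Instant.ofEpochMilli(epochMillis).toString());
  }

  /** Returns the trigger time in epoch millis, or -1 if it is unset. */
  long getTriggerTime() {
    String raw = simpleFields.get("TRIGGER_TIME");
    return raw == null ? -1L : Instant.parse(raw).toEpochMilli();
  }

  String rawTriggerTime() {
    return simpleFields.get("TRIGGER_TIME");
  }
}
```

A human-readable timestamp makes the znode easier to inspect by hand. The cost is a parse step whenever code reads it back.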
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
index ce6ea9d..7ddf677 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
@@ -45,10 +45,13 @@ import org.apache.helix.PropertyPathBuilder;
 import org.apache.helix.PropertyType;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.api.exceptions.HelixConflictException;
+import org.apache.helix.api.status.ClusterManagementMode;
 import org.apache.helix.api.topology.ClusterTopology;
 import org.apache.helix.cloud.constants.CloudProvider;
 import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
 import org.apache.helix.examples.MasterSlaveStateModelFactory;
+import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.model.CloudConfig;
 import org.apache.helix.model.ClusterConfig;
@@ -65,15 +68,18 @@ import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.MasterSlaveSMD;
+import org.apache.helix.model.PauseSignal;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.model.builder.ConstraintItemBuilder;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import org.apache.helix.api.status.ClusterManagementModeRequest;
 import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.tools.StateModelConfigGenerator;
 import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.zookeeper.exception.ZkClientException;
+import org.apache.helix.zookeeper.zkclient.NetworkUtil;
 import org.apache.helix.zookeeper.zkclient.exception.ZkException;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.Stat;
@@ -1060,4 +1066,68 @@ public class TestZkHelixAdmin extends ZkUnitTestBase {
     tool.dropCluster(clusterName);
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
   }
+
+  /*
+   * Tests enabling and disabling cluster pause mode through setClusterManagementMode.
+   */
+  @Test
+  public void testEnableDisableClusterPauseMode() {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+
+    _gSetupTool.setupTestCluster(clusterName);
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+    controller.syncStart();
+    _gSetupTool.activateCluster(clusterName, controller.getClusterName(), true);
+
+    try {
+      // A pause request with cancelPendingST must be rejected because ST cancellation is not enabled
+      try {
+        ClusterManagementModeRequest request = ClusterManagementModeRequest.newBuilder()
+            .withClusterName(clusterName)
+            .withMode(ClusterManagementMode.Type.CLUSTER_PAUSE)
+            .withCancelPendingST(true)
+            .withReason(methodName)
+            .build();
+        _gSetupTool.getClusterManagementTool().setClusterManagementMode(request);
+        Assert.fail("Should not create pause with pending cancel ST enabled because "
+            + "cancellation is not enabled");
+      } catch (HelixConflictException e) {
+        Assert.assertTrue(e.getMessage().startsWith("State transition cancellation not enabled"));
+      }
+
+      ClusterManagementModeRequest request = ClusterManagementModeRequest.newBuilder()
+          .withClusterName(clusterName)
+          .withMode(ClusterManagementMode.Type.CLUSTER_PAUSE)
+          .withReason(methodName)
+          .build();
+      _gSetupTool.getClusterManagementTool().setClusterManagementMode(request);
+      HelixDataAccessor dataAccessor = new ZKHelixDataAccessor(clusterName, _baseAccessor);
+      PauseSignal pauseSignal = dataAccessor.getProperty(dataAccessor.keyBuilder().pause());
+
+      // Verify pause signal is correctly written
+      Assert.assertNotNull(pauseSignal);
+      Assert.assertTrue(pauseSignal.isClusterPause());
+      Assert.assertFalse(pauseSignal.getCancelPendingST());
+      Assert.assertEquals(pauseSignal.getFromHost(), NetworkUtil.getLocalhostName());
+      Assert.assertEquals(pauseSignal.getReason(), methodName);
+
+      // Disable pause mode
+      request = ClusterManagementModeRequest.newBuilder()
+          .withClusterName(clusterName)
+          .withMode(ClusterManagementMode.Type.NORMAL)
+          .build();
+      _gSetupTool.getClusterManagementTool().setClusterManagementMode(request);
+      pauseSignal = dataAccessor.getProperty(dataAccessor.keyBuilder().pause());
+
+      // Verify pause signal has been deleted.
+      Assert.assertNull(pauseSignal);
+    } finally {
+      _gSetupTool.activateCluster(clusterName, controller.getClusterName(), false);
+      controller.syncStop();
+      _gSetupTool.deleteCluster(clusterName);
+    }
+  }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
index 42633c1..2bacc74 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
@@ -44,6 +44,7 @@ import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.MaintenanceSignal;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.api.status.ClusterManagementModeRequest;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 
 public class MockHelixAdmin implements HelixAdmin {
@@ -325,6 +326,11 @@ public class MockHelixAdmin implements HelixAdmin {
     return false;
   }
 
+  @Override
+  public void setClusterManagementMode(ClusterManagementModeRequest request) {
+
+  }
+
   @Override public void resetPartition(String clusterName, String instanceName, String resourceName,
       List<String> partitionNames) {
 
diff --git a/helix-core/src/test/java/org/apache/helix/model/TestLiveInstance.java b/helix-core/src/test/java/org/apache/helix/model/TestLiveInstance.java
index 39f0a57..18285f5 100644
--- a/helix-core/src/test/java/org/apache/helix/model/TestLiveInstance.java
+++ b/helix-core/src/test/java/org/apache/helix/model/TestLiveInstance.java
@@ -146,4 +146,12 @@ public class TestLiveInstance extends ZkUnitTestBase {
 
     Assert.assertEquals(testLiveInstance.getCurrentTaskThreadPoolSize(), 100);
   }
+
+  @Test
+  public void testLiveInstanceStatus() {
+    LiveInstance testLiveInstance = new LiveInstance("testLiveInstanceStatus");
+    Assert.assertNull(testLiveInstance.getStatus());
+    testLiveInstance.setStatus(LiveInstance.LiveInstanceStatus.PAUSED);
+    Assert.assertEquals(testLiveInstance.getStatus(), LiveInstance.LiveInstanceStatus.PAUSED);
+  }
 }
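
For readers following testEnableDisableClusterPauseMode, here is a plain-Java model of the behavior the test asserts. It uses an in-memory map in place of the PAUSE znode. This is a hypothetical illustration, not the Helix implementation. `PauseModeSketch`, its field keys, and the use of `IllegalStateException` in place of `HelixConflictException` are all assumptions. The sketch captures three rules visible in the test:

- A CLUSTER_PAUSE request with cancelPendingST is rejected when state transition cancellation is disabled.
- A plain CLUSTER_PAUSE request writes a signal that records the host and the reason.
- Switching back to NORMAL deletes the signal.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of the pause-mode behavior asserted by
// testEnableDisableClusterPauseMode. Not the Helix implementation.
final class PauseModeSketch {
  enum Mode { NORMAL, CLUSTER_PAUSE }

  private final boolean cancellationEnabled;
  // Stand-in for the PAUSE znode; null means no pause signal exists.
  private Map<String, String> pauseSignal;

  PauseModeSketch(boolean cancellationEnabled) {
    this.cancellationEnabled = cancellationEnabled;
  }

  void setMode(Mode mode, boolean cancelPendingST, String fromHost, String reason) {
    if (mode == Mode.NORMAL) {
      // Leaving pause mode deletes the signal.
      pauseSignal = null;
      return;
    }
    if (cancelPendingST && !cancellationEnabled) {
      // Helix throws HelixConflictException here; IllegalStateException stands in for it.
      throw new IllegalStateException("State transition cancellation not enabled");
    }
    // Illustrative keys only; the real PauseSignal model defines its own fields.
    Map<String, String> signal = new HashMap<>();
    signal.put("CLUSTER_PAUSE", "true");
    signal.put("CANCEL_PENDING_ST", String.valueOf(cancelPendingST));
    signal.put("FROM_HOST", fromHost);
    signal.put("REASON", reason);
    pauseSignal = signal;
  }

  Map<String, String> getPauseSignal() {
    return pauseSignal;
  }
}
```

The rejected request leaves any existing state untouched because validation happens before the signal is built.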

[helix] 02/06: Add model to record history and status of management mode (#1771)

Posted by hz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hzlu pushed a commit to branch cluster-pause-mode
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 7c1baeeabe17b086ce3b50da3873bba6c41ff21b
Author: Huizhi Lu <51...@users.noreply.github.com>
AuthorDate: Wed Jun 9 14:54:14 2021 -0700

    Add model to record history and status of management mode (#1771)
    
    Management mode operation history needs to be persisted to the controller history znode. The status of IN_PROGRESS or COMPLETED is recorded in the temporary status znode: /{clusterName}/STATUS/CLUSTER/{clusterName}.
    This commit adds a data model and methods to record the status and history of management mode.
---
 .../main/java/org/apache/helix/PropertyKey.java    |  12 ++-
 .../java/org/apache/helix/PropertyPathBuilder.java |   7 ++
 .../main/java/org/apache/helix/PropertyType.java   |   1 +
 .../java/org/apache/helix/model/ClusterStatus.java |  78 ++++++++++++++
 .../org/apache/helix/model/ControllerHistory.java  | 112 +++++++++++++++------
 .../org/apache/helix/TestPropertyPathBuilder.java  |   3 +
 .../helix/model/TestControllerHistoryModel.java    |  93 +++++++++++++++++
 7 files changed, 276 insertions(+), 30 deletions(-)
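
The ControllerHistory change below replaces the repeated trim-then-append logic with a single populateHistoryEntries helper. The following stand-alone sketch shows that bounded FIFO behavior. `BoundedHistory` is a hypothetical name. The cap of 10 used in the test is the size TestControllerHistoryModel expects for management mode history.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-alone sketch of the trimming done by populateHistoryEntries:
// one list per history key, capped at maxSize, oldest entries evicted first.
final class BoundedHistory {
  private final Map<String, List<String>> lists = new HashMap<>();

  void append(String key, int maxSize, String entry) {
    if (maxSize < 1) {
      throw new IllegalArgumentException("maxSize must be positive: " + maxSize);
    }
    List<String> list = lists.computeIfAbsent(key, k -> new ArrayList<>());
    // Evict from the head until there is room for one more entry.
    while (list.size() >= maxSize) {
      list.remove(0);
    }
    list.add(entry);
  }

  // Like getManagementModeHistory(), returns an empty list rather than null when absent.
  List<String> get(String key) {
    return lists.getOrDefault(key, Collections.emptyList());
  }
}
```

Keying the lists by history type lets leadership, maintenance, and management mode entries share one trimming path while keeping separate caps.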

diff --git a/helix-core/src/main/java/org/apache/helix/PropertyKey.java b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
index 2cf1168..254cb95 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyKey.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
@@ -25,10 +25,11 @@ import java.util.Objects;
 import org.apache.helix.model.CloudConfig;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.ClusterConstraints;
+import org.apache.helix.model.ClusterStatus;
 import org.apache.helix.model.ControllerHistory;
 import org.apache.helix.model.CurrentState;
-import org.apache.helix.model.CustomizedStateConfig;
 import org.apache.helix.model.CustomizedState;
+import org.apache.helix.model.CustomizedStateConfig;
 import org.apache.helix.model.CustomizedView;
 import org.apache.helix.model.Error;
 import org.apache.helix.model.ExternalView;
@@ -56,10 +57,10 @@ import static org.apache.helix.PropertyType.CONFIGS;
 import static org.apache.helix.PropertyType.CONTROLLER;
 import static org.apache.helix.PropertyType.CURRENTSTATES;
 import static org.apache.helix.PropertyType.CUSTOMIZEDSTATES;
+import static org.apache.helix.PropertyType.CUSTOMIZEDVIEW;
 import static org.apache.helix.PropertyType.ERRORS;
 import static org.apache.helix.PropertyType.ERRORS_CONTROLLER;
 import static org.apache.helix.PropertyType.EXTERNALVIEW;
-import static org.apache.helix.PropertyType.CUSTOMIZEDVIEW;
 import static org.apache.helix.PropertyType.HISTORY;
 import static org.apache.helix.PropertyType.IDEALSTATES;
 import static org.apache.helix.PropertyType.INSTANCE_HISTORY;
@@ -241,6 +242,13 @@ public class PropertyKey {
           _clusterName, ConfigScopeProperty.CLUSTER.toString(), _clusterName);
     }
 
+    /**
+     * Get a property key associated with this cluster status
+     * @return {@link PropertyKey}
+     */
+    public PropertyKey clusterStatus() {
+      return new PropertyKey(PropertyType.STATUS, ClusterStatus.class, _clusterName);
+    }
 
     /**
      * Get a property key associated with this Cloud configuration
diff --git a/helix-core/src/main/java/org/apache/helix/PropertyPathBuilder.java b/helix-core/src/main/java/org/apache/helix/PropertyPathBuilder.java
index 2ba1ebd..34efd29 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyPathBuilder.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyPathBuilder.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.apache.helix.model.ClusterStatus;
 import org.apache.helix.model.ControllerHistory;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.CustomizedView;
@@ -66,6 +67,7 @@ public class PropertyPathBuilder {
     typeToClassMapping.put(PropertyType.HISTORY, ControllerHistory.class);
     typeToClassMapping.put(PropertyType.PAUSE, PauseSignal.class);
     typeToClassMapping.put(PropertyType.MAINTENANCE, MaintenanceSignal.class);
+    typeToClassMapping.put(PropertyType.STATUS, ClusterStatus.class);
     // TODO: Below must handle the case for future versions of Task Framework with a different path
     // structure
     typeToClassMapping.put(PropertyType.WORKFLOWCONTEXT, WorkflowContext.class);
@@ -86,6 +88,7 @@ public class PropertyPathBuilder {
     addEntry(PropertyType.CUSTOMIZEDVIEW, 1, "/{clusterName}/CUSTOMIZEDVIEW");
     addEntry(PropertyType.CUSTOMIZEDVIEW, 2, "/{clusterName}/CUSTOMIZEDVIEW/{customizedStateType}");
     addEntry(PropertyType.CUSTOMIZEDVIEW, 3, "/{clusterName}/CUSTOMIZEDVIEW/{customizedStateType}/{resourceName}");
+    addEntry(PropertyType.STATUS, 1, "/{clusterName}/STATUS");
 
     addEntry(PropertyType.TARGETEXTERNALVIEW, 1, "/{clusterName}/TARGETEXTERNALVIEW");
     addEntry(PropertyType.TARGETEXTERNALVIEW, 2,
@@ -452,4 +455,8 @@ public class PropertyPathBuilder {
   public static String maintenance(String clusterName) {
     return String.format("/%s/CONTROLLER/MAINTENANCE", clusterName);
   }
+
+  public static String clusterStatus(String clusterName) {
+    return String.format("/%s/STATUS/CLUSTER/%s", clusterName, clusterName);
+  }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/PropertyType.java b/helix-core/src/main/java/org/apache/helix/PropertyType.java
index bedf79e..474ea05 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyType.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyType.java
@@ -48,6 +48,7 @@ public enum PropertyType {
   STATEMODELDEFS(Type.CLUSTER, true, false, false, false, true),
   CONTROLLER(Type.CLUSTER, true, false),
   PROPERTYSTORE(Type.CLUSTER, true, false),
+  STATUS(Type.CLUSTER, true, false, true),
 
   // INSTANCE PROPERTIES
   MESSAGES(Type.INSTANCE, true, true, true),
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterStatus.java b/helix-core/src/main/java/org/apache/helix/model/ClusterStatus.java
new file mode 100644
index 0000000..6ed354c
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterStatus.java
@@ -0,0 +1,78 @@
+package org.apache.helix.model;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixProperty;
+import org.apache.helix.PropertyType;
+import org.apache.helix.api.status.ClusterManagementMode;
+
+/**
+ * Represents the cluster status. It can have fields for
+ * {@link ClusterManagementMode} type and status.
+ */
+public class ClusterStatus extends HelixProperty {
+  public ClusterStatus() {
+    super(PropertyType.STATUS.name());
+  }
+
+  public enum ClusterStatusProperty {
+    MANAGEMENT_MODE,
+    MANAGEMENT_MODE_STATUS
+  }
+
+  /**
+   * Sets the type of management mode
+   *
+   * @param mode {@link ClusterManagementMode.Type}
+   */
+  public void setManagementMode(ClusterManagementMode.Type mode) {
+    _record.setEnumField(ClusterStatusProperty.MANAGEMENT_MODE.name(), mode);
+  }
+
+  /**
+   * Gets the type of management mode
+   *
+   * @return {@link ClusterManagementMode.Type}
+   */
+  public ClusterManagementMode.Type getManagementMode() {
+    return _record.getEnumField(ClusterStatusProperty.MANAGEMENT_MODE.name(),
+        ClusterManagementMode.Type.class, null);
+  }
+
+  /**
+   * Sets the cluster management mode status.
+   *
+   * @param status {@link ClusterManagementMode.Status}
+   */
+  public void setManagementModeStatus(ClusterManagementMode.Status status) {
+    _record.setEnumField(ClusterStatusProperty.MANAGEMENT_MODE_STATUS.name(), status);
+  }
+
+  /**
+   * Gets the {@link ClusterManagementMode.Status} of cluster management mode.
+   *
+   * @return {@link ClusterManagementMode.Status} if status is valid; otherwise, return {@code
+   * null}.
+   */
+  public ClusterManagementMode.Status getManagementModeStatus() {
+    return _record.getEnumField(ClusterStatusProperty.MANAGEMENT_MODE_STATUS.name(),
+        ClusterManagementMode.Status.class, null);
+  }
+}
\ No newline at end of file
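
ClusterStatus stores both the management mode and its status as enum names in the underlying record, and reads them back with a `null` default. The sketch below assumes that a missing or unrecognized name also yields the default, as the getManagementModeStatus Javadoc describes. `EnumFields` and its map are hypothetical stand-ins for ZNRecord, not its actual code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a record's simple fields: enums are stored by name
// and parsed back, falling back to a default when missing or unrecognized.
final class EnumFields {
  enum Status { IN_PROGRESS, COMPLETED }

  private final Map<String, String> simpleFields = new HashMap<>();

  <E extends Enum<E>> void setEnumField(String key, E value) {
    simpleFields.put(key, value.name());
  }

  // Raw setter, e.g. for a value written by a client that knows other enum constants.
  void setRawField(String key, String value) {
    simpleFields.put(key, value);
  }

  <E extends Enum<E>> E getEnumField(String key, Class<E> type, E defaultValue) {
    String raw = simpleFields.get(key);
    if (raw == null) {
      return defaultValue;
    }
    try {
      return Enum.valueOf(type, raw);
    } catch (IllegalArgumentException e) {
      // Unknown constant name: fall back instead of failing the read.
      return defaultValue;
    }
  }
}
```
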
diff --git a/helix-core/src/main/java/org/apache/helix/model/ControllerHistory.java b/helix-core/src/main/java/org/apache/helix/model/ControllerHistory.java
index cc0da7f..4e418c3 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ControllerHistory.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ControllerHistory.java
@@ -22,7 +22,9 @@ package org.apache.helix.model;
 import java.io.IOException;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
+import java.time.Instant;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
@@ -30,10 +32,11 @@ import java.util.Map;
 import java.util.TimeZone;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.helix.HelixException;
 import org.apache.helix.HelixProperty;
+import org.apache.helix.api.status.ClusterManagementMode;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 
-
 /**
  * The history of instances that have served as the leader controller
  */
@@ -57,6 +60,12 @@ public class ControllerHistory extends HelixProperty {
 
   }
 
+  private enum ManagementModeConfigKey {
+    MANAGEMENT_MODE_HISTORY,
+    MODE,
+    STATUS
+  }
+
   private enum OperationType {
     // The following are options for OPERATION_TYPE in MaintenanceConfigKey
     ENTER,
@@ -65,7 +74,8 @@ public class ControllerHistory extends HelixProperty {
 
   public enum HistoryType {
     CONTROLLER_LEADERSHIP,
-    MAINTENANCE
+    MAINTENANCE,
+    MANAGEMENT_MODE
   }
 
   public ControllerHistory(String id) {
@@ -96,17 +106,6 @@ public class ControllerHistory extends HelixProperty {
     list.add(instanceName);
     // TODO: remove above in future when we confirmed no one consumes it */
 
-    List<String> historyList = _record.getListField(ConfigProperty.HISTORY.name());
-    if (historyList == null) {
-      historyList = new ArrayList<>();
-      _record.setListField(ConfigProperty.HISTORY.name(), historyList);
-    }
-
-    // Keep only the last HISTORY_SIZE entries
-    while (historyList.size() >= HISTORY_SIZE) {
-      historyList.remove(0);
-    }
-
     Map<String, String> historyEntry = new HashMap<>();
 
     long currentTime = System.currentTimeMillis();
@@ -119,8 +118,7 @@ public class ControllerHistory extends HelixProperty {
     historyEntry.put(ConfigProperty.DATE.name(), dateTime);
     historyEntry.put(ConfigProperty.VERSION.name(), version);
 
-    historyList.add(historyEntry.toString());
-    return _record;
+    return populateHistoryEntries(HistoryType.CONTROLLER_LEADERSHIP, historyEntry.toString());
   }
 
   /**
@@ -137,6 +135,40 @@ public class ControllerHistory extends HelixProperty {
   }
 
   /**
+   * Gets the management mode history.
+   *
+   * @return List of history strings.
+   */
+  public List<String> getManagementModeHistory() {
+    List<String> history =
+        _record.getListField(ManagementModeConfigKey.MANAGEMENT_MODE_HISTORY.name());
+    return history == null ? Collections.emptyList() : history;
+  }
+
+  /**
+   * Updates management mode and status history to controller history in FIFO order.
+   *
+   * @param controller controller name
+   * @param mode cluster management mode {@link ClusterManagementMode}
+   * @param fromHost the hostname that creates the management mode signal
+   * @param time time in millis
+   * @param reason reason to put the cluster in management mode
+   * @return updated history znrecord
+   */
+  public ZNRecord updateManagementModeHistory(String controller, ClusterManagementMode mode,
+      String fromHost, long time, String reason) {
+    Map<String, String> historyEntry = new HashMap<>();
+    historyEntry.put(ConfigProperty.CONTROLLER.name(), controller);
+    historyEntry.put(ConfigProperty.TIME.name(), Instant.ofEpochMilli(time).toString());
+    historyEntry.put(ManagementModeConfigKey.MODE.name(), mode.getMode().name());
+    historyEntry.put(ManagementModeConfigKey.STATUS.name(), mode.getStatus().name());
+    historyEntry.put(PauseSignal.PauseSignalProperty.FROM_HOST.name(), fromHost);
+    historyEntry.put(PauseSignal.PauseSignalProperty.REASON.name(), reason);
+
+    return populateHistoryEntries(HistoryType.MANAGEMENT_MODE, historyEntry.toString());
+  }
+
+  /**
    * Record up to MAINTENANCE_HISTORY_SIZE number of changes to MaintenanceSignal in FIFO order.
    * @param enabled
    * @param reason
@@ -148,18 +180,6 @@ public class ControllerHistory extends HelixProperty {
   public ZNRecord updateMaintenanceHistory(boolean enabled, String reason, long currentTime,
       MaintenanceSignal.AutoTriggerReason internalReason, Map<String, String> customFields,
       MaintenanceSignal.TriggeringEntity triggeringEntity) throws IOException {
-    List<String> maintenanceHistoryList =
-        _record.getListField(MaintenanceConfigKey.MAINTENANCE_HISTORY.name());
-    if (maintenanceHistoryList == null) {
-      maintenanceHistoryList = new ArrayList<>();
-      _record.setListField(MaintenanceConfigKey.MAINTENANCE_HISTORY.name(), maintenanceHistoryList);
-    }
-
-    // Keep only the last MAINTENANCE_HISTORY_SIZE entries
-    while (maintenanceHistoryList.size() >= MAINTENANCE_HISTORY_SIZE) {
-      maintenanceHistoryList.remove(0);
-    }
-
     DateFormat df = new SimpleDateFormat("yyyy-MM-dd-HH:" + "mm:ss");
     df.setTimeZone(TimeZone.getTimeZone("UTC"));
     String dateTime = df.format(new Date(currentTime));
@@ -189,7 +209,43 @@ public class ControllerHistory extends HelixProperty {
         }
       }
     }
-    maintenanceHistoryList.add(new ObjectMapper().writeValueAsString(maintenanceEntry));
+
+    return populateHistoryEntries(HistoryType.MAINTENANCE,
+        new ObjectMapper().writeValueAsString(maintenanceEntry));
+  }
+
+  private ZNRecord populateHistoryEntries(HistoryType type, String entry) {
+    String configKey;
+    int historySize;
+    switch (type) {
+      case CONTROLLER_LEADERSHIP:
+        configKey = ConfigProperty.HISTORY.name();
+        historySize = HISTORY_SIZE;
+        break;
+      case MAINTENANCE:
+        configKey = MaintenanceConfigKey.MAINTENANCE_HISTORY.name();
+        historySize = MAINTENANCE_HISTORY_SIZE;
+        break;
+      case MANAGEMENT_MODE:
+        configKey = ManagementModeConfigKey.MANAGEMENT_MODE_HISTORY.name();
+        historySize = HISTORY_SIZE;
+        break;
+      default:
+        throw new HelixException("Unknown history type " + type.name());
+    }
+
+    List<String> historyList = _record.getListField(configKey);
+    if (historyList == null) {
+      historyList = new ArrayList<>();
+      _record.setListField(configKey, historyList);
+    }
+
+    while (historyList.size() >= historySize) {
+      historyList.remove(0);
+    }
+
+    historyList.add(entry);
+
     return _record;
   }
 
diff --git a/helix-core/src/test/java/org/apache/helix/TestPropertyPathBuilder.java b/helix-core/src/test/java/org/apache/helix/TestPropertyPathBuilder.java
index 422fb9c..9212568 100644
--- a/helix-core/src/test/java/org/apache/helix/TestPropertyPathBuilder.java
+++ b/helix-core/src/test/java/org/apache/helix/TestPropertyPathBuilder.java
@@ -19,6 +19,7 @@ package org.apache.helix;
  * under the License.
  */
 
+import org.testng.Assert;
 import org.testng.AssertJUnit;
 import org.testng.annotations.Test;
 
@@ -56,5 +57,7 @@ public class TestPropertyPathBuilder {
     actual = PropertyPathBuilder.controllerMessage("test_cluster");
     AssertJUnit.assertEquals(actual, "/test_cluster/CONTROLLER/MESSAGES");
 
+    actual = PropertyPathBuilder.clusterStatus("test_cluster");
+    Assert.assertEquals(actual, "/test_cluster/STATUS/CLUSTER/test_cluster");
   }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/model/TestControllerHistoryModel.java b/helix-core/src/test/java/org/apache/helix/model/TestControllerHistoryModel.java
new file mode 100644
index 0000000..2e0bab3
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/model/TestControllerHistoryModel.java
@@ -0,0 +1,93 @@
+package org.apache.helix.model;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.base.Splitter;
+import org.apache.helix.TestHelper;
+import org.apache.helix.api.status.ClusterManagementMode;
+import org.apache.helix.zookeeper.zkclient.NetworkUtil;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestControllerHistoryModel {
+  @Test
+  public void testManagementModeHistory() {
+    ControllerHistory controllerHistory = new ControllerHistory("HISTORY");
+    String controller = "controller-0";
+    ClusterManagementMode mode = new ClusterManagementMode(ClusterManagementMode.Type.CLUSTER_PAUSE,
+        ClusterManagementMode.Status.COMPLETED);
+    long time = System.currentTimeMillis();
+    String fromHost = NetworkUtil.getLocalhostName();
+    String reason = TestHelper.getTestMethodName();
+    controllerHistory.updateManagementModeHistory(controller, mode, fromHost, time, reason);
+
+    List<String> historyList = controllerHistory.getManagementModeHistory();
+    String lastHistory = historyList.get(historyList.size() - 1);
+    Map<String, String> historyMap = stringToMap(lastHistory);
+
+    Map<String, String> expectedMap = new HashMap<>();
+    expectedMap.put("CONTROLLER", controller);
+    expectedMap.put("TIME", Instant.ofEpochMilli(time).toString());
+    expectedMap.put("MODE", mode.getMode().name());
+    expectedMap.put("STATUS", mode.getStatus().name());
+    expectedMap.put(PauseSignal.PauseSignalProperty.FROM_HOST.name(), fromHost);
+    expectedMap.put(PauseSignal.PauseSignalProperty.REASON.name(), reason);
+
+    Assert.assertEquals(historyMap, expectedMap);
+
+    // Add 15 more entries; only the latest 10 should be kept.
+    List<String> reasonList = new ArrayList<>();
+    for (int i = 0; i < 15; i++) {
+      String reasonI = reason + "-" + i;
+      controllerHistory.updateManagementModeHistory(controller, mode, fromHost, time, reasonI);
+      reasonList.add(reasonI);
+    }
+
+    historyList = controllerHistory.getManagementModeHistory();
+
+    Assert.assertEquals(historyList.size(), 10);
+
+    // Assert the history is the latest 10 entries.
+    int i = 5;
+    for (String entry : historyList) {
+      Map<String, String> actual = stringToMap(entry);
+      Assert.assertEquals(actual.get(PauseSignal.PauseSignalProperty.REASON.name()),
+          reasonList.get(i++));
+    }
+  }
+
+  /**
+   * Performs conversion from a map string into a map. The string was converted by map's toString().
+   *
+   * @param mapAsString A string that is converted by map's toString() method.
+   *                    Example: "{k1=v1, k2=v2}"
+   * @return Map<String, String>
+   */
+  private static Map<String, String> stringToMap(String mapAsString) {
+    return Splitter.on(", ").withKeyValueSeparator('=')
+        .split(mapAsString.substring(1, mapAsString.length() - 1));
+  }
+}
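
The stringToMap helper uses Guava's `Splitter.MapSplitter` to invert `Map#toString()`. The following Guava-free sketch of the same parsing is for illustration only. `MapStringParser` is a hypothetical name.

Like the original, the sketch is only safe when no key or value contains `", "` and no key contains `=`. That condition holds for the history entries written in this test: controller name, ISO-8601 time, enum names, host name, and reason. Unlike Guava's splitter, which rejects duplicate keys, this version keeps the last value for a duplicate key.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical Guava-free sketch of the test's stringToMap helper: parses the
// "{k1=v1, k2=v2}" form produced by AbstractMap#toString.
final class MapStringParser {
  static Map<String, String> parse(String mapAsString) {
    Map<String, String> result = new LinkedHashMap<>();
    // Strip the surrounding braces.
    String body = mapAsString.substring(1, mapAsString.length() - 1);
    if (body.isEmpty()) {
      return result;
    }
    for (String pair : body.split(", ")) {
      // Split on the first '=' only; keys are assumed not to contain '='.
      int eq = pair.indexOf('=');
      if (eq < 0) {
        throw new IllegalArgumentException("Malformed entry: " + pair);
      }
      result.put(pair.substring(0, eq), pair.substring(eq + 1));
    }
    return result;
  }
}
```
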

[helix] 05/06: Add message util to create messages (#1796)

Posted by hz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hzlu pushed a commit to branch cluster-pause-mode
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 25117d9d0b98b048d7955655c47fa89837bbe361
Author: Huizhi Lu <51...@users.noreply.github.com>
AuthorDate: Mon Jun 14 21:32:25 2021 -0700

    Add message util to create messages (#1796)
    
    Message creation methods are private to MessageGenerationPhase, but the management mode stage also needs them to create ST cancellation and participant status change messages.
    A shared message util serves both purposes.
    
    This commit moves the common message creation logic to a message util so multiple stages can reuse the code.
---
 .../controller/stages/MessageGenerationPhase.java  |  97 +++-----------
 .../java/org/apache/helix/util/MessageUtil.java    | 139 +++++++++++++++++++++
 .../TestStateTransitionAppFailureHandling.java     |   5 +-
 3 files changed, 157 insertions(+), 84 deletions(-)
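
The key change in MessageUtil is that its factory methods take the sender's instance name and session id as plain strings instead of a HelixManager. This lets stages other than MessageGenerationPhase call them. The stand-alone sketch below illustrates that shape.

`StMessage` and `MessageFactorySketch` are hypothetical, and they carry only a subset of the fields that the removed createStateTransitionMessage set. The retry count of 3 is the DEFAULT_STATE_TRANSITION_MESSAGE_RETRY_COUNT value removed from MessageGenerationPhase; this excerpt does not show where MessageUtil keeps that constant.

```java
import java.util.UUID;

// Hypothetical, trimmed-down message; the real Helix Message has many more fields
// (state model factory name, bucket size, resource group/tag, expected session id, ...).
final class StMessage {
  final String msgId;
  final String srcName;
  final String srcSessionId;
  final String tgtName;
  final String tgtSessionId;
  final String resourceName;
  final String partitionName;
  final String fromState;
  final String toState;
  final int retryCount;

  StMessage(String msgId, String srcName, String srcSessionId, String tgtName,
      String tgtSessionId, String resourceName, String partitionName, String fromState,
      String toState, int retryCount) {
    this.msgId = msgId;
    this.srcName = srcName;
    this.srcSessionId = srcSessionId;
    this.tgtName = tgtName;
    this.tgtSessionId = tgtSessionId;
    this.resourceName = resourceName;
    this.partitionName = partitionName;
    this.fromState = fromState;
    this.toState = toState;
    this.retryCount = retryCount;
  }
}

// Static factory that depends only on plain values, not on a HelixManager.
final class MessageFactorySketch {
  // Same value as the constant removed from MessageGenerationPhase.
  static final int DEFAULT_STATE_TRANSITION_MESSAGE_RETRY_COUNT = 3;

  private MessageFactorySketch() {
  }

  static StMessage createStateTransitionMessage(String srcInstanceName, String srcSessionId,
      String resourceName, String partitionName, String tgtInstanceName, String currentState,
      String nextState, String tgtSessionId) {
    // Each message gets a random UUID as its id, as in the removed private method.
    return new StMessage(UUID.randomUUID().toString(), srcInstanceName, srcSessionId,
        tgtInstanceName, tgtSessionId, resourceName, partitionName, currentState, nextState,
        DEFAULT_STATE_TRANSITION_MESSAGE_RETRY_COUNT);
  }
}
```
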

diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
index 836e5df..38d9908 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
@@ -24,7 +24,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 
@@ -45,13 +44,12 @@ import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
-import org.apache.helix.model.Message.MessageState;
-import org.apache.helix.model.Message.MessageType;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.util.HelixUtil;
+import org.apache.helix.util.MessageUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -70,8 +68,6 @@ public class MessageGenerationPhase extends AbstractBaseStage {
       .getSystemPropertyAsLong(SystemPropertyKeys.CONTROLLER_MESSAGE_PURGE_DELAY, 60 * 1000);
   private final static String PENDING_MESSAGE = "pending message";
   private final static String STALE_MESSAGE = "stale message";
-  // TODO: Make the message retry count configurable through the Cluster Config or IdealStates.
-  public final static int DEFAULT_STATE_TRANSITION_MESSAGE_RETRY_COUNT = 3;
 
   private static Logger logger = LoggerFactory.getLogger(MessageGenerationPhase.class);
 
@@ -215,10 +211,12 @@ public class MessageGenerationPhase extends AbstractBaseStage {
         if (desiredState.equals(NO_DESIRED_STATE) || desiredState.equalsIgnoreCase(currentState)) {
           if (shouldCreateSTCancellation(pendingMessage, desiredState,
               stateModelDef.getInitialState())) {
-            message = createStateTransitionCancellationMessage(manager, resource,
-                partition.getPartitionName(), instanceName, sessionIdMap.get(instanceName),
-                stateModelDef.getId(), pendingMessage.getFromState(), pendingMessage.getToState(),
-                null, cancellationMessage, isCancellationEnabled, currentState);
+            message = MessageUtil
+                .createStateTransitionCancellationMessage(manager.getInstanceName(),
+                    manager.getSessionId(), resource, partition.getPartitionName(), instanceName,
+                    sessionIdMap.get(instanceName), stateModelDef.getId(),
+                    pendingMessage.getFromState(), pendingMessage.getToState(), null,
+                    cancellationMessage, isCancellationEnabled, currentState);
           }
         } else {
           if (nextState == null) {
@@ -236,9 +234,10 @@ public class MessageGenerationPhase extends AbstractBaseStage {
                     cancellationMessage, isCancellationEnabled);
           } else {
             // Create new state transition message
-            message = createStateTransitionMessage(manager, resource, partition.getPartitionName(),
-                instanceName, currentState, nextState, sessionIdMap.get(instanceName),
-                stateModelDef.getId());
+            message = MessageUtil
+                .createStateTransitionMessage(manager.getInstanceName(), manager.getSessionId(),
+                    resource, partition.getPartitionName(), instanceName, currentState, nextState,
+                    sessionIdMap.get(instanceName), stateModelDef.getId());
 
             if (logger.isDebugEnabled()) {
               LogUtil.logDebug(logger, _eventId, String.format(
@@ -331,10 +330,10 @@ public class MessageGenerationPhase extends AbstractBaseStage {
                 + instanceName + ", pendingState: " + pendingState + ", currentState: "
                 + currentState + ", nextState: " + nextState + ", isRelay: " + pendingMessage.isRelayMessage());
 
-        message = createStateTransitionCancellationMessage(manager, resource,
-            partition.getPartitionName(), instanceName, sessionIdMap.get(instanceName),
-            stateModelDef.getId(), pendingMessage.getFromState(), pendingState, nextState,
-            cancellationMessage, isCancellationEnabled, currentState);
+        message = MessageUtil.createStateTransitionCancellationMessage(manager.getInstanceName(),
+            manager.getSessionId(), resource, partition.getPartitionName(), instanceName,
+            sessionIdMap.get(instanceName), stateModelDef.getId(), pendingMessage.getFromState(),
+            pendingState, nextState, cancellationMessage, isCancellationEnabled, currentState);
       }
     }
     return message;
@@ -417,72 +416,6 @@ public class MessageGenerationPhase extends AbstractBaseStage {
     }
   }
 
-  private Message createStateTransitionMessage(HelixManager manager, Resource resource,
-      String partitionName, String instanceName, String currentState, String nextState,
-      String sessionId, String stateModelDefName) {
-    String uuid = UUID.randomUUID().toString();
-    String managerSessionId = manager.getSessionId();
-    Message message = new Message(MessageType.STATE_TRANSITION, uuid);
-    message.setSrcName(manager.getInstanceName());
-    message.setTgtName(instanceName);
-    message.setMsgState(MessageState.NEW);
-    message.setPartitionName(partitionName);
-    message.setResourceName(resource.getResourceName());
-    message.setFromState(currentState);
-    message.setToState(nextState);
-    message.setTgtSessionId(sessionId);
-    message.setSrcSessionId(managerSessionId);
-    message.setExpectedSessionId(managerSessionId);
-    message.setStateModelDef(stateModelDefName);
-    message.setStateModelFactoryName(resource.getStateModelFactoryname());
-    message.setBucketSize(resource.getBucketSize());
-    // Set the retry count for state transition messages.
-    // TODO: make the retry count configurable in ClusterConfig or IdealState
-    message.setRetryCount(DEFAULT_STATE_TRANSITION_MESSAGE_RETRY_COUNT);
-
-    if (resource.getResourceGroupName() != null) {
-      message.setResourceGroupName(resource.getResourceGroupName());
-    }
-    if (resource.getResourceTag() != null) {
-      message.setResourceTag(resource.getResourceTag());
-    }
-
-    return message;
-  }
-
-  private Message createStateTransitionCancellationMessage(HelixManager manager, Resource resource,
-      String partitionName, String instanceName, String sessionId, String stateModelDefName,
-      String fromState, String toState, String nextState, Message cancellationMessage,
-      boolean isCancellationEnabled, String currentState) {
-
-    if (isCancellationEnabled && cancellationMessage == null) {
-      logger.info("Event {} : Send cancellation message of the state transition for {}.{} on {}, "
-              + "currentState: {}, nextState: {},  toState: {}",
-          _eventId, resource.getResourceName(), partitionName, instanceName,
-          currentState, nextState == null ? "N/A" : nextState, toState);
-
-      String uuid = UUID.randomUUID().toString();
-      String managerSessionId = manager.getSessionId();
-      Message message = new Message(MessageType.STATE_TRANSITION_CANCELLATION, uuid);
-      message.setSrcName(manager.getInstanceName());
-      message.setTgtName(instanceName);
-      message.setMsgState(MessageState.NEW);
-      message.setPartitionName(partitionName);
-      message.setResourceName(resource.getResourceName());
-      message.setFromState(fromState);
-      message.setToState(toState);
-      message.setTgtSessionId(sessionId);
-      message.setSrcSessionId(managerSessionId);
-      message.setExpectedSessionId(managerSessionId);
-      message.setStateModelDef(stateModelDefName);
-      message.setStateModelFactoryName(resource.getStateModelFactoryname());
-      message.setBucketSize(resource.getBucketSize());
-      return message;
-    }
-
-    return null;
-  }
-
   private int getTimeOut(ClusterConfig clusterConfig, ResourceConfig resourceConfig,
       String currentState, String nextState, IdealState idealState, Partition partition) {
     StateTransitionTimeoutConfig stateTransitionTimeoutConfig =
diff --git a/helix-core/src/main/java/org/apache/helix/util/MessageUtil.java b/helix-core/src/main/java/org/apache/helix/util/MessageUtil.java
new file mode 100644
index 0000000..94de833
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/util/MessageUtil.java
@@ -0,0 +1,139 @@
+package org.apache.helix.util;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.UUID;
+
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Resource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Message utils to operate on messages, such as creating messages.
+ */
+public class MessageUtil {
+  private static final Logger LOG = LoggerFactory.getLogger(MessageUtil.class);
+
+  // TODO: Make the message retry count configurable through the Cluster Config or IdealStates.
+  public final static int DEFAULT_STATE_TRANSITION_MESSAGE_RETRY_COUNT = 3;
+
+  public static Message createStateTransitionCancellationMessage(String srcInstanceName,
+      String srcSessionId, Resource resource, String partitionName, String instanceName,
+      String sessionId, String stateModelDefName, String fromState, String toState,
+      String nextState, Message cancellationMessage, boolean isCancellationEnabled,
+      String currentState) {
+    if (isCancellationEnabled && cancellationMessage == null) {
+      LOG.info("Create cancellation message of the state transition for {}.{} on {}, "
+              + "currentState: {}, nextState: {}, toState: {}", resource.getResourceName(),
+          partitionName, instanceName, currentState, nextState == null ? "N/A" : nextState,
+          toState);
+
+      Message message =
+          createStateTransitionMessage(Message.MessageType.STATE_TRANSITION_CANCELLATION,
+              srcInstanceName, srcSessionId, resource, partitionName, instanceName, currentState,
+              nextState, sessionId, stateModelDefName);
+
+      message.setFromState(fromState);
+      message.setToState(toState);
+      return message;
+    }
+
+    return null;
+  }
+
+  public static Message createStateTransitionMessage(String srcInstanceName, String srcSessionId,
+      Resource resource, String partitionName, String instanceName, String currentState,
+      String nextState, String tgtSessionId, String stateModelDefName) {
+    Message message =
+        createStateTransitionMessage(Message.MessageType.STATE_TRANSITION, srcInstanceName,
+            srcSessionId, resource, partitionName, instanceName, currentState, nextState, tgtSessionId,
+            stateModelDefName);
+
+    // Set the retry count for state transition messages.
+    // TODO: make the retry count configurable in ClusterConfig or IdealState
+    message.setRetryCount(DEFAULT_STATE_TRANSITION_MESSAGE_RETRY_COUNT);
+
+    if (resource.getResourceGroupName() != null) {
+      message.setResourceGroupName(resource.getResourceGroupName());
+    }
+    if (resource.getResourceTag() != null) {
+      message.setResourceTag(resource.getResourceTag());
+    }
+
+    return message;
+  }
+
+  /**
+   * Creates a message to change participant status
+   * {@link org.apache.helix.model.LiveInstance.LiveInstanceStatus}
+   *
+   * @param currentState current status of the live instance
+   * @param nextState next status that will be changed to
+   * @param srcInstanceName source instance name
+   * @param srcSessionId session id for the source instance
+   * @param tgtInstanceName target instance name
+   * @param tgtSessionId target instance session id
+   * @return participant status change message
+   */
+  public static Message createStatusChangeMessage(LiveInstance.LiveInstanceStatus currentState,
+      LiveInstance.LiveInstanceStatus nextState, String srcInstanceName, String srcSessionId,
+      String tgtInstanceName, String tgtSessionId) {
+    return createBasicMessage(Message.MessageType.PARTICIPANT_STATUS_CHANGE, srcInstanceName,
+        srcSessionId, tgtInstanceName, tgtSessionId, currentState.name(), nextState.name());
+  }
+
+  /* Creates a message that has the least required fields. */
+  private static Message createBasicMessage(Message.MessageType messageType, String srcInstanceName,
+      String srcSessionId, String tgtInstanceName, String tgtSessionId, String currentState,
+      String nextState) {
+    String uuid = UUID.randomUUID().toString();
+
+    Message message = new Message(messageType, uuid);
+    message.setSrcName(srcInstanceName);
+    message.setTgtName(tgtInstanceName);
+    message.setMsgState(Message.MessageState.NEW);
+    message.setFromState(currentState);
+    message.setToState(nextState);
+    message.setTgtSessionId(tgtSessionId);
+    message.setSrcSessionId(srcSessionId);
+    message.setExpectedSessionId(srcSessionId);
+
+    return message;
+  }
+
+  /* Creates state transition or state transition cancellation message */
+  private static Message createStateTransitionMessage(Message.MessageType messageType,
+      String srcInstanceName, String srcSessionId, Resource resource, String partitionName,
+      String instanceName, String currentState, String nextState, String tgtSessionId,
+      String stateModelDefName) {
+    Message message =
+        createBasicMessage(messageType, srcInstanceName, srcSessionId, instanceName, tgtSessionId,
+            currentState, nextState);
+    message.setPartitionName(partitionName);
+    message.setStateModelDef(stateModelDefName);
+    message.setResourceName(resource.getResourceName());
+    message.setStateModelFactoryName(resource.getStateModelFactoryname());
+    message.setBucketSize(resource.getBucketSize());
+
+    return message;
+  }
+}
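The new MessageUtil builds messages in three layers: createBasicMessage sets the fields every message needs, the private createStateTransitionMessage adds the partition and resource fields, and the public factories add what is specific to each type (a retry count and resource group/tag for transitions, overridden from/to states for cancellations). The sketch below mirrors only that layering, under the assumption that a message can be modeled as a map of named fields. SketchMessage, SketchMessageUtil and the field keys are hypothetical and are not Helix's Message API; the state model factory name, bucket size and resource group/tag are left out for brevity.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical stand-in for org.apache.helix.model.Message: a bag of named fields.
// The keys used here are illustrative, not Helix's record keys.
final class SketchMessage {
  private final Map<String, String> fields = new LinkedHashMap<>();

  SketchMessage set(String key, String value) {
    fields.put(key, value);
    return this;
  }

  String get(String key) {
    return fields.get(key);
  }
}

final class SketchMessageUtil {
  // Mirrors MessageUtil.DEFAULT_STATE_TRANSITION_MESSAGE_RETRY_COUNT.
  static final int DEFAULT_RETRY_COUNT = 3;

  // Layer 1 (cf. createBasicMessage): fields every message carries.
  static SketchMessage basic(String type, String srcName, String srcSession, String tgtName,
      String tgtSession, String fromState, String toState) {
    return new SketchMessage()
        .set("id", UUID.randomUUID().toString())
        .set("type", type)
        .set("state", "NEW")
        .set("srcName", srcName)
        .set("tgtName", tgtName)
        .set("fromState", fromState)
        .set("toState", toState)
        .set("srcSessionId", srcSession)
        .set("tgtSessionId", tgtSession)
        // The expected session is the sender's own session.
        .set("expectedSessionId", srcSession);
  }

  // Layer 2 (cf. the private createStateTransitionMessage): adds resource/partition fields.
  static SketchMessage transitionBase(String type, String srcName, String srcSession,
      String resource, String partition, String tgtName, String tgtSession,
      String fromState, String toState, String stateModelDef) {
    return basic(type, srcName, srcSession, tgtName, tgtSession, fromState, toState)
        .set("resource", resource)
        .set("partition", partition)
        .set("stateModelDef", stateModelDef);
  }

  // Layer 3a: a state transition message also carries a retry count.
  static SketchMessage transition(String srcName, String srcSession, String resource,
      String partition, String tgtName, String tgtSession, String currentState,
      String nextState, String stateModelDef) {
    return transitionBase("STATE_TRANSITION", srcName, srcSession, resource, partition,
        tgtName, tgtSession, currentState, nextState, stateModelDef)
        .set("retryCount", String.valueOf(DEFAULT_RETRY_COUNT));
  }

  // Layer 3b: a cancellation starts from current/next state, then overwrites from/to with the
  // pending transition's states. Returns null if cancellation is disabled or already sent.
  static SketchMessage cancellation(String srcName, String srcSession, String resource,
      String partition, String tgtName, String tgtSession, String stateModelDef,
      String pendingFrom, String pendingTo, String currentState, String nextState,
      boolean cancellationEnabled, boolean cancellationAlreadySent) {
    if (!cancellationEnabled || cancellationAlreadySent) {
      return null;
    }
    return transitionBase("STATE_TRANSITION_CANCELLATION", srcName, srcSession, resource,
        partition, tgtName, tgtSession, currentState, nextState, stateModelDef)
        .set("fromState", pendingFrom)
        .set("toState", pendingTo);
  }
}
```

Because the cancellation is built on the same base as a transition, its from/to states are first set from currentState/nextState and then overwritten, which is why createStateTransitionCancellationMessage calls setFromState/setToState after construction. As in the code being replaced, a cancellation message gets no retry count.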
diff --git a/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionAppFailureHandling.java b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionAppFailureHandling.java
index 12ede56..e97424c 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionAppFailureHandling.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionAppFailureHandling.java
@@ -37,6 +37,7 @@ import org.apache.helix.mock.participant.MockTransition;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.Message;
 import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.util.MessageUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -122,7 +123,7 @@ public class TestStateTransitionAppFailureHandling extends ZkStandAloneCMTestBas
       // Check if the factory has tried enough times before fail the message.
       Assert.assertEquals(retryCountUntilSucceed - retryFactoryMap.get(instanceName)
           .getRemainingRetryCountUntilSucceed(), instanceMessages.size()
-          * MessageGenerationPhase.DEFAULT_STATE_TRANSITION_MESSAGE_RETRY_COUNT);
+          * MessageUtil.DEFAULT_STATE_TRANSITION_MESSAGE_RETRY_COUNT);
     }
 
     // Verify that the partition is not initialized.
@@ -146,7 +147,7 @@ public class TestStateTransitionAppFailureHandling extends ZkStandAloneCMTestBas
     // Make the mock StateModelFactory return handler before last retry. So it will successfully
     // finish handler initialization.
     int retryCountUntilSucceed =
-        MessageGenerationPhase.DEFAULT_STATE_TRANSITION_MESSAGE_RETRY_COUNT - 1;
+        MessageUtil.DEFAULT_STATE_TRANSITION_MESSAGE_RETRY_COUNT - 1;
     Map<String, RetryStateModelFactory> retryFactoryMap = resetParticipants(retryCountUntilSucceed);
 
     _gSetupTool.addResourceToCluster(CLUSTER_NAME, TEST_DB, _PARTITIONS, STATE_MODEL);

[helix] 04/06: Move pause and maintenance handling out of controller (#1793)

Posted by hz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hzlu pushed a commit to branch cluster-pause-mode
in repository https://gitbox.apache.org/repos/asf/helix.git

commit a6d8d19170c24ad7c68dd9caf04a8ac489f3365a
Author: Huizhi Lu <51...@users.noreply.github.com>
AuthorDate: Sat Jun 12 01:00:25 2021 -0700

    Move pause and maintenance handling out of controller (#1793)
    
    With the management mode pipeline, the logic that handles the pause and maintenance signals should be moved out of onControllerChange() and into the management mode pipeline.
    
    This commit handles enabling and disabling the pause/maintenance signals and updates the cluster status accordingly.
---
 .../helix/controller/GenericHelixController.java   | 80 +++-------------------
 .../dataproviders/BaseControllerDataProvider.java  | 21 +++++-
 .../ManagementControllerDataProvider.java          | 24 ++++++-
 .../controller/stages/ManagementModeStage.java     | 16 +++--
 .../controller/stages/ResourceValidationStage.java | 29 ++++++--
 .../main/java/org/apache/helix/model/Message.java  |  5 ++
 .../BestPossibleExternalViewVerifier.java          |  2 +-
 .../main/java/org/apache/helix/util/HelixUtil.java | 35 ++++++++--
 .../java/org/apache/helix/util/RebalanceUtil.java  |  4 +-
 9 files changed, 119 insertions(+), 97 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 7da6ef0..6cc1e5f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -103,9 +103,7 @@ import org.apache.helix.model.CustomizedStateConfig;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.MaintenanceSignal;
 import org.apache.helix.model.Message;
-import org.apache.helix.model.PauseSignal;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.monitoring.mbeans.ClusterEventMonitor;
 import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
@@ -177,13 +175,6 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
   private long _continuousTaskRebalanceFailureCount = 0;
 
   /**
-   * The _paused flag is checked by function handleEvent(), while if the flag is set handleEvent()
-   * will be no-op. Other event handling logic keeps the same when the flag is set.
-   */
-  private boolean _paused;
-  private boolean _inMaintenanceMode;
-
-  /**
    * The executors that can periodically run the rebalancing pipeline. A
    * SingleThreadScheduledExecutor will start if there is resource group that has the config to do
    * periodically rebalance.
@@ -837,18 +828,16 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
     // have this instanceof clauses
     List<Pipeline> pipelines;
     boolean isTaskFrameworkPipeline = false;
-    Pipeline.Type pipelineType;
+    boolean isManagementPipeline = false;
 
     if (dataProvider instanceof ResourceControllerDataProvider) {
       pipelines = _registry.getPipelinesForEvent(event.getEventType());
-      pipelineType = Pipeline.Type.DEFAULT;
     } else if (dataProvider instanceof WorkflowControllerDataProvider) {
       pipelines = _taskRegistry.getPipelinesForEvent(event.getEventType());
       isTaskFrameworkPipeline = true;
-      pipelineType = Pipeline.Type.TASK;
     } else if (dataProvider instanceof ManagementControllerDataProvider) {
       pipelines = _managementModeRegistry.getPipelinesForEvent(event.getEventType());
-      pipelineType = Pipeline.Type.MANAGEMENT_MODE;
+      isManagementPipeline = true;
     } else {
       logger.warn(String
           .format("No %s pipeline to run for event: %s::%s", dataProvider.getPipelineName(),
@@ -857,11 +846,10 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
     }
 
     // Should not run management mode and default/task pipelines at the same time.
-    if ((_inManagementMode && !Pipeline.Type.MANAGEMENT_MODE.equals(pipelineType))
-        || (!_inManagementMode && Pipeline.Type.MANAGEMENT_MODE.equals(pipelineType))) {
+    if (_inManagementMode != isManagementPipeline) {
       logger.info("Should not run management mode and default/task pipelines at the same time. "
-              + "cluster={}, inManagementMode={}, pipelineType={}. Ignoring the event: {}",
-          manager.getClusterName(), _inManagementMode, pipelineType, event.getEventType());
+              + "cluster={}, inManagementMode={}, isManagementPipeline={}. Ignoring the event: {}",
+          manager.getClusterName(), _inManagementMode, isManagementPipeline, event.getEventType());
       return;
     }
 
@@ -881,6 +869,8 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
           checkRebalancingTimer(manager, Collections.<IdealState>emptyList(), dataProvider.getClusterConfig());
         }
         if (_isMonitoring) {
+          _clusterStatusMonitor.setEnabled(!_inManagementMode);
+          _clusterStatusMonitor.setPaused(_inManagementMode);
           event.addAttribute(AttributeName.clusterStatusMonitor.name(), _clusterStatusMonitor);
         }
       }
@@ -1335,25 +1325,8 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
     }
 
     if (controllerIsLeader) {
-      HelixManager manager = changeContext.getManager();
-      HelixDataAccessor accessor = manager.getHelixDataAccessor();
-      Builder keyBuilder = accessor.keyBuilder();
-
-      PauseSignal pauseSignal = accessor.getProperty(keyBuilder.pause());
-      MaintenanceSignal maintenanceSignal = accessor.getProperty(keyBuilder.maintenance());
-      boolean prevPaused = _paused;
-      boolean prevInMaintenanceMode = _inMaintenanceMode;
-      _paused = updateControllerState(pauseSignal, _paused);
-      _inMaintenanceMode = updateControllerState(maintenanceSignal, _inMaintenanceMode);
-      // TODO: remove triggerResumeEvent when moving pause/maintenance to management pipeline
-      if (!triggerResumeEvent(changeContext, prevPaused, prevInMaintenanceMode)) {
-        pushToEventQueues(ClusterEventType.ControllerChange, changeContext, Collections.emptyMap());
-      }
-
       enableClusterStatusMonitor(true);
-      _clusterStatusMonitor.setEnabled(!_paused);
-      _clusterStatusMonitor.setPaused(_paused);
-      _clusterStatusMonitor.setMaintenance(_inMaintenanceMode);
+      pushToEventQueues(ClusterEventType.ControllerChange, changeContext, Collections.emptyMap());
     } else {
       enableClusterStatusMonitor(false);
       // Note that onControllerChange is executed in parallel with the event processing thread. It
@@ -1543,43 +1516,6 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
     }
   }
 
-  private boolean updateControllerState(PauseSignal signal, boolean statusFlag) {
-    if (signal != null) {
-      if (!statusFlag) {
-        statusFlag = true;
-        // This log is recorded for the first time entering PAUSE/MAINTENANCE mode
-        logger.info(String.format("controller is now %s",
-            (signal instanceof MaintenanceSignal) ? "in maintenance mode" : "paused"));
-      }
-    } else {
-      statusFlag = false;
-    }
-    return statusFlag;
-  }
-
-  /**
-   * Trigger a Resume Event if the cluster is back to activated.
-   * @param changeContext
-   * @param prevPaused the previous paused status.
-   * @param prevInMaintenanceMode the previous in maintenance mode status.
-   */
-  private boolean triggerResumeEvent(NotificationContext changeContext, boolean prevPaused,
-      boolean prevInMaintenanceMode) {
-    /**
-     * WARNING: the logic here is tricky.
-     * 1. Only resume if not paused. So if the Maintenance mode is removed but the cluster is still
-     * paused, the resume event should not be sent.
-     * 2. Only send resume event if the status is changed back to active. So we don't send multiple
-     * event unnecessarily.
-     */
-    if (!_paused && (prevPaused || (prevInMaintenanceMode && !_inMaintenanceMode))) {
-      pushToEventQueues(ClusterEventType.Resume, changeContext, Collections.EMPTY_MAP);
-      logger.info("controller is now resumed from paused/maintenance state");
-      return true;
-    }
-    return false;
-  }
-
   // TODO: refactor this to use common/ClusterEventProcessor.
   @Deprecated
   private class ClusterEventProcessor extends Thread {
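The rewritten guard in handleEvent(), _inManagementMode != isManagementPipeline, is a boolean exclusive-or: the event is ignored exactly when the controller's mode and the pipeline's kind disagree. The old two-clause condition expressed the same rule through Pipeline.Type. The sketch below, a hypothetical helper that reduces the old enum comparison to a boolean flag, checks all four input combinations to show the two forms are equivalent.

```java
// Hypothetical helper comparing the old and new guard conditions from handleEvent().
// The old Pipeline.Type comparison is reduced to the boolean isManagementPipeline.
final class PipelineGateCheck {
  // Old form: skip when in management mode but running a non-management pipeline,
  // or when not in management mode but running the management pipeline.
  static boolean oldShouldSkip(boolean inManagementMode, boolean isManagementPipeline) {
    return (inManagementMode && !isManagementPipeline)
        || (!inManagementMode && isManagementPipeline);
  }

  // New form: skip whenever the two flags disagree (boolean XOR).
  static boolean newShouldSkip(boolean inManagementMode, boolean isManagementPipeline) {
    return inManagementMode != isManagementPipeline;
  }

  // Returns true if the two forms agree on all four input combinations.
  static boolean equivalent() {
    boolean[] values = {false, true};
    for (boolean mode : values) {
      for (boolean pipeline : values) {
        if (oldShouldSkip(mode, pipeline) != newShouldSkip(mode, pipeline)) {
          return false;
        }
      }
    }
    return true;
  }
}
```

The boolean form also avoids the pipelineType variable entirely, which is why the default and task branches no longer need to assign it.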
diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
index 6ce25e2..3c705f4 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
@@ -57,6 +57,7 @@ import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.MaintenanceSignal;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.ParticipantHistory;
+import org.apache.helix.model.PauseSignal;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.task.TaskConstants;
@@ -87,6 +88,7 @@ public class BaseControllerDataProvider implements ControlContextProvider {
 
   private boolean _updateInstanceOfflineTime = true;
   private MaintenanceSignal _maintenanceSignal;
+  private PauseSignal _pauseSignal;
   private boolean _isMaintenanceModeEnabled;
   private boolean _hasMaintenanceSignalChanged;
   private ExecutorService _asyncTasksThreadPool;
@@ -300,8 +302,9 @@ public class BaseControllerDataProvider implements ControlContextProvider {
     }
   }
 
-  private void updateMaintenanceInfo(final HelixDataAccessor accessor) {
+  private void refreshManagementSignals(final HelixDataAccessor accessor) {
     _maintenanceSignal = accessor.getProperty(accessor.keyBuilder().maintenance());
+    _pauseSignal = accessor.getProperty(accessor.keyBuilder().pause());
     _isMaintenanceModeEnabled = _maintenanceSignal != null;
     // The following flag is to guarantee that there's only one update per pipeline run because we
     // check for whether maintenance recovery could happen twice every pipeline
@@ -373,7 +376,7 @@ public class BaseControllerDataProvider implements ControlContextProvider {
     refreshResourceConfig(accessor, refreshedTypes);
     _stateModelDefinitionCache.refresh(accessor);
     _clusterConstraintsCache.refresh(accessor);
-    updateMaintenanceInfo(accessor);
+    refreshManagementSignals(accessor);
     timeoutNodesDuringMaintenance(accessor, _clusterConfig, _isMaintenanceModeEnabled);
 
     // TODO: once controller gets split, only one controller should update offline instance history
@@ -622,6 +625,16 @@ public class BaseControllerDataProvider implements ControlContextProvider {
   }
 
   /**
+   * Gets all messages for each instance.
+   *
+   * @return Map of {instanceName -> Collection of Message}.
+   */
+  public Map<String, Collection<Message>> getAllInstancesMessages() {
+    return getAllInstances().stream().collect(
+        Collectors.toMap(instance -> instance, instance -> getMessages(instance).values()));
+  }
+
+  /**
    * This function is supposed to be only used by testing purpose for safety. For "get" usage,
    * please use getStaleMessagesByInstance.
    */
@@ -968,6 +981,10 @@ public class BaseControllerDataProvider implements ControlContextProvider {
     return _maintenanceSignal;
   }
 
+  public PauseSignal getPauseSignal() {
+    return _pauseSignal;
+  }
+
   protected StringBuilder genCacheContentStringBuilder() {
     StringBuilder sb = new StringBuilder();
     sb.append(String.format("liveInstaceMap: %s", _liveInstanceCache.getPropertyMap())).append("\n");
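getAllInstancesMessages() flattens the per-instance message cache into a single map keyed by instance name, which is the shape HelixUtil.inManagementMode now takes as input. The sketch below reproduces the same Collectors.toMap pattern over plain collections; SketchMessageCache and its fields are hypothetical stand-ins for the data provider's caches. Since the keys come from a Set of instance names, toMap never sees a duplicate key (it would throw IllegalStateException on one). toMap also rejects null values, so the sketch assumes getMessages returns an empty map, not null, for instances without messages.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical stand-in for the instance and message caches in BaseControllerDataProvider.
final class SketchMessageCache {
  private final Set<String> allInstances;
  // instanceName -> (messageId -> message body)
  private final Map<String, Map<String, String>> messagesByInstance;

  SketchMessageCache(Set<String> allInstances,
      Map<String, Map<String, String>> messagesByInstance) {
    this.allInstances = allInstances;
    this.messagesByInstance = messagesByInstance;
  }

  // Returns an empty map (never null) for instances without messages,
  // because Collectors.toMap throws on null values.
  Map<String, String> getMessages(String instance) {
    return messagesByInstance.getOrDefault(instance, Collections.emptyMap());
  }

  // Same shape as getAllInstancesMessages(): {instanceName -> collection of messages}.
  Map<String, Collection<String>> getAllInstancesMessages() {
    return allInstances.stream().collect(
        Collectors.toMap(instance -> instance, instance -> getMessages(instance).values()));
  }
}
```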
diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ManagementControllerDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ManagementControllerDataProvider.java
index d178ca5..fe940ff 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ManagementControllerDataProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ManagementControllerDataProvider.java
@@ -19,9 +19,27 @@ package org.apache.helix.controller.dataproviders;
  * under the License.
  */
 
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.helix.HelixConstants;
+
+/**
+ * Data provider for controller management mode pipeline.
+ */
 public class ManagementControllerDataProvider extends BaseControllerDataProvider {
-  // TODO: implement this class to only refresh required event types
-  public ManagementControllerDataProvider(String clusterName, String name) {
-    super(clusterName, name);
+  // Only these types of properties are refreshed for the full refresh request.
+  private static final List<HelixConstants.ChangeType> FULL_REFRESH_PROPERTIES =
+      Arrays.asList(HelixConstants.ChangeType.LIVE_INSTANCE, HelixConstants.ChangeType.MESSAGE);
+
+  public ManagementControllerDataProvider(String clusterName, String pipelineName) {
+    super(clusterName, pipelineName);
+  }
+
+  @Override
+  public void requireFullRefresh() {
+    for (HelixConstants.ChangeType type : FULL_REFRESH_PROPERTIES) {
+      _propertyDataChangedMap.get(type).set(true);
+    }
   }
 }
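The override of requireFullRefresh() narrows a full refresh so the management mode pipeline only re-reads live instances and messages, the two inputs its checks depend on. The sketch below models that idea with an EnumMap of AtomicBoolean "data changed" flags. SketchChangeType is a cut-down, hypothetical stand-in for HelixConstants.ChangeType, and the base class is assumed to pre-populate a flag for every type and to mark all of them on a full refresh; without the pre-population, get(type) would return null.

```java
import java.util.Arrays;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

// Cut-down, hypothetical stand-in for HelixConstants.ChangeType.
enum SketchChangeType { IDEAL_STATE, CURRENT_STATE, LIVE_INSTANCE, MESSAGE, CONFIG }

class SketchBaseProvider {
  // One "data changed" flag per change type, pre-populated so get(type) never returns null.
  protected final Map<SketchChangeType, AtomicBoolean> propertyDataChanged =
      new EnumMap<>(SketchChangeType.class);

  SketchBaseProvider() {
    for (SketchChangeType type : SketchChangeType.values()) {
      propertyDataChanged.put(type, new AtomicBoolean(false));
    }
  }

  // Assumed base behaviour: a full refresh marks every property type as changed.
  void requireFullRefresh() {
    for (AtomicBoolean flag : propertyDataChanged.values()) {
      flag.set(true);
    }
  }

  boolean isChanged(SketchChangeType type) {
    return propertyDataChanged.get(type).get();
  }
}

final class SketchManagementProvider extends SketchBaseProvider {
  // Only these types are refreshed on a full refresh request.
  private static final List<SketchChangeType> FULL_REFRESH_PROPERTIES =
      Arrays.asList(SketchChangeType.LIVE_INSTANCE, SketchChangeType.MESSAGE);

  @Override
  void requireFullRefresh() {
    for (SketchChangeType type : FULL_REFRESH_PROPERTIES) {
      propertyDataChanged.get(type).set(true);
    }
  }
}
```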
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementModeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementModeStage.java
index 512224d..042aa14 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementModeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementModeStage.java
@@ -19,9 +19,9 @@ package org.apache.helix.controller.stages;
  * under the License.
  */
 
+import org.apache.helix.controller.LogUtil;
 import org.apache.helix.controller.dataproviders.ManagementControllerDataProvider;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
-import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.util.HelixUtil;
 import org.apache.helix.util.RebalanceUtil;
 import org.slf4j.Logger;
@@ -36,13 +36,21 @@ public class ManagementModeStage extends AbstractBaseStage {
   @Override
   public void process(ClusterEvent event) throws Exception {
     // TODO: implement the stage
+    _eventId = event.getEventId();
     String clusterName = event.getClusterName();
     ManagementControllerDataProvider cache =
         event.getAttribute(AttributeName.ControllerDataProvider.name());
-    if (!HelixUtil.inManagementMode(cache)) {
-      LOG.info("Exiting management mode pipeline for cluster {}", clusterName);
+
+    // TODO: move to the last stage of management pipeline
+    checkInManagementMode(clusterName, cache);
+  }
+
+  private void checkInManagementMode(String clusterName, ManagementControllerDataProvider cache) {
+    // Should exit management mode
+    if (!HelixUtil.inManagementMode(cache.getPauseSignal(), cache.getLiveInstances(),
+        cache.getEnabledLiveInstances(), cache.getAllInstancesMessages())) {
+      LogUtil.logInfo(LOG, _eventId, "Exiting management mode pipeline for cluster " + clusterName);
       RebalanceUtil.enableManagementMode(clusterName, false);
-      throw new StageException("Exiting management mode pipeline for cluster " + clusterName);
     }
   }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java
index a4d4783..613ce2e 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java
@@ -28,6 +28,7 @@ import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Resource;
 import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
 import org.apache.helix.util.HelixUtil;
 import org.apache.helix.util.RebalanceUtil;
 import org.slf4j.Logger;
@@ -44,12 +45,7 @@ public class ResourceValidationStage extends AbstractBaseStage {
       throw new StageException("Missing attributes in event:" + event + ". Requires DataCache");
     }
 
-    // Check if cluster is still in management mode. Eg. there exists any frozen live instance.
-    if (HelixUtil.inManagementMode(cache)) {
-      // Trigger an immediate management mode pipeline.
-      RebalanceUtil.enableManagementMode(event.getClusterName(), true);
-      throw new StageException("Pipeline should not be run because cluster is in management mode");
-    }
+    processManagementMode(event, cache);
 
     Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
     if (resourceMap == null) {
@@ -91,6 +87,27 @@ public class ResourceValidationStage extends AbstractBaseStage {
     }
   }
 
+  private void processManagementMode(ClusterEvent event, BaseControllerDataProvider cache)
+      throws StageException {
+    // Set cluster status monitor for maintenance mode
+    ClusterStatusMonitor monitor = event.getAttribute(AttributeName.clusterStatusMonitor.name());
+    if (monitor != null) {
+      monitor.setMaintenance(cache.isMaintenanceModeEnabled());
+    }
+
+    // Check if cluster is still in management mode, e.g. there exists a frozen live instance.
+    if (HelixUtil.inManagementMode(cache.getPauseSignal(), cache.getLiveInstances(),
+        cache.getEnabledLiveInstances(), cache.getAllInstancesMessages())) {
+      // Trigger an immediate management mode pipeline.
+      LogUtil.logInfo(LOG, _eventId,
+          "Enabling management mode pipeline for cluster " + event.getClusterName());
+      RebalanceUtil.enableManagementMode(event.getClusterName(), true);
+      throw new StageException(
+          "Pipeline should not be run because cluster " + event.getClusterName()
+              + " is in management mode");
+    }
+  }
+
   /**
    * Check if the ideal state adheres to a rule
    * @param idealState the ideal state to check
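Together, ResourceValidationStage and ManagementModeStage form a two-sided switch: the default pipeline requests management mode and aborts itself by throwing StageException when HelixUtil.inManagementMode(...) holds, and the management pipeline switches back when it no longer holds, now without throwing. The sketch below models only that control flow. The predicate is passed in as a BooleanSupplier because the body of inManagementMode is not shown here; SketchController, runDefaultValidation and runManagementStage are hypothetical names, and IllegalStateException stands in for StageException.

```java
import java.util.function.BooleanSupplier;

// Hypothetical model of the enter/exit handshake between the two pipelines.
final class SketchController {
  private boolean inManagementMode;

  boolean inManagementMode() {
    return inManagementMode;
  }

  // Stand-in for RebalanceUtil.enableManagementMode(clusterName, enabled).
  void enableManagementMode(boolean enabled) {
    inManagementMode = enabled;
  }

  // Mirrors ResourceValidationStage.processManagementMode(): switch to management mode,
  // then abort the default pipeline by throwing (StageException in Helix).
  void runDefaultValidation(String clusterName, BooleanSupplier managementModeRequired) {
    if (managementModeRequired.getAsBoolean()) {
      enableManagementMode(true);
      throw new IllegalStateException("Pipeline should not be run because cluster "
          + clusterName + " is in management mode");
    }
  }

  // Mirrors ManagementModeStage.checkInManagementMode(): switch back without throwing.
  void runManagementStage(BooleanSupplier managementModeRequired) {
    if (!managementModeRequired.getAsBoolean()) {
      enableManagementMode(false);
    }
  }
}
```

Throwing only on the entry side keeps the default pipeline from computing assignments while the cluster is in management mode; on the exit side the management pipeline can finish its remaining stages normally.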
diff --git a/helix-core/src/main/java/org/apache/helix/model/Message.java b/helix-core/src/main/java/org/apache/helix/model/Message.java
index b509506..6222df5 100644
--- a/helix-core/src/main/java/org/apache/helix/model/Message.java
+++ b/helix-core/src/main/java/org/apache/helix/model/Message.java
@@ -59,6 +59,7 @@ public class Message extends HelixProperty {
     NO_OP,
     PARTICIPANT_ERROR_REPORT,
     PARTICIPANT_SESSION_CHANGE,
+    PARTICIPANT_STATUS_CHANGE,
     CHAINED_MESSAGE, // this is a message subtype
     RELAYED_MESSAGE
   }
@@ -927,6 +928,10 @@ public class Message extends HelixProperty {
     return getTgtName().equalsIgnoreCase(InstanceType.CONTROLLER.name());
   }
 
+  public boolean isParticipantStatusChangeType() {
+    return MessageType.PARTICIPANT_STATUS_CHANGE.name().equalsIgnoreCase(getMsgType());
+  }
+
   /**
    * Get the {@link PropertyKey} for this message
    * @param keyBuilder PropertyKey Builder
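The new isParticipantStatusChangeType() compares the message's stored type string against the enum constant's name, ignoring case. The self-contained sketch below shows the pattern; SketchMessageType is a cut-down, hypothetical enum and the message is reduced to its type string. Calling equalsIgnoreCase on the constant's name, not on the stored value, also makes the check null-safe.

```java
// Cut-down, hypothetical version of Message.MessageType.
enum SketchMessageType { STATE_TRANSITION, PARTICIPANT_SESSION_CHANGE, PARTICIPANT_STATUS_CHANGE }

final class SketchTypedMessage {
  // Messages store their type as a plain string.
  private final String msgType;

  SketchTypedMessage(String msgType) {
    this.msgType = msgType;
  }

  boolean isParticipantStatusChangeType() {
    // equalsIgnoreCase(null) returns false, so a message without a type is simply not a match.
    return SketchMessageType.PARTICIPANT_STATUS_CHANGE.name().equalsIgnoreCase(msgType);
  }
}
```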
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
index eaab1a2..3b133a1 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
@@ -416,7 +416,7 @@ public class BestPossibleExternalViewVerifier extends ZkHelixClusterVerifier {
    */
   private BestPossibleStateOutput calcBestPossState(ResourceControllerDataProvider cache, Set<String> resources)
       throws Exception {
-    ClusterEvent event = new ClusterEvent(ClusterEventType.StateVerifier);
+    ClusterEvent event = new ClusterEvent(_clusterName, ClusterEventType.StateVerifier);
     event.addAttribute(AttributeName.ControllerDataProvider.name(), cache);
 
     RebalanceUtil.runStage(event, new ResourceComputationStage());
diff --git a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
index 3151716..c770583 100644
--- a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
@@ -21,6 +21,7 @@ package org.apache.helix.util;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -38,7 +39,6 @@ import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.PropertyType;
 import org.apache.helix.controller.common.PartitionStateMap;
-import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import org.apache.helix.controller.rebalancer.AbstractRebalancer;
 import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
@@ -60,6 +60,7 @@ import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Partition;
+import org.apache.helix.model.PauseSignal;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.StateModelDefinition;
@@ -550,13 +551,33 @@ public final class HelixUtil {
   }
 
   /**
-   * Checks whether or not the cluster is in management mode.
+   * Checks whether or not the cluster is in management mode. It checks:
+   * - pause signal
+   * - live instances: whether any live instance is not in normal status, e.g. frozen.
+   * - messages: whether any live instance has a participant status change message
    *
-   * @param cache
-   * @return
+   * @param pauseSignal pause signal
+   * @param liveInstanceMap map of live instances
+   * @param enabledLiveInstances set of enabled live instance names. All of them must be
+   *                             included in liveInstanceMap.
+   * @param instancesMessages map from instance name to that instance's messages
+   * @return true if cluster is in management mode; otherwise, false
    */
-  public static boolean inManagementMode(BaseControllerDataProvider cache) {
-    // TODO: implement the logic. Parameters can also change
-    return true;
+  public static boolean inManagementMode(PauseSignal pauseSignal,
+      Map<String, LiveInstance> liveInstanceMap, Set<String> enabledLiveInstances,
+      Map<String, Collection<Message>> instancesMessages) {
+    // Check pause signal and abnormal live instances (e.g. in freeze mode)
+    // TODO: should check maintenance signal when moving maintenance to management pipeline
+    return pauseSignal != null || enabledLiveInstances.stream().anyMatch(
+        instance -> isInstanceInManagementMode(instance, liveInstanceMap, instancesMessages));
+  }
+
+  private static boolean isInstanceInManagementMode(String instance,
+      Map<String, LiveInstance> liveInstanceMap,
+      Map<String, Collection<Message>> instancesMessages) {
+    // Check live instance status and participant status change message
+    return LiveInstance.LiveInstanceStatus.PAUSED.equals(liveInstanceMap.get(instance).getStatus())
+        || (instancesMessages.getOrDefault(instance, Collections.emptyList()).stream()
+        .anyMatch(Message::isParticipantStatusChangeType));
   }
 }
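
The rule implemented by the new `inManagementMode()` can be sketched with plain collections. The cluster is in management mode if a pause signal exists, or if any enabled live instance is paused or has a participant status change message pending. In this sketch, `Status`, the `Object` pause signal, and the string message types are hypothetical simplifications of `LiveInstance.LiveInstanceStatus`, `PauseSignal`, and `Message`. One deliberate deviation: `==` on the enum tolerates an enabled instance that is missing from the status map. The diff's `liveInstanceMap.get(instance).getStatus()` would throw in that case, which is why the Javadoc requires every enabled instance to be present.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Sketch of the inManagementMode() decision using plain collections instead of Helix types. */
public class ManagementModeCheckSketch {
  /** Hypothetical subset of LiveInstance.LiveInstanceStatus. */
  enum Status { NORMAL, PAUSED }

  static final String STATUS_CHANGE = "PARTICIPANT_STATUS_CHANGE";

  /** True if paused, or any enabled live instance is paused or has a status change message. */
  static boolean inManagementMode(Object pauseSignal, Map<String, Status> liveInstanceStatus,
      Set<String> enabledLiveInstances, Map<String, Collection<String>> instanceMessageTypes) {
    return pauseSignal != null || enabledLiveInstances.stream()
        .anyMatch(instance -> isInstanceInManagementMode(instance, liveInstanceStatus,
            instanceMessageTypes));
  }

  private static boolean isInstanceInManagementMode(String instance,
      Map<String, Status> liveInstanceStatus, Map<String, Collection<String>> instanceMessageTypes) {
    // Enum comparison with == treats a missing entry (null) as "not paused" instead of throwing.
    return liveInstanceStatus.get(instance) == Status.PAUSED
        || instanceMessageTypes.getOrDefault(instance, Collections.emptyList()).stream()
            .anyMatch(STATUS_CHANGE::equalsIgnoreCase);
  }

  public static void main(String[] args) {
    Map<String, Status> statuses = Map.of("host1", Status.NORMAL, "host2", Status.NORMAL);
    Set<String> enabled = Set.of("host1", "host2");
    Map<String, Collection<String>> messages = Map.of("host2", List.of("STATE_TRANSITION"));
    System.out.println(inManagementMode(null, statuses, enabled, messages)); // false

    Map<String, Collection<String>> withStatusChange =
        Map.of("host2", List.of("STATE_TRANSITION", STATUS_CHANGE));
    System.out.println(inManagementMode(null, statuses, enabled, withStatusChange)); // true
  }
}
```

Because `anyMatch` short-circuits, the per-instance scan stops at the first instance that qualifies. The scan is skipped entirely when a pause signal is present.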
diff --git a/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java b/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
index db2b76f..f74b98f 100644
--- a/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
@@ -159,8 +159,8 @@ public class RebalanceUtil {
           enabled);
       leaderController.setInManagementMode(enabled);
     } else {
-      LOG.error("Failed to switch management mode pipeline, enabled={}. "
-          + "Controller for cluster {} does not exist", clusterName, enabled);
+      throw new HelixException(String.format("Failed to switch management mode pipeline, "
+          + "enabled=%s. Controller for cluster %s does not exist", enabled, clusterName));
     }
 
     // Triggers an event to immediately run the pipeline