You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2013/11/07 02:20:01 UTC
[53/53] [abbrv] git commit: Merge branch 'helix-logical-model'
Merge branch 'helix-logical-model'
Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/18a8c7cf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/18a8c7cf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/18a8c7cf
Branch: refs/heads/master
Commit: 18a8c7cfc0825a4ff8ef963734bfffb462042cbf
Parents: 13c1c4c 0839afa
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Wed Nov 6 17:18:05 2013 -0800
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Wed Nov 6 17:18:05 2013 -0800
----------------------------------------------------------------------
.../resources/SchedulerTasksResource.java | 16 +-
.../helix/tools/TestResetPartitionState.java | 11 +-
.../org/apache/helix/agent/AgentStateModel.java | 24 +-
.../helix/agent/AgentStateModelFactory.java | 7 +-
.../org/apache/helix/agent/HelixAgentMain.java | 4 +-
.../helix/agent/ProcessMonitorThread.java | 1 -
.../org/apache/helix/agent/TestHelixAgent.java | 2 -
.../java/org/apache/helix/ConfigAccessor.java | 4 +-
.../main/java/org/apache/helix/HelixAdmin.java | 9 +-
.../org/apache/helix/HelixAutoController.java | 43 +
.../java/org/apache/helix/HelixConnection.java | 253 ++++
.../helix/HelixConnectionStateListener.java | 13 +
.../java/org/apache/helix/HelixController.java | 18 +
.../org/apache/helix/HelixDataAccessor.java | 2 +-
.../java/org/apache/helix/HelixManager.java | 6 +-
.../java/org/apache/helix/HelixParticipant.java | 37 +
.../java/org/apache/helix/HelixProperty.java | 10 +
.../main/java/org/apache/helix/HelixRole.java | 40 +
.../java/org/apache/helix/HelixService.java | 16 +
.../main/java/org/apache/helix/PropertyKey.java | 113 +-
.../org/apache/helix/PropertyPathConfig.java | 8 +
.../java/org/apache/helix/PropertyType.java | 2 +
.../main/java/org/apache/helix/ZNRecord.java | 9 +
.../org/apache/helix/alerts/AlertProcessor.java | 3 -
.../org/apache/helix/alerts/AlertsHolder.java | 3 -
.../apache/helix/alerts/DecayAggregator.java | 2 -
.../apache/helix/alerts/ExpressionParser.java | 2 -
.../org/apache/helix/alerts/StatsHolder.java | 4 -
.../java/org/apache/helix/alerts/Tuple.java | 3 -
.../apache/helix/alerts/WindowAggregator.java | 2 -
.../main/java/org/apache/helix/api/Cluster.java | 307 +++++
.../java/org/apache/helix/api/Controller.java | 74 ++
.../java/org/apache/helix/api/HelixVersion.java | 68 +
.../java/org/apache/helix/api/Participant.java | 174 +++
.../java/org/apache/helix/api/Partition.java | 54 +
.../java/org/apache/helix/api/Resource.java | 212 ++++
.../org/apache/helix/api/RunningInstance.java | 69 +
.../main/java/org/apache/helix/api/Scope.java | 125 ++
.../java/org/apache/helix/api/Spectator.java | 45 +
.../main/java/org/apache/helix/api/State.java | 85 ++
.../api/accessor/AtomicClusterAccessor.java | 260 ++++
.../api/accessor/AtomicParticipantAccessor.java | 211 ++++
.../api/accessor/AtomicResourceAccessor.java | 150 +++
.../helix/api/accessor/ClusterAccessor.java | 836 +++++++++++++
.../helix/api/accessor/ControllerAccessor.java | 49 +
.../helix/api/accessor/ParticipantAccessor.java | 774 ++++++++++++
.../helix/api/accessor/ResourceAccessor.java | 470 +++++++
.../apache/helix/api/config/ClusterConfig.java | 912 ++++++++++++++
.../helix/api/config/NamespacedConfig.java | 228 ++++
.../helix/api/config/ParticipantConfig.java | 382 ++++++
.../apache/helix/api/config/ResourceConfig.java | 373 ++++++
.../helix/api/config/SchedulerTaskConfig.java | 69 +
.../org/apache/helix/api/config/UserConfig.java | 53 +
.../java/org/apache/helix/api/id/ClusterId.java | 57 +
.../org/apache/helix/api/id/ConstraintId.java | 80 ++
.../org/apache/helix/api/id/ControllerId.java | 54 +
.../main/java/org/apache/helix/api/id/Id.java | 55 +
.../java/org/apache/helix/api/id/MessageId.java | 54 +
.../org/apache/helix/api/id/ParticipantId.java | 54 +
.../org/apache/helix/api/id/PartitionId.java | 112 ++
.../java/org/apache/helix/api/id/ProcId.java | 54 +
.../org/apache/helix/api/id/ResourceId.java | 57 +
.../java/org/apache/helix/api/id/SessionId.java | 54 +
.../org/apache/helix/api/id/SpectatorId.java | 51 +
.../apache/helix/api/id/StateModelDefId.java | 66 +
.../helix/api/id/StateModelFactoryId.java | 57 +
.../helix/controller/ExternalViewGenerator.java | 2 +-
.../controller/GenericHelixController.java | 29 +-
.../controller/rebalancer/AutoRebalancer.java | 203 ---
.../controller/rebalancer/CustomRebalancer.java | 136 +-
.../rebalancer/FullAutoRebalancer.java | 211 ++++
.../controller/rebalancer/HelixRebalancer.java | 64 +
.../helix/controller/rebalancer/Rebalancer.java | 9 +-
.../controller/rebalancer/RebalancerRef.java | 94 ++
.../rebalancer/SemiAutoRebalancer.java | 89 +-
.../context/BasicRebalancerContext.java | 240 ++++
.../rebalancer/context/ContextSerializer.java | 37 +
.../context/CustomRebalancerContext.java | 163 +++
.../context/DefaultContextSerializer.java | 83 ++
.../context/FullAutoRebalancerContext.java | 63 +
.../context/PartitionedRebalancerContext.java | 373 ++++++
.../rebalancer/context/RebalancerConfig.java | 182 +++
.../rebalancer/context/RebalancerContext.java | 94 ++
.../context/ReplicatedRebalancerContext.java | 40 +
.../context/SemiAutoRebalancerContext.java | 178 +++
.../util/ConstraintBasedAssignment.java | 181 ++-
.../restlet/ZKPropertyTransferServer.java | 1 -
.../restlet/ZNRecordUpdateResource.java | 64 +-
.../stages/BestPossibleStateCalcStage.java | 147 ++-
.../stages/BestPossibleStateOutput.java | 78 +-
.../controller/stages/ClusterDataCache.java | 9 +-
.../stages/CompatibilityCheckStage.java | 21 +-
.../stages/CurrentStateComputationStage.java | 100 +-
.../controller/stages/CurrentStateOutput.java | 1 +
.../stages/ExternalViewComputeStage.java | 154 +--
.../controller/stages/HealthDataCache.java | 2 -
.../stages/MessageGenerationOutput.java | 65 -
.../stages/MessageGenerationPhase.java | 207 ---
.../stages/MessageGenerationStage.java | 213 ++++
.../helix/controller/stages/MessageOutput.java | 79 ++
.../stages/MessageSelectionStage.java | 135 +-
.../stages/MessageSelectionStageOutput.java | 59 -
.../controller/stages/MessageThrottleStage.java | 44 +-
.../stages/MessageThrottleStageOutput.java | 52 -
.../stages/PersistAssignmentStage.java | 45 +
.../controller/stages/ReadClusterDataStage.java | 49 +-
.../controller/stages/ReadHealthDataStage.java | 2 -
.../stages/RebalanceIdealStateStage.java | 80 --
.../stages/ResourceComputationStage.java | 163 +--
.../controller/stages/ResourceCurrentState.java | 255 ++++
.../stages/StatsAggregationStage.java | 4 +-
.../controller/stages/TaskAssignmentStage.java | 75 +-
.../strategy/AutoRebalanceStrategy.java | 76 +-
.../strategy/EspressoRelayStrategy.java | 3 +-
.../helix/controller/strategy/RUSHrHash.java | 6 +-
.../helix/healthcheck/DecayAggregationType.java | 2 +-
.../DefaultHealthReportProvider.java | 1 -
.../ParticipantHealthReportCollectorImpl.java | 2 +-
.../PerformanceHealthReportProvider.java | 2 +-
.../java/org/apache/helix/healthcheck/Stat.java | 4 +-
.../healthcheck/StatHealthReportProvider.java | 4 +-
.../helix/josql/ClusterJosqlQueryProcessor.java | 2 +-
.../java/org/apache/helix/lock/HelixLock.java | 43 +
.../org/apache/helix/lock/HelixLockable.java | 36 +
.../org/apache/helix/lock/zk/LockListener.java | 39 +
.../apache/helix/lock/zk/ProtocolSupport.java | 191 +++
.../org/apache/helix/lock/zk/WriteLock.java | 294 +++++
.../org/apache/helix/lock/zk/ZKHelixLock.java | 154 +++
.../org/apache/helix/lock/zk/ZNodeName.java | 113 ++
.../helix/lock/zk/ZooKeeperOperation.java | 38 +
.../helix/manager/zk/AbstractManager.java | 6 +-
.../java/org/apache/helix/manager/zk/Cache.java | 1 -
.../helix/manager/zk/CallbackHandler.java | 2 -
.../helix/manager/zk/ControllerManager.java | 5 +-
.../manager/zk/ControllerManagerHelper.java | 1 -
.../manager/zk/CurStateCarryOverUpdater.java | 11 +-
.../DefaultControllerMessageHandlerFactory.java | 8 +-
...ltParticipantErrorMessageHandlerFactory.java | 12 +-
.../DefaultSchedulerMessageHandlerFactory.java | 73 +-
.../zk/DistributedControllerManager.java | 4 +-
.../manager/zk/DistributedLeaderElection.java | 4 +-
.../manager/zk/HelixConnectionAdaptor.java | 296 +++++
.../helix/manager/zk/HelixGroupCommit.java | 1 -
.../helix/manager/zk/ParticipantManager.java | 2 -
.../manager/zk/ParticipantManagerHelper.java | 10 +-
.../helix/manager/zk/WriteThroughCache.java | 1 -
.../apache/helix/manager/zk/ZKHelixAdmin.java | 71 +-
.../helix/manager/zk/ZKHelixDataAccessor.java | 5 +-
.../apache/helix/manager/zk/ZKHelixManager.java | 8 +-
.../helix/manager/zk/ZkBaseDataAccessor.java | 817 ++++++------
.../manager/zk/ZkCacheBaseDataAccessor.java | 55 +-
.../helix/manager/zk/ZkCallbackCache.java | 1 -
.../org/apache/helix/manager/zk/ZkClient.java | 1 -
.../helix/manager/zk/ZkHelixAutoController.java | 114 ++
.../helix/manager/zk/ZkHelixConnection.java | 605 +++++++++
.../helix/manager/zk/ZkHelixController.java | 236 ++++
.../helix/manager/zk/ZkHelixLeaderElection.java | 148 +++
.../helix/manager/zk/ZkHelixParticipant.java | 475 +++++++
.../zk/serializer/JacksonPayloadSerializer.java | 1 -
.../apache/helix/messaging/AsyncCallback.java | 2 +-
.../messaging/DefaultMessagingService.java | 26 +-
.../handling/AsyncCallbackService.java | 21 +-
.../messaging/handling/BatchMessageHandler.java | 13 +-
.../messaging/handling/GroupMessageHandler.java | 3 +-
.../handling/HelixStateTransitionHandler.java | 92 +-
.../helix/messaging/handling/HelixTask.java | 43 +-
.../messaging/handling/HelixTaskExecutor.java | 48 +-
.../messaging/handling/MessageHandler.java | 2 -
.../messaging/handling/MessageTimeoutTask.java | 2 +-
.../org/apache/helix/model/AlertStatus.java | 19 +
.../java/org/apache/helix/model/Alerts.java | 19 +
.../helix/model/ClusterConfiguration.java | 109 ++
.../apache/helix/model/ClusterConstraints.java | 77 +-
.../org/apache/helix/model/CurrentState.java | 130 +-
.../org/apache/helix/model/ExternalView.java | 87 ++
.../apache/helix/model/HelixConfigScope.java | 17 +
.../java/org/apache/helix/model/IdealState.java | 378 +++++-
.../org/apache/helix/model/InstanceConfig.java | 65 +-
.../org/apache/helix/model/LiveInstance.java | 52 +-
.../java/org/apache/helix/model/Message.java | 375 +++++-
.../helix/model/PartitionConfiguration.java | 58 +
.../apache/helix/model/ResourceAssignment.java | 152 ++-
.../helix/model/ResourceConfiguration.java | 115 ++
.../helix/model/StateModelDefinition.java | 180 ++-
.../java/org/apache/helix/model/Transition.java | 46 +-
.../helix/model/builder/AutoModeISBuilder.java | 66 +-
.../builder/AutoRebalanceModeISBuilder.java | 36 +-
.../builder/ClusterConstraintsBuilder.java | 19 +-
.../model/builder/ConstraintItemBuilder.java | 2 +-
.../model/builder/CurrentStateBuilder.java | 124 ++
.../model/builder/CustomModeISBuilder.java | 65 +-
.../helix/model/builder/IdealStateBuilder.java | 25 +-
.../builder/MessageConstraintItemBuilder.java | 107 ++
.../builder/ResourceAssignmentBuilder.java | 93 ++
.../builder/StateConstraintItemBuilder.java | 92 ++
.../builder/StateTransitionTableBuilder.java | 19 +-
.../apache/helix/monitoring/StatusDumpTask.java | 166 +++
.../helix/monitoring/ZKPathDataDumpTask.java | 2 +-
.../monitoring/mbeans/ClusterMBeanObserver.java | 7 -
.../monitoring/mbeans/ResourceMonitor.java | 20 +-
.../mbeans/StateTransitionStatMonitor.java | 1 -
.../DistClusterControllerElection.java | 2 +-
.../DistClusterControllerStateModel.java | 8 +-
.../participant/GenericLeaderStandbyModel.java | 2 +-
.../participant/HelixCustomCodeRunner.java | 14 +-
.../participant/HelixStateMachineEngine.java | 166 ++-
.../helix/participant/StateMachineEngine.java | 66 +-
.../statemachine/HelixStateModelFactory.java | 99 ++
.../HelixStateModelFactoryAdaptor.java | 17 +
.../statemachine/ScheduledTaskStateModel.java | 1 +
.../statemachine/StateModelFactory.java | 5 +-
.../apache/helix/tools/CLMLogFileAppender.java | 5 -
.../tools/ClusterExternalViewVerifier.java | 17 +-
.../org/apache/helix/tools/ClusterSetup.java | 10 +-
.../helix/tools/ClusterStateVerifier.java | 90 +-
.../org/apache/helix/tools/MessagePoster.java | 24 +-
.../org/apache/helix/tools/NewClusterSetup.java | 1181 ++++++++++++++++++
.../helix/tools/StateModelConfigGenerator.java | 13 +-
.../org/apache/helix/tools/ZkLogAnalyzer.java | 9 +-
.../org/apache/helix/util/RebalanceUtil.java | 25 +-
.../org/apache/helix/util/StatusUpdateUtil.java | 61 +-
.../org/apache/helix/util/ZKClientPool.java | 1 -
.../src/test/java/org/apache/helix/Mocks.java | 3 +-
.../org/apache/helix/TestConfigAccessor.java | 1 -
.../java/org/apache/helix/TestGroupCommit.java | 4 -
.../apache/helix/TestHelixConfigAccessor.java | 2 +-
.../org/apache/helix/TestHelixTaskExecutor.java | 24 +-
.../org/apache/helix/TestHelixTaskHandler.java | 47 +-
.../test/java/org/apache/helix/TestHelper.java | 21 +-
.../apache/helix/TestHierarchicalDataStore.java | 7 +-
.../java/org/apache/helix/TestPerfCounters.java | 1 +
.../TestPerformanceHealthReportProvider.java | 2 -
.../java/org/apache/helix/TestZKCallback.java | 37 +-
.../apache/helix/TestZKRoutingInfoProvider.java | 3 +-
.../java/org/apache/helix/TestZNRecord.java | 1 -
.../apache/helix/TestZNRecordBucketizer.java | 1 -
.../test/java/org/apache/helix/TestZkBasis.java | 17 +-
.../org/apache/helix/TestZkClientWrapper.java | 1 -
.../java/org/apache/helix/TestZnodeModify.java | 5 +-
.../java/org/apache/helix/ZkTestHelper.java | 16 +-
.../java/org/apache/helix/ZkUnitTestBase.java | 41 +-
.../org/apache/helix/alerts/TestAddAlerts.java | 4 +-
.../helix/alerts/TestAddPersistentStats.java | 3 +-
.../helix/alerts/TestAlertValidation.java | 3 -
.../alerts/TestArrivingParticipantStats.java | 4 +-
.../helix/alerts/TestBaseStatsValidation.java | 4 -
.../apache/helix/alerts/TestEvaluateAlerts.java | 5 -
.../org/apache/helix/alerts/TestOperators.java | 4 -
.../org/apache/helix/alerts/TestStatsMatch.java | 4 -
.../test/java/org/apache/helix/api/TestId.java | 90 ++
.../apache/helix/api/TestNamespacedConfig.java | 177 +++
.../org/apache/helix/api/TestNewStages.java | 250 ++++
.../org/apache/helix/api/TestUpdateConfig.java | 158 +++
.../api/accessor/TestAccessorRecreate.java | 170 +++
.../helix/api/accessor/TestAtomicAccessors.java | 200 +++
.../context/TestSerializeRebalancerContext.java | 103 ++
.../helix/controller/stages/BaseStageTest.java | 76 +-
.../TestBestPossibleCalcStageCompatibility.java | 75 +-
.../stages/TestBestPossibleStateCalcStage.java | 28 +-
.../stages/TestCompatibilityCheckStage.java | 6 +-
.../TestCurrentStateComputationStage.java | 82 +-
.../stages/TestMessageThrottleStage.java | 80 +-
.../stages/TestMsgSelectionStage.java | 117 +-
.../stages/TestParseInfoFromAlert.java | 17 +-
.../stages/TestRebalancePipeline.java | 139 ++-
.../stages/TestResourceComputationStage.java | 91 +-
.../strategy/TestAutoRebalanceStrategy.java | 238 ++--
.../strategy/TestNewAutoRebalanceStrategy.java | 615 +++++++++
.../helix/healthcheck/TestAddDropAlert.java | 15 +-
.../healthcheck/TestAlertActionTriggering.java | 6 +-
.../helix/healthcheck/TestAlertFireHistory.java | 8 +-
.../helix/healthcheck/TestDummyAlerts.java | 2 +-
.../helix/healthcheck/TestExpandAlert.java | 15 +-
.../helix/healthcheck/TestSimpleAlert.java | 15 +-
.../healthcheck/TestSimpleWildcardAlert.java | 15 +-
.../helix/healthcheck/TestStalenessAlert.java | 15 +-
.../helix/healthcheck/TestWildcardAlert.java | 15 +-
.../helix/integration/TestAddClusterV2.java | 2 -
.../TestAddNodeAfterControllerStart.java | 2 -
.../TestAddStateModelFactoryAfterConnect.java | 10 +-
.../helix/integration/TestAutoRebalance.java | 66 +-
.../TestAutoRebalancePartitionLimit.java | 38 +-
.../helix/integration/TestBatchMessage.java | 19 +-
.../integration/TestBucketizedResource.java | 2 +-
.../integration/TestCleanupExternalView.java | 18 +-
.../TestCustomizedIdealStateRebalancer.java | 99 +-
.../apache/helix/integration/TestDisable.java | 2 +-
.../integration/TestDistributedCMMain.java | 2 +-
.../TestDistributedClusterController.java | 2 +-
.../org/apache/helix/integration/TestDrop.java | 10 +-
.../TestEnablePartitionDuringDisable.java | 13 +-
.../helix/integration/TestErrorPartition.java | 2 +-
.../integration/TestExternalViewUpdates.java | 2 +-
.../helix/integration/TestFailOverPerf1kp.java | 1 -
.../helix/integration/TestHelixConnection.java | 153 +++
.../integration/TestHelixCustomCodeRunner.java | 8 +-
.../integration/TestInvalidAutoIdealState.java | 7 +-
.../TestMessagePartitionStateMismatch.java | 42 +-
.../helix/integration/TestMessageThrottle.java | 8 +-
.../helix/integration/TestMessageThrottle2.java | 25 +-
.../helix/integration/TestMessagingService.java | 50 +-
.../integration/TestNonOfflineInitState.java | 2 +-
.../TestParticipantErrorMessage.java | 18 +-
.../helix/integration/TestRenamePartition.java | 5 +-
.../helix/integration/TestResetInstance.java | 2 +-
.../integration/TestResetPartitionState.java | 15 +-
.../helix/integration/TestResetResource.java | 2 +-
.../helix/integration/TestSchedulerMessage.java | 488 ++------
.../integration/TestSchedulerMsgContraints.java | 254 ++++
.../integration/TestSchedulerMsgUsingQueue.java | 181 +++
.../TestSessionExpiryInTransition.java | 8 +-
.../TestStandAloneCMSessionExpiry.java | 2 +-
.../integration/TestStateTransitionTimeout.java | 21 +-
.../helix/integration/TestSwapInstance.java | 2 +-
.../integration/TestZkCallbackHandlerLeak.java | 10 +-
.../helix/integration/TestZkReconnect.java | 10 +-
.../integration/ZkIntegrationTestBase.java | 6 +-
.../manager/TestConsecutiveZkSessionExpiry.java | 1 -
.../manager/TestParticipantManager.java | 4 +-
.../apache/helix/josql/TestJosqlProcessor.java | 1 -
.../apache/helix/lock/zk/TestZKHelixLock.java | 127 ++
.../TestDefaultControllerMsgHandlerFactory.java | 10 +-
.../helix/manager/zk/TestHandleNewSession.java | 1 -
.../manager/zk/TestLiveInstanceBounce.java | 4 +-
.../zk/TestWtCacheAsyncOpMultiThread.java | 2 -
.../zk/TestWtCacheAsyncOpSingleThread.java | 2 -
.../zk/TestWtCacheSyncOpSingleThread.java | 2 -
.../manager/zk/TestZKLiveInstanceData.java | 2 -
.../zk/TestZKPropertyTransferServer.java | 1 -
.../helix/manager/zk/TestZNRecordSizeLimit.java | 25 +-
.../manager/zk/TestZkBaseDataAccessor.java | 3 -
.../zk/TestZkCacheAsyncOpSingleThread.java | 4 -
.../zk/TestZkCacheSyncOpSingleThread.java | 5 -
.../helix/manager/zk/TestZkClusterManager.java | 19 +-
.../helix/manager/zk/TestZkHelixAdmin.java | 20 +-
.../manager/zk/TestZkStateChangeListener.java | 8 +-
.../helix/messaging/TestAsyncCallback.java | 10 +-
.../helix/messaging/TestAsyncCallbackSvc.java | 50 +-
.../messaging/TestDefaultMessagingService.java | 4 +-
.../handling/TestConfigThreadpoolSize.java | 4 +-
.../handling/TestHelixTaskExecutor.java | 89 +-
.../handling/TestResourceThreadpoolSize.java | 4 +-
.../mock/controller/ClusterController.java | 2 -
.../helix/mock/controller/MockController.java | 22 +-
.../mock/controller/MockControllerProcess.java | 17 +-
.../helix/mock/participant/DummyProcess.java | 7 +-
.../helix/mock/participant/ErrTransition.java | 10 +-
.../mock/participant/MockMSModelFactory.java | 2 -
.../helix/mock/participant/MockParticipant.java | 16 -
.../StoreAccessDiffNodeTransition.java | 4 +-
.../StoreAccessOneNodeTransition.java | 5 +-
.../org/apache/helix/model/TestConstraint.java | 17 +-
.../org/apache/helix/model/TestIdealState.java | 34 +-
.../monitoring/TestParticipantMonitor.java | 3 -
.../helix/monitoring/TestStatCollector.java | 4 +-
.../TestClusterAlertItemMBeanCollection.java | 4 -
.../monitoring/mbeans/TestResourceMonitor.java | 3 +-
.../participant/TestDistControllerElection.java | 3 +-
.../TestDistControllerStateModel.java | 23 +-
.../TestDistControllerStateModelFactory.java | 3 -
.../apache/helix/store/TestJsonComparator.java | 1 -
.../store/zk/TestZkHelixPropertyStore.java | 1 -
.../apache/helix/tools/TestClusterSetup.java | 7 +-
.../apache/helix/tools/TestHelixAdminCli.java | 32 +-
.../org/apache/helix/util/TestZKClientPool.java | 1 -
helix-examples/pom.xml | 4 +
.../apache/helix/examples/BootstrapHandler.java | 7 +-
.../apache/helix/examples/BootstrapProcess.java | 24 +-
.../apache/helix/examples/DummyParticipant.java | 25 +-
.../apache/helix/examples/ExampleProcess.java | 1 -
.../helix/examples/IdealStateExample.java | 5 +-
.../helix/examples/LogicalModelExample.java | 298 +++++
.../examples/MasterSlaveStateModelFactory.java | 17 +-
.../org/apache/helix/examples/Quickstart.java | 20 +-
.../apache/helix/lockmanager/LockFactory.java | 9 +-
.../helix/lockmanager/LockManagerDemo.java | 7 +-
.../apache/helix/lockmanager/LockProcess.java | 5 +-
.../apache/helix/recipes/rabbitmq/Consumer.java | 5 +-
.../rabbitmq/ConsumerStateModelFactory.java | 10 +-
.../recipes/rabbitmq/SetupConsumerCluster.java | 5 +-
.../helix/filestore/ChangeLogGenerator.java | 2 -
.../helix/filestore/ChangeLogProcessor.java | 6 -
.../apache/helix/filestore/ChangeLogReader.java | 2 -
.../org/apache/helix/filestore/FileStore.java | 9 +-
.../helix/filestore/FileStoreStateModel.java | 38 +-
.../filestore/FileStoreStateModelFactory.java | 10 +-
.../apache/helix/filestore/SetupCluster.java | 5 +-
.../java/org/apache/helix/filestore/Test.java | 3 -
.../servicediscovery/ServiceDiscovery.java | 1 -
.../helix/taskexecution/TaskStateModel.java | 17 +-
.../taskexecution/TaskStateModelFactory.java | 10 +-
.../org/apache/helix/taskexecution/Worker.java | 4 +-
.../userdefinedrebalancer/LockFactory.java | 10 +-
.../LockManagerRebalancer.java | 44 +-
.../userdefinedrebalancer/LockProcess.java | 5 +-
395 files changed, 22658 insertions(+), 4067 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/SchedulerTasksResource.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/helix-agent/src/main/java/org/apache/helix/agent/AgentStateModelFactory.java
----------------------------------------------------------------------
diff --cc helix-agent/src/main/java/org/apache/helix/agent/AgentStateModelFactory.java
index a0e00a3,a0e00a3..69d45ae
--- a/helix-agent/src/main/java/org/apache/helix/agent/AgentStateModelFactory.java
+++ b/helix-agent/src/main/java/org/apache/helix/agent/AgentStateModelFactory.java
@@@ -19,12 -19,12 +19,13 @@@ package org.apache.helix.agent
* under the License.
*/
--import org.apache.helix.participant.statemachine.StateModelFactory;
++import org.apache.helix.api.id.PartitionId;
++import org.apache.helix.participant.statemachine.HelixStateModelFactory;
--public class AgentStateModelFactory extends StateModelFactory<AgentStateModel> {
++public class AgentStateModelFactory extends HelixStateModelFactory<AgentStateModel> {
@Override
-- public AgentStateModel createNewStateModel(String partitionKey) {
++ public AgentStateModel createNewStateModel(PartitionId partitionKey) {
AgentStateModel model = new AgentStateModel();
return model;
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/helix-agent/src/main/java/org/apache/helix/agent/HelixAgentMain.java
----------------------------------------------------------------------
diff --cc helix-agent/src/main/java/org/apache/helix/agent/HelixAgentMain.java
index c900546,c900546..ded2d9b
--- a/helix-agent/src/main/java/org/apache/helix/agent/HelixAgentMain.java
+++ b/helix-agent/src/main/java/org/apache/helix/agent/HelixAgentMain.java
@@@ -31,6 -31,6 +31,7 @@@ import org.apache.commons.cli.Options
import org.apache.commons.cli.ParseException;
import org.apache.helix.HelixManager;
import org.apache.helix.InstanceType;
++import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.manager.zk.ZKHelixManager;
import org.apache.helix.participant.StateMachineEngine;
import org.apache.log4j.Logger;
@@@ -128,7 -128,7 +129,8 @@@ public class HelixAgentMain
new ZKHelixManager(clusterName, instance, InstanceType.PARTICIPANT, zkAddress);
StateMachineEngine stateMach = manager.getStateMachineEngine();
-- stateMach.registerStateModelFactory(stateModelName, new AgentStateModelFactory());
++ stateMach.registerStateModelFactory(StateModelDefId.from(stateModelName),
++ new AgentStateModelFactory());
Runtime.getRuntime().addShutdownHook(new HelixAgentShutdownHook(manager));
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/helix-core/src/main/java/org/apache/helix/PropertyKey.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
index 0000000,c48f4f2..b01a3ec
mode 000000,100644..100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
@@@ -1,0 -1,836 +1,836 @@@
+ package org.apache.helix.api.accessor;
+
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+
+ import org.I0Itec.zkclient.DataUpdater;
+ import org.apache.helix.AccessOption;
+ import org.apache.helix.BaseDataAccessor;
+ import org.apache.helix.HelixDataAccessor;
+ import org.apache.helix.HelixException;
+ import org.apache.helix.PropertyKey;
+ import org.apache.helix.ZNRecord;
+ import org.apache.helix.alerts.AlertsHolder;
+ import org.apache.helix.alerts.StatsHolder;
+ import org.apache.helix.api.Cluster;
+ import org.apache.helix.api.Controller;
+ import org.apache.helix.api.Participant;
+ import org.apache.helix.api.Resource;
+ import org.apache.helix.api.Scope;
+ import org.apache.helix.api.config.ClusterConfig;
+ import org.apache.helix.api.config.ParticipantConfig;
+ import org.apache.helix.api.config.ResourceConfig;
+ import org.apache.helix.api.config.UserConfig;
+ import org.apache.helix.api.id.ClusterId;
+ import org.apache.helix.api.id.ConstraintId;
+ import org.apache.helix.api.id.ControllerId;
+ import org.apache.helix.api.id.ParticipantId;
+ import org.apache.helix.api.id.PartitionId;
+ import org.apache.helix.api.id.ResourceId;
+ import org.apache.helix.api.id.SessionId;
+ import org.apache.helix.api.id.StateModelDefId;
+ import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+ import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+ import org.apache.helix.model.Alerts;
+ import org.apache.helix.model.ClusterConfiguration;
+ import org.apache.helix.model.ClusterConstraints;
+ import org.apache.helix.model.ClusterConstraints.ConstraintType;
+ import org.apache.helix.model.CurrentState;
+ import org.apache.helix.model.ExternalView;
+ import org.apache.helix.model.IdealState;
+ import org.apache.helix.model.InstanceConfig;
+ import org.apache.helix.model.LiveInstance;
+ import org.apache.helix.model.Message;
+ import org.apache.helix.model.PauseSignal;
+ import org.apache.helix.model.PersistentStats;
+ import org.apache.helix.model.ResourceAssignment;
+ import org.apache.helix.model.ResourceConfiguration;
+ import org.apache.helix.model.StateModelDefinition;
+ import org.apache.log4j.Logger;
+
+ import com.google.common.collect.Lists;
+ import com.google.common.collect.Maps;
+ import com.google.common.collect.Sets;
+
+ public class ClusterAccessor {
+ /** Class-wide log4j logger; declared final so the shared reference cannot be reassigned. */
+ private static final Logger LOG = Logger.getLogger(ClusterAccessor.class);
+
+ /** Accessor for the physical store, supplied to the constructor. */
+ private final HelixDataAccessor _accessor;
+ /** Property key builder obtained from the accessor in the constructor. */
+ private final PropertyKey.Builder _keyBuilder;
+ /** Id of the cluster this accessor was created for. */
+ private final ClusterId _clusterId;
+
+ /**
+  * Create an accessor bound to a single cluster.
+  * @param clusterId id of the cluster to access
+  * @param accessor HelixDataAccessor for the physical store; its key builder is cached and used
+  *          to build the property keys in this class
+  */
+ public ClusterAccessor(ClusterId clusterId, HelixDataAccessor accessor) {
+   _clusterId = clusterId;
+   _accessor = accessor;
+   _keyBuilder = _accessor.keyBuilder();
+ }
+
+ /**
+  * Create a new cluster from the given configuration, failing if it already exists.
+  * <p>
+  * The cluster counts as already created when a cluster configuration is present and the
+  * cluster structure is valid. Otherwise the structure is cleared and re-initialized, and the
+  * state model definitions, resources, participants, constraints, persistent stats, alerts,
+  * pause state and, last of all, the cluster configuration are written.
+  * @param cluster configuration of the cluster to create
+  * @return false if the cluster already exists, true otherwise (the results of the individual
+  *         writes are not checked)
+  */
+ public boolean createCluster(ClusterConfig cluster) {
+   ClusterConfiguration existing = _accessor.getProperty(_keyBuilder.clusterConfig());
+   if (existing != null && isClusterStructureValid()) {
+     LOG.error("Cluster already created. Aborting.");
+     return false;
+   }
+
+   // start from a clean structure so a partially created cluster is rebuilt from scratch
+   clearClusterStructure();
+   initClusterStructure();
+
+   for (StateModelDefinition stateModelDef : cluster.getStateModelMap().values()) {
+     addStateModelDefinitionToCluster(stateModelDef);
+   }
+   for (ResourceConfig resource : cluster.getResourceMap().values()) {
+     addResourceToCluster(resource);
+   }
+   for (ParticipantConfig participant : cluster.getParticipantMap().values()) {
+     addParticipantToCluster(participant);
+   }
+
+   _accessor.createProperty(_keyBuilder.constraints(), null);
+   for (ClusterConstraints constraints : cluster.getConstraintMap().values()) {
+     _accessor.setProperty(_keyBuilder.constraint(constraints.getType().toString()), constraints);
+   }
+
+   ClusterConfiguration clusterConfig = ClusterConfiguration.from(cluster.getUserConfig());
+   if (cluster.autoJoinAllowed()) {
+     clusterConfig.setAutoJoinAllowed(cluster.autoJoinAllowed());
+   }
+   if (cluster.getStats() != null && !cluster.getStats().getMapFields().isEmpty()) {
+     _accessor.setProperty(_keyBuilder.persistantStat(), cluster.getStats());
+   }
+   // persist alerts under the same key setBasicClusterConfig uses on update; without this,
+   // alerts supplied at creation time were silently dropped
+   if (cluster.getAlerts() != null && !cluster.getAlerts().getMapFields().isEmpty()) {
+     _accessor.setProperty(_keyBuilder.alerts(), cluster.getAlerts());
+   }
+   if (cluster.isPaused()) {
+     pauseCluster();
+   }
+   _accessor.setProperty(_keyBuilder.clusterConfig(), clusterConfig);
+
+   return true;
+ }
+
+ /**
+  * Merge a delta into the current cluster configuration and persist the result.
+  * @param clusterDelta change to the cluster configuration
+  * @return the merged ClusterConfig, or null if the cluster could not be read or the merged
+  *         configuration could not be set
+  */
+ public ClusterConfig updateCluster(ClusterConfig.Delta clusterDelta) {
+   Cluster current = readCluster();
+   if (current == null) {
+     LOG.error("Cluster does not exist, cannot be updated");
+     return null;
+   }
+
+   ClusterConfig merged = clusterDelta.mergeInto(current.getConfig());
+   if (!setBasicClusterConfig(merged)) {
+     return null;
+   }
+   return merged;
+ }
+
+ /**
+  * Persist the parts of a cluster config that are not state models, participants, or resources:
+  * the user config with the auto-join flag, the constraints, the persistent stats and the
+  * alerts. Stats and alerts that are null or have no map fields are removed from the store
+  * rather than written.
+  * @param config ClusterConfig to persist
+  * @return false if config is null, true otherwise
+  */
+ private boolean setBasicClusterConfig(ClusterConfig config) {
+   if (config == null) {
+     return false;
+   }
+
+   ClusterConfiguration physicalConfig = ClusterConfiguration.from(config.getUserConfig());
+   physicalConfig.setAutoJoinAllowed(config.autoJoinAllowed());
+   _accessor.setProperty(_keyBuilder.clusterConfig(), physicalConfig);
+
+   // one constraints node per constraint type
+   Map<ConstraintType, ClusterConstraints> constraintsByType = config.getConstraintMap();
+   for (Map.Entry<ConstraintType, ClusterConstraints> entry : constraintsByType.entrySet()) {
+     _accessor.setProperty(_keyBuilder.constraint(entry.getKey().toString()), entry.getValue());
+   }
+
+   boolean hasStats = config.getStats() != null && !config.getStats().getMapFields().isEmpty();
+   if (hasStats) {
+     _accessor.setProperty(_keyBuilder.persistantStat(), config.getStats());
+   } else {
+     _accessor.removeProperty(_keyBuilder.persistantStat());
+   }
+
+   boolean hasAlerts = config.getAlerts() != null && !config.getAlerts().getMapFields().isEmpty();
+   if (hasAlerts) {
+     _accessor.setProperty(_keyBuilder.alerts(), config.getAlerts());
+   } else {
+     _accessor.removeProperty(_keyBuilder.alerts());
+   }
+   return true;
+ }
+
+ /**
+ * drop a cluster
+ * @return true if the cluster was dropped, false if there was an error
+ */
+ public boolean dropCluster() {
+ LOG.info("Dropping cluster: " + _clusterId);
+ List<String> liveInstanceNames = _accessor.getChildNames(_keyBuilder.liveInstances());
+ if (liveInstanceNames.size() > 0) {
+ LOG.error("Can't drop cluster: " + _clusterId + " because there are running participant: "
+ + liveInstanceNames + ", shutdown participants first.");
+ return false;
+ }
+
+ LiveInstance leader = _accessor.getProperty(_keyBuilder.controllerLeader());
+ if (leader != null) {
+ LOG.error("Can't drop cluster: " + _clusterId + ", because leader: " + leader.getId()
+ + " are running, shutdown leader first.");
+ return false;
+ }
+
+ return _accessor.removeProperty(_keyBuilder.cluster());
+ }
+
+ /**
+  * Read a snapshot of the whole cluster: resources, participants, the controller leader (if any),
+  * constraints, state model definitions, stats, alerts, user config, pause status and the
+  * auto-join flag.
+  * @return cluster snapshot, or null if the cluster structure is not valid
+  */
+ public Cluster readCluster() {
+   if (!isClusterStructureValid()) {
+     LOG.error("Cluster is not fully set up");
+     return null;
+   }
+
+   LiveInstance leader = _accessor.getProperty(_keyBuilder.controllerLeader());
+
+   // constraints as persisted, keyed by the constraint type's name
+   Map<String, ClusterConstraints> constraintsByTypeName =
+       _accessor.getChildValuesMap(_keyBuilder.constraints());
+
+   Map<ResourceId, Resource> resources = readResources();
+   Map<ParticipantId, Participant> participants = readParticipants();
+
+   // the leader is the only controller recorded in the snapshot
+   Map<ControllerId, Controller> controllers = new HashMap<ControllerId, Controller>();
+   ControllerId leaderId = null;
+   if (leader != null) {
+     leaderId = ControllerId.from(leader.getId());
+     controllers.put(leaderId, new Controller(leaderId, leader, true));
+   }
+
+   // re-key the constraints by their enum type
+   Map<ConstraintType, ClusterConstraints> constraintsByType =
+       new HashMap<ConstraintType, ClusterConstraints>();
+   for (Map.Entry<String, ClusterConstraints> entry : constraintsByTypeName.entrySet()) {
+     constraintsByType.put(ConstraintType.valueOf(entry.getKey()), entry.getValue());
+   }
+
+   // the cluster is paused exactly when a pause signal is present
+   PauseSignal pauseSignal = _accessor.getProperty(_keyBuilder.pause());
+   boolean paused = pauseSignal != null;
+
+   // fall back to an empty cluster-scoped user config when no cluster config is persisted
+   ClusterConfiguration clusterConfig = _accessor.getProperty(_keyBuilder.clusterConfig());
+   UserConfig userConfig =
+       (clusterConfig != null) ? clusterConfig.getUserConfig() : new UserConfig(
+           Scope.cluster(_clusterId));
+   boolean autoJoinAllowed = clusterConfig != null && clusterConfig.autoJoinAllowed();
+
+   Map<StateModelDefId, StateModelDefinition> stateModelDefs = readStateModelDefinitions();
+   PersistentStats stats = _accessor.getProperty(_keyBuilder.persistantStat());
+   Alerts alerts = _accessor.getProperty(_keyBuilder.alerts());
+
+   return new Cluster(_clusterId, resources, participants, controllers, leaderId,
+       constraintsByType, stateModelDefs, stats, alerts, userConfig, paused, autoJoinAllowed);
+ }
+
+ /**
+  * Read every state model definition stored for this cluster.
+  * @return map of state model def id to state model definition
+  */
+ public Map<StateModelDefId, StateModelDefinition> readStateModelDefinitions() {
+   List<StateModelDefinition> definitions =
+       _accessor.getChildValues(_keyBuilder.stateModelDefs());
+
+   // index the definitions by their own id
+   Map<StateModelDefId, StateModelDefinition> definitionsById = Maps.newHashMap();
+   for (StateModelDefinition definition : definitions) {
+     definitionsById.put(definition.getStateModelDefId(), definition);
+   }
+   return definitionsById;
+ }
+
+ /**
+  * Read all resources in the cluster. A resource is included when it has an ideal state or a
+  * resource configuration; its external view and resource assignment are attached when present.
+  * @return map of resource id to resource, or an empty map if the cluster structure is not valid
+  */
+ public Map<ResourceId, Resource> readResources() {
+   if (!isClusterStructureValid()) {
+     LOG.error("Cluster is not fully set up yet!");
+     return Collections.emptyMap();
+   }
+
+   // every map below is keyed by resource name
+   Map<String, IdealState> idealStates = _accessor.getChildValuesMap(_keyBuilder.idealStates());
+   Map<String, ExternalView> externalViews =
+       _accessor.getChildValuesMap(_keyBuilder.externalViews());
+   Map<String, ResourceConfiguration> resourceConfigs =
+       _accessor.getChildValuesMap(_keyBuilder.resourceConfigs());
+   Map<String, ResourceAssignment> resourceAssignments =
+       _accessor.getChildValuesMap(_keyBuilder.resourceAssignments());
+
+   // union of the names known through an ideal state or a resource config
+   Set<String> resourceNames = Sets.newHashSet();
+   resourceNames.addAll(idealStates.keySet());
+   resourceNames.addAll(resourceConfigs.keySet());
+
+   Map<ResourceId, Resource> resourcesById = Maps.newHashMap();
+   for (String name : resourceNames) {
+     ResourceId id = ResourceId.from(name);
+     Resource resource =
+         ResourceAccessor.createResource(id, resourceConfigs.get(name), idealStates.get(name),
+             externalViews.get(name), resourceAssignments.get(name));
+     resourcesById.put(id, resource);
+   }
+   return resourcesById;
+ }
+
+ /**
+  * Read all participants in the cluster. One participant is produced per instance config;
+  * messages and current states are read only for participants that have a live instance.
+  * @return map of participant id to participant, or an empty map if the cluster structure is not
+  *         valid
+  */
+ public Map<ParticipantId, Participant> readParticipants() {
+   if (!isClusterStructureValid()) {
+     LOG.error("Cluster is not fully set up yet!");
+     return Collections.emptyMap();
+   }
+
+   // both maps are keyed by instance name
+   Map<String, InstanceConfig> instanceConfigs =
+       _accessor.getChildValuesMap(_keyBuilder.instanceConfigs());
+   Map<String, LiveInstance> liveInstances =
+       _accessor.getChildValuesMap(_keyBuilder.liveInstances());
+
+   // pending messages of each live participant: participant name -> (message id -> message)
+   Map<String, Map<String, Message>> messagesByParticipant =
+       new HashMap<String, Map<String, Message>>();
+   for (String liveName : liveInstances.keySet()) {
+     Map<String, Message> messages = _accessor.getChildValuesMap(_keyBuilder.messages(liveName));
+     messagesByParticipant.put(liveName, messages);
+   }
+
+   // current states of each live participant, read under the live instance's session:
+   // participant name -> (resource id -> current state)
+   Map<String, Map<String, CurrentState>> currentStatesByParticipant =
+       new HashMap<String, Map<String, CurrentState>>();
+   for (Map.Entry<String, LiveInstance> live : liveInstances.entrySet()) {
+     String liveName = live.getKey();
+     SessionId sessionId = live.getValue().getTypedSessionId();
+     Map<String, CurrentState> currentStates =
+         _accessor.getChildValuesMap(_keyBuilder.currentStates(liveName, sessionId.stringify()));
+     currentStatesByParticipant.put(liveName, currentStates);
+   }
+
+   // assemble one participant per configured instance
+   Map<ParticipantId, Participant> participantsById = Maps.newHashMap();
+   for (Map.Entry<String, InstanceConfig> configured : instanceConfigs.entrySet()) {
+     String name = configured.getKey();
+     InstanceConfig instanceConfig = configured.getValue();
+     ParticipantId id = ParticipantId.from(name);
+
+     Participant participant =
+         ParticipantAccessor.createParticipant(id, instanceConfig,
+             instanceConfig.getUserConfig(), liveInstances.get(name),
+             messagesByParticipant.get(name), currentStatesByParticipant.get(name));
+     participantsById.put(id, participant);
+   }
+   return participantsById;
+ }
+
+ /**
+ * Get cluster constraints of a given type
+ * @param type ConstraintType value
+ * @return ClusterConstraints, or null if none present
+ */
+ public ClusterConstraints readConstraints(ConstraintType type) {
+ return _accessor.getProperty(_keyBuilder.constraint(type.toString()));
+ }
+
+ /**
+ * Remove a constraint from the cluster
+ * @param type the constraint type
+ * @param constraintId the constraint id
+ * @return true if removed, false otherwise
+ */
+ public boolean removeConstraint(ConstraintType type, ConstraintId constraintId) {
+ ClusterConstraints constraints = _accessor.getProperty(_keyBuilder.constraint(type.toString()));
+ if (constraints == null || constraints.getConstraintItem(constraintId) == null) {
+ LOG.error("Constraint with id " + constraintId + " not present");
+ return false;
+ }
+ constraints.removeConstraintItem(constraintId);
+ return _accessor.setProperty(_keyBuilder.constraint(type.toString()), constraints);
+ }
+
+ /**
+  * Read the user-defined configuration of the cluster.
+  * @return UserConfig, or null if no cluster configuration is persisted
+  */
+ public UserConfig readUserConfig() {
+   ClusterConfiguration persisted = _accessor.getProperty(_keyBuilder.clusterConfig());
+   if (persisted == null) {
+     return null;
+   }
+   return persisted.getUserConfig();
+ }
+
+ /**
+  * Replace the cluster's user config. The change is expressed as a cluster config delta and
+  * applied through {@code updateCluster}.
+  * @param userConfig the new user config
+  * @return true if the user config was set, false otherwise
+  */
+ public boolean setUserConfig(UserConfig userConfig) {
+   ClusterConfig updated =
+       updateCluster(new ClusterConfig.Delta(_clusterId).setUserConfig(userConfig));
+   return updated != null;
+ }
+
+ /**
+ * Clear any user-specified configuration from the cluster
+ * @return true if the config was cleared, false otherwise
+ */
+ public boolean dropUserConfig() {
+ return setUserConfig(new UserConfig(Scope.cluster(_clusterId)));
+ }
+
+ /**
+ * Get the stats persisted on this cluster
+ * @return PersistentStats, or null if none persisted
+ */
+ public PersistentStats readStats() {
+ return _accessor.getProperty(_keyBuilder.persistantStat());
+ }
+
+ /**
+  * Add a statistic specification to the cluster. Stats parsed from the specification are merged
+  * into the persisted stats record; entries that already exist are left as they are. A new
+  * record is started when none exists yet.
+  * @param statName string representing a stat specification
+  * @return result of the update on the stats record; false without any write if the cluster
+  *         structure is not valid
+  */
+ public boolean addStat(final String statName) {
+   if (!isClusterStructureValid()) {
+     LOG.error("cluster " + _clusterId + " is not setup yet");
+     return false;
+   }
+
+   DataUpdater<ZNRecord> statAdder = new DataUpdater<ZNRecord>() {
+     @Override
+     public ZNRecord update(ZNRecord statsRec) {
+       ZNRecord record =
+           (statsRec != null) ? statsRec : new ZNRecord(PersistentStats.nodeName);
+       Map<String, Map<String, String>> existingStats = record.getMapFields();
+       Map<String, Map<String, String>> parsedStats = StatsHolder.parseStat(statName);
+
+       // only add what is missing; never overwrite an existing stat
+       for (Map.Entry<String, Map<String, String>> parsed : parsedStats.entrySet()) {
+         if (!existingStats.containsKey(parsed.getKey())) {
+           existingStats.put(parsed.getKey(), parsed.getValue());
+         }
+       }
+       record.setMapFields(existingStats);
+       return record;
+     }
+   };
+
+   String statsPath = _keyBuilder.persistantStat().getPath();
+   BaseDataAccessor<ZNRecord> baseAccessor = _accessor.getBaseDataAccessor();
+   return baseAccessor.update(statsPath, statAdder, AccessOption.PERSISTENT);
+ }
+
+ /**
+  * Remove a statistic specification from the cluster. Every stat parsed from the specification
+  * is deleted from the persisted stats record; the updater throws a HelixException when there is
+  * no stats record at all.
+  * @param statName string representing a statistic specification
+  * @return result of the update on the stats record; false without any write if the cluster
+  *         structure is not valid
+  */
+ public boolean dropStat(final String statName) {
+   if (!isClusterStructureValid()) {
+     LOG.error("cluster " + _clusterId + " is not setup yet");
+     return false;
+   }
+
+   DataUpdater<ZNRecord> statRemover = new DataUpdater<ZNRecord>() {
+     @Override
+     public ZNRecord update(ZNRecord statsRec) {
+       if (statsRec == null) {
+         throw new HelixException("No stats record in ZK, nothing to drop");
+       }
+       Map<String, Map<String, String>> existingStats = statsRec.getMapFields();
+       Map<String, Map<String, String>> parsedStats = StatsHolder.parseStat(statName);
+
+       // removing an absent key is a no-op, so no containsKey check is needed
+       for (String parsedName : parsedStats.keySet()) {
+         existingStats.remove(parsedName);
+       }
+       statsRec.setMapFields(existingStats);
+       return statsRec;
+     }
+   };
+
+   String statsPath = _keyBuilder.persistantStat().getPath();
+   BaseDataAccessor<ZNRecord> baseAccessor = _accessor.getBaseDataAccessor();
+   return baseAccessor.update(statsPath, statRemover, AccessOption.PERSISTENT);
+ }
+
+ /**
+  * Add an alert specification to the cluster, together with the stat parsed out of it. An
+  * existing alert with the same name is replaced; a new alerts record is started when none
+  * exists yet.
+  * <p>
+  * NOTE(review): the stat is added by calling {@code addStat} from inside the alerts updater and
+  * the boolean it returns is not checked. If the accessor can invoke an updater more than once,
+  * the stat write would be repeated as well -- confirm this is acceptable.
+  * @param alertName string representing the alert spec
+  * @return result of the update on the alerts record; false without any write if the cluster
+  *         structure is not valid
+  */
+ public boolean addAlert(final String alertName) {
+   if (!isClusterStructureValid()) {
+     LOG.error("cluster " + _clusterId + " is not setup yet");
+     return false;
+   }
+
+   DataUpdater<ZNRecord> alertAdder = new DataUpdater<ZNRecord>() {
+     @Override
+     public ZNRecord update(ZNRecord alertsRec) {
+       ZNRecord record = (alertsRec != null) ? alertsRec : new ZNRecord(Alerts.nodeName);
+       Map<String, Map<String, String>> existingAlerts = record.getMapFields();
+
+       // parseAlert is handed two empty holders: one for the stat name, one for the alert fields
+       StringBuilder statSpec = new StringBuilder();
+       Map<String, String> alertFields = new HashMap<String, String>();
+       AlertsHolder.parseAlert(alertName, statSpec, alertFields);
+
+       // register the stat first, then the alert itself
+       addStat(statSpec.toString());
+       existingAlerts.put(alertName, alertFields);
+
+       record.setMapFields(existingAlerts);
+       return record;
+     }
+   };
+
+   BaseDataAccessor<ZNRecord> baseAccessor = _accessor.getBaseDataAccessor();
+   String alertsPath = _keyBuilder.alerts().getPath();
+   return baseAccessor.update(alertsPath, alertAdder, AccessOption.PERSISTENT);
+ }
+
+ /**
+  * Remove an alert specification from the cluster. The updater throws a HelixException when no
+  * alerts record is persisted.
+  * @param alertName string representing an alert specification
+  * @return result of the update on the alerts record; false without any write if the cluster
+  *         structure is not valid
+  */
+ public boolean dropAlert(final String alertName) {
+   if (!isClusterStructureValid()) {
+     LOG.error("cluster " + _clusterId + " is not setup yet");
+     return false;
+   }
+
+   DataUpdater<ZNRecord> alertRemover = new DataUpdater<ZNRecord>() {
+     @Override
+     public ZNRecord update(ZNRecord alertsRec) {
+       if (alertsRec == null) {
+         throw new HelixException("No alerts record persisted, nothing to drop");
+       }
+       Map<String, Map<String, String>> existingAlerts = alertsRec.getMapFields();
+       existingAlerts.remove(alertName);
+       alertsRec.setMapFields(existingAlerts);
+       return alertsRec;
+     }
+   };
+
+   String alertsPath = _keyBuilder.alerts().getPath();
+   BaseDataAccessor<ZNRecord> baseAccessor = _accessor.getBaseDataAccessor();
+   return baseAccessor.update(alertsPath, alertRemover, AccessOption.PERSISTENT);
+ }
+
+ /**
+  * Merge user configuration into the existing cluster user configuration. Properties with the
+  * same key are overwritten.
+  * @param userConfig the user config key-value pairs to add
+  * @return true if the user config was updated, false otherwise
+  */
+ public boolean updateUserConfig(UserConfig userConfig) {
+   // carry only the user config in a fresh cluster configuration and apply it as an update
+   ClusterConfiguration userConfigOnly = new ClusterConfiguration(_clusterId);
+   userConfigOnly.addNamespacedConfig(userConfig);
+   return _accessor.updateProperty(_keyBuilder.clusterConfig(), userConfigOnly);
+ }
+
+ /**
+ * pause controller of cluster
+ * @return true if cluster was paused, false if pause failed or already paused
+ */
+ public boolean pauseCluster() {
+ return _accessor.createProperty(_keyBuilder.pause(), new PauseSignal("pause"));
+ }
+
+ /**
+ * resume controller of cluster
+ * @return true if resume succeeded, false otherwise
+ */
+ public boolean resumeCluster() {
+ return _accessor.removeProperty(_keyBuilder.pause());
+ }
+
+ /**
+ * Add a resource to the cluster by persisting its resource configuration and its ideal state.
+ * Nothing is written, and false is returned, when the resource or its rebalancer config is
+ * null, the cluster structure is not valid, the resource's state model definition is not in the
+ * cluster, or an ideal state or resource config already exists for the resource id.
+ * NOTE(review): the RebalancerContext obtained from the rebalancer config is dereferenced
+ * without a null check -- confirm getRebalancerContext cannot return null here.
+ * @param resource configuration of the resource to add
+ * @return true if resource added, false if there was an error
+ */
+ public boolean addResourceToCluster(ResourceConfig resource) {
+ if (resource == null || resource.getRebalancerConfig() == null) {
+ LOG.error("Resource not fully defined with a rebalancer context");
+ return false;
+ }
+
+ if (!isClusterStructureValid()) {
+ LOG.error("Cluster: " + _clusterId + " structure is not valid");
+ return false;
+ }
+ RebalancerContext context =
+ resource.getRebalancerConfig().getRebalancerContext(RebalancerContext.class);
+ StateModelDefId stateModelDefId = context.getStateModelDefId();
+ if (_accessor.getProperty(_keyBuilder.stateModelDef(stateModelDefId.stringify())) == null) {
+ LOG.error("State model: " + stateModelDefId + " not found in cluster: " + _clusterId);
+ return false;
+ }
+
+ ResourceId resourceId = resource.getId();
- if (_accessor.getProperty(_keyBuilder.idealState(resourceId.stringify())) != null) {
++ if (_accessor.getProperty(_keyBuilder.idealStates(resourceId.stringify())) != null) {
+ LOG.error("Skip adding resource: " + resourceId
+ + ", because resource ideal state already exists in cluster: " + _clusterId);
+ return false;
+ }
+ if (_accessor.getProperty(_keyBuilder.resourceConfig(resourceId.stringify())) != null) {
+ LOG.error("Skip adding resource: " + resourceId
+ + ", because resource config already exists in cluster: " + _clusterId);
+ return false;
+ }
+
+ // Persist the resource configuration (type, user config, rebalancer config, bucket size, batch message mode); skipped entirely when the resource has no user config
+ if (resource.getUserConfig() != null) {
+ ResourceConfiguration configuration = new ResourceConfiguration(resourceId);
+ configuration.setType(resource.getType());
+ configuration.addNamespacedConfig(resource.getUserConfig());
+ configuration.addNamespacedConfig(resource.getRebalancerConfig().toNamespacedConfig());
+ configuration.setBucketSize(resource.getBucketSize());
+ configuration.setBatchMessageMode(resource.getBatchMessageMode());
+ _accessor.setProperty(_keyBuilder.resourceConfig(resourceId.stringify()), configuration);
+ }
+
+ // Derive an IdealState from the RebalancerConfig; it is only persisted when the conversion returns non-null (per the original note: if the resource is partitioned)
+ RebalancerConfig rebalancerConfig = resource.getRebalancerConfig();
+ IdealState idealState =
+ ResourceAccessor.rebalancerConfigToIdealState(rebalancerConfig, resource.getBucketSize(),
+ resource.getBatchMessageMode());
+ if (idealState != null) {
- _accessor.setProperty(_keyBuilder.idealState(resourceId.stringify()), idealState);
++ _accessor.setProperty(_keyBuilder.idealStates(resourceId.stringify()), idealState);
+ }
+ return true;
+ }
+
+ /**
+ * Drop a resource from the cluster by removing its ideal state and its resource config.
+ * The resource is only dropped when an ideal state currently exists for it. The results of the
+ * two removals are not checked, so true is returned whenever the ideal state was found.
+ * @param resourceId id of the resource to drop
+ * @return true if an ideal state existed and removal was attempted, false if no ideal state
+ * was found
+ */
+ public boolean dropResourceFromCluster(ResourceId resourceId) {
- if (_accessor.getProperty(_keyBuilder.idealState(resourceId.stringify())) == null) {
++ if (_accessor.getProperty(_keyBuilder.idealStates(resourceId.stringify())) == null) {
+ LOG.error("Skip removing resource: " + resourceId
+ + ", because resource ideal state already removed from cluster: " + _clusterId);
+ return false;
+ }
- _accessor.removeProperty(_keyBuilder.idealState(resourceId.stringify()));
++ _accessor.removeProperty(_keyBuilder.idealStates(resourceId.stringify()));
+ _accessor.removeProperty(_keyBuilder.resourceConfig(resourceId.stringify()));
+ return true;
+ }
+
+ /**
+  * Check whether every required cluster path exists. When the data accessor exposes no base
+  * accessor, nothing can be checked and the structure is reported as valid.
+  * @return true if valid or false otherwise
+  */
+ public boolean isClusterStructureValid() {
+   List<String> requiredPaths = getRequiredPaths(_keyBuilder);
+   BaseDataAccessor<?> baseAccessor = _accessor.getBaseDataAccessor();
+   if (baseAccessor == null) {
+     return true;
+   }
+
+   // a single missing path makes the structure invalid
+   for (boolean pathExists : baseAccessor.exists(requiredPaths, 0)) {
+     if (!pathExists) {
+       return false;
+     }
+   }
+   return true;
+ }
+
+ /**
+  * Create an empty persistent node for every required cluster path so that the cluster structure
+  * is valid. A path that could not be created is only reported at debug level.
+  */
+ public void initClusterStructure() {
+   BaseDataAccessor<?> baseAccessor = _accessor.getBaseDataAccessor();
+   for (String requiredPath : getRequiredPaths(_keyBuilder)) {
+     boolean created = baseAccessor.create(requiredPath, null, AccessOption.PERSISTENT);
+     if (created || !LOG.isDebugEnabled()) {
+       continue;
+     }
+     LOG.debug(requiredPath + " already exists");
+   }
+ }
+
+ /**
+  * Remove every required cluster path, leaving the top level cluster node in place; intended for
+  * reconstructing the cluster.
+  */
+ private void clearClusterStructure() {
+   List<String> requiredPaths = getRequiredPaths(_keyBuilder);
+   _accessor.getBaseDataAccessor().remove(requiredPaths, 0);
+ }
+
+ /**
+  * List the property paths that must exist for a cluster structure to be considered valid.
+  * @param keyBuilder a PropertyKey.Builder for the cluster
+  * @return list of paths as strings
+  */
+ private static List<String> getRequiredPaths(PropertyKey.Builder keyBuilder) {
+   // varargs newArrayList keeps the same order and still returns a mutable list
+   return Lists.newArrayList(
+       keyBuilder.clusterConfigs().getPath(),
+       keyBuilder.instanceConfigs().getPath(),
+       keyBuilder.propertyStore().getPath(),
+       keyBuilder.liveInstances().getPath(),
+       keyBuilder.instances().getPath(),
+       keyBuilder.externalViews().getPath(),
+       keyBuilder.controller().getPath(),
+       keyBuilder.stateModelDefs().getPath(),
+       keyBuilder.controllerMessages().getPath(),
+       keyBuilder.controllerTaskErrors().getPath(),
+       keyBuilder.controllerTaskStatuses().getPath(),
+       keyBuilder.controllerLeaderHistory().getPath());
+ }
+
+ /**
+ * add a participant to cluster
+ * @param participant
+ * @return true if participant added, false otherwise
+ */
+ public boolean addParticipantToCluster(ParticipantConfig participant) {
+ if (participant == null) {
+ LOG.error("Participant not initialized");
+ return false;
+ }
+ if (!isClusterStructureValid()) {
+ LOG.error("Cluster: " + _clusterId + " structure is not valid");
+ return false;
+ }
+
+ ParticipantAccessor participantAccessor = new ParticipantAccessor(_accessor);
+ ParticipantId participantId = participant.getId();
+ InstanceConfig existConfig =
+ _accessor.getProperty(_keyBuilder.instanceConfig(participantId.stringify()));
+ if (existConfig != null && participantAccessor.isParticipantStructureValid(participantId)) {
+ LOG.error("Config for participant: " + participantId + " already exists in cluster: "
+ + _clusterId);
+ return false;
+ }
+
+ // clear and rebuild the participant structure
+ participantAccessor.clearParticipantStructure(participantId);
+ participantAccessor.initParticipantStructure(participantId);
+
+ // add the config
+ InstanceConfig instanceConfig = new InstanceConfig(participant.getId());
+ instanceConfig.setHostName(participant.getHostName());
+ instanceConfig.setPort(Integer.toString(participant.getPort()));
+ instanceConfig.setInstanceEnabled(participant.isEnabled());
+ UserConfig userConfig = participant.getUserConfig();
+ instanceConfig.addNamespacedConfig(userConfig);
+ Set<String> tags = participant.getTags();
+ for (String tag : tags) {
+ instanceConfig.addTag(tag);
+ }
+ Set<PartitionId> disabledPartitions = participant.getDisabledPartitions();
+ for (PartitionId partitionId : disabledPartitions) {
+ instanceConfig.setParticipantEnabledForPartition(partitionId, false);
+ }
+ _accessor.setProperty(_keyBuilder.instanceConfig(participantId.stringify()), instanceConfig);
+ return true;
+ }
+
+ /**
+  * Drop a participant from the cluster; the work is delegated to a ParticipantAccessor built on
+  * this accessor's data accessor.
+  * @param participantId id of the participant to drop
+  * @return true if participant dropped, false if there was an error
+  */
+ public boolean dropParticipantFromCluster(ParticipantId participantId) {
+   return new ParticipantAccessor(_accessor).dropParticipant(participantId);
+ }
+
+ /**
+ * Add a state model definition. Updates the existing state model definition if it already exists.
+ * @param stateModelDef fully initialized state model definition
+ * @return true if the model is persisted, false otherwise
+ */
+ public boolean addStateModelDefinitionToCluster(StateModelDefinition stateModelDef) {
+ if (!isClusterStructureValid()) {
+ LOG.error("Cluster: " + _clusterId + " structure is not valid");
+ return false;
+ }
+
+ return _accessor
+ .createProperty(_keyBuilder.stateModelDef(stateModelDef.getId()), stateModelDef);
+ }
+
+ /**
+ * Remove a state model definition if it exists
+ * @param stateModelDefId state model definition id
+ * @return true if removed, false if it did not exist
+ */
+ public boolean dropStateModelDefinitionFromCluster(StateModelDefId stateModelDefId) {
+ return _accessor.removeProperty(_keyBuilder.stateModelDef(stateModelDefId.stringify()));
+ }
+ }