You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2013/09/27 01:05:15 UTC
[6/6] git commit: [HELIX-238] Refactor, add update to accessors,
test update logic
[HELIX-238] Refactor, add update to accessors, test update logic
Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/c070a765
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/c070a765
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/c070a765
Branch: refs/heads/helix-logical-model
Commit: c070a76514bda1f6269f1db1197289f7b588ced7
Parents: 41b6e77
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Thu Sep 26 15:02:19 2013 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Thu Sep 26 16:03:49 2013 -0700
----------------------------------------------------------------------
.../resources/SchedulerTasksResource.java | 4 +-
.../java/org/apache/helix/HelixProperty.java | 2 +-
.../main/java/org/apache/helix/api/Cluster.java | 10 +
.../org/apache/helix/api/ClusterAccessor.java | 502 -------------
.../org/apache/helix/api/ClusterConfig.java | 704 -------------------
.../java/org/apache/helix/api/ClusterId.java | 57 --
.../java/org/apache/helix/api/ConstraintId.java | 78 --
.../java/org/apache/helix/api/Controller.java | 1 +
.../apache/helix/api/ControllerAccessor.java | 47 --
.../java/org/apache/helix/api/ControllerId.java | 54 --
.../src/main/java/org/apache/helix/api/Id.java | 55 --
.../java/org/apache/helix/api/MessageId.java | 54 --
.../org/apache/helix/api/NamespacedConfig.java | 227 ------
.../java/org/apache/helix/api/Participant.java | 6 +
.../apache/helix/api/ParticipantAccessor.java | 393 -----------
.../org/apache/helix/api/ParticipantConfig.java | 371 ----------
.../org/apache/helix/api/ParticipantId.java | 54 --
.../java/org/apache/helix/api/Partition.java | 1 +
.../java/org/apache/helix/api/PartitionId.java | 112 ---
.../main/java/org/apache/helix/api/ProcId.java | 54 --
.../java/org/apache/helix/api/Resource.java | 8 +-
.../org/apache/helix/api/ResourceAccessor.java | 233 ------
.../org/apache/helix/api/ResourceConfig.java | 369 ----------
.../java/org/apache/helix/api/ResourceId.java | 57 --
.../org/apache/helix/api/RunningInstance.java | 3 +
.../apache/helix/api/SchedulerTaskConfig.java | 68 --
.../main/java/org/apache/helix/api/Scope.java | 6 +
.../java/org/apache/helix/api/SessionId.java | 54 --
.../java/org/apache/helix/api/Spectator.java | 2 +
.../java/org/apache/helix/api/SpectatorId.java | 51 --
.../org/apache/helix/api/StateModelDefId.java | 66 --
.../helix/api/StateModelDefinitionAccessor.java | 69 --
.../apache/helix/api/StateModelFactoryId.java | 57 --
.../java/org/apache/helix/api/UserConfig.java | 52 --
.../helix/api/accessor/ClusterAccessor.java | 553 +++++++++++++++
.../helix/api/accessor/ControllerAccessor.java | 49 ++
.../helix/api/accessor/ParticipantAccessor.java | 435 ++++++++++++
.../helix/api/accessor/ResourceAccessor.java | 265 +++++++
.../accessor/StateModelDefinitionAccessor.java | 70 ++
.../apache/helix/api/config/ClusterConfig.java | 696 ++++++++++++++++++
.../helix/api/config/NamespacedConfig.java | 228 ++++++
.../helix/api/config/ParticipantConfig.java | 382 ++++++++++
.../apache/helix/api/config/ResourceConfig.java | 373 ++++++++++
.../helix/api/config/SchedulerTaskConfig.java | 69 ++
.../org/apache/helix/api/config/UserConfig.java | 53 ++
.../java/org/apache/helix/api/id/ClusterId.java | 57 ++
.../org/apache/helix/api/id/ConstraintId.java | 80 +++
.../org/apache/helix/api/id/ControllerId.java | 54 ++
.../main/java/org/apache/helix/api/id/Id.java | 55 ++
.../java/org/apache/helix/api/id/MessageId.java | 54 ++
.../org/apache/helix/api/id/ParticipantId.java | 54 ++
.../org/apache/helix/api/id/PartitionId.java | 112 +++
.../java/org/apache/helix/api/id/ProcId.java | 54 ++
.../org/apache/helix/api/id/ResourceId.java | 57 ++
.../java/org/apache/helix/api/id/SessionId.java | 54 ++
.../org/apache/helix/api/id/SpectatorId.java | 51 ++
.../apache/helix/api/id/StateModelDefId.java | 66 ++
.../helix/api/id/StateModelFactoryId.java | 57 ++
.../controller/rebalancer/AutoRebalancer.java | 4 +-
.../controller/rebalancer/CustomRebalancer.java | 4 +-
.../rebalancer/SemiAutoRebalancer.java | 4 +-
.../context/BasicRebalancerContext.java | 8 +-
.../rebalancer/context/CustomRebalancer.java | 4 +-
.../context/CustomRebalancerContext.java | 6 +-
.../rebalancer/context/FullAutoRebalancer.java | 4 +-
.../context/FullAutoRebalancerContext.java | 2 +-
.../context/PartitionedRebalancerContext.java | 4 +-
.../rebalancer/context/RebalancerConfig.java | 2 +-
.../rebalancer/context/RebalancerContext.java | 8 +-
.../rebalancer/context/SemiAutoRebalancer.java | 4 +-
.../context/SemiAutoRebalancerContext.java | 6 +-
.../util/NewConstraintBasedAssignment.java | 8 +-
.../stages/BestPossibleStateCalcStage.java | 4 +-
.../stages/BestPossibleStateOutput.java | 2 +-
.../stages/CurrentStateComputationStage.java | 4 +-
.../stages/MessageGenerationPhase.java | 10 +-
.../stages/NewBestPossibleStateCalcStage.java | 10 +-
.../stages/NewBestPossibleStateOutput.java | 2 +-
.../stages/NewCompatibilityCheckStage.java | 2 +-
.../stages/NewCurrentStateComputationStage.java | 14 +-
.../stages/NewExternalViewComputeStage.java | 12 +-
.../stages/NewMessageGenerationStage.java | 18 +-
.../controller/stages/NewMessageOutput.java | 4 +-
.../stages/NewMessageSelectionStage.java | 10 +-
.../stages/NewMessageThrottleStage.java | 8 +-
.../stages/NewReadClusterDataStage.java | 4 +-
.../stages/NewResourceComputationStage.java | 8 +-
.../stages/NewTaskAssignmentStage.java | 8 +-
.../stages/PersistAssignmentStage.java | 4 +-
.../controller/stages/ResourceCurrentState.java | 8 +-
.../controller/stages/TaskAssignmentStage.java | 2 +-
.../strategy/EspressoRelayStrategy.java | 2 +-
.../manager/zk/CurStateCarryOverUpdater.java | 4 +-
.../DefaultSchedulerMessageHandlerFactory.java | 8 +-
.../apache/helix/manager/zk/ZKHelixAdmin.java | 14 +-
.../messaging/DefaultMessagingService.java | 8 +-
.../messaging/handling/BatchMessageHandler.java | 4 +-
.../messaging/handling/GroupMessageHandler.java | 2 +-
.../handling/HelixStateTransitionHandler.java | 6 +-
.../messaging/handling/HelixTaskExecutor.java | 4 +-
.../helix/model/ClusterConfiguration.java | 4 +-
.../apache/helix/model/ClusterConstraints.java | 2 +-
.../org/apache/helix/model/CurrentState.java | 8 +-
.../org/apache/helix/model/ExternalView.java | 6 +-
.../java/org/apache/helix/model/IdealState.java | 10 +-
.../org/apache/helix/model/InstanceConfig.java | 4 +-
.../org/apache/helix/model/LiveInstance.java | 6 +-
.../java/org/apache/helix/model/Message.java | 12 +-
.../helix/model/PartitionConfiguration.java | 4 +-
.../apache/helix/model/ResourceAssignment.java | 6 +-
.../helix/model/ResourceConfiguration.java | 4 +-
.../helix/model/StateModelDefinition.java | 2 +-
.../builder/ClusterConstraintsBuilder.java | 2 +-
.../model/builder/CurrentStateBuilder.java | 8 +-
.../helix/model/builder/IdealStateBuilder.java | 6 +-
.../builder/MessageConstraintItemBuilder.java | 4 +-
.../builder/ResourceAssignmentBuilder.java | 6 +-
.../builder/StateConstraintItemBuilder.java | 2 +-
.../monitoring/mbeans/ResourceMonitor.java | 4 +-
.../participant/HelixCustomCodeRunner.java | 4 +-
.../participant/HelixStateMachineEngine.java | 10 +-
.../helix/tools/ClusterStateVerifier.java | 10 +-
.../org/apache/helix/tools/MessagePoster.java | 6 +-
.../org/apache/helix/util/RebalanceUtil.java | 2 +-
.../org/apache/helix/util/StatusUpdateUtil.java | 2 +-
.../org/apache/helix/TestHelixTaskExecutor.java | 10 +-
.../org/apache/helix/TestHelixTaskHandler.java | 10 +-
.../test/java/org/apache/helix/TestHelper.java | 8 +-
.../java/org/apache/helix/TestZKCallback.java | 10 +-
.../java/org/apache/helix/ZkUnitTestBase.java | 6 +-
.../test/java/org/apache/helix/api/TestId.java | 9 +
.../apache/helix/api/TestNamespacedConfig.java | 2 +
.../org/apache/helix/api/TestNewStages.java | 6 +
.../org/apache/helix/api/TestUpdateConfig.java | 157 +++++
.../context/TestSerializeRebalancerContext.java | 8 +-
.../helix/controller/stages/BaseStageTest.java | 12 +-
.../TestBestPossibleCalcStageCompatibility.java | 10 +-
.../stages/TestBestPossibleStateCalcStage.java | 8 +-
.../stages/TestCompatibilityCheckStage.java | 2 +-
.../TestCurrentStateComputationStage.java | 12 +-
.../stages/TestMessageThrottleStage.java | 6 +-
.../stages/TestMsgSelectionStage.java | 14 +-
.../stages/TestRebalancePipeline.java | 6 +-
.../stages/TestResourceComputationStage.java | 8 +-
.../strategy/TestAutoRebalanceStrategy.java | 71 +-
.../strategy/TestNewAutoRebalanceStrategy.java | 16 +-
.../TestAddStateModelFactoryAfterConnect.java | 2 +-
.../TestCustomizedIdealStateRebalancer.java | 4 +-
.../TestEnablePartitionDuringDisable.java | 2 +-
.../integration/TestInvalidAutoIdealState.java | 2 +-
.../TestMessagePartitionStateMismatch.java | 10 +-
.../helix/integration/TestMessageThrottle2.java | 2 +-
.../helix/integration/TestMessagingService.java | 4 +-
.../TestParticipantErrorMessage.java | 8 +-
.../helix/integration/TestRenamePartition.java | 2 +-
.../helix/integration/TestSchedulerMessage.java | 4 +-
.../integration/TestSchedulerMsgContraints.java | 4 +-
.../integration/TestSchedulerMsgUsingQueue.java | 4 +-
.../TestSessionExpiryInTransition.java | 2 +-
.../integration/TestStateTransitionTimeout.java | 4 +-
.../helix/integration/TestZkReconnect.java | 8 +-
.../manager/TestParticipantManager.java | 2 +-
.../TestDefaultControllerMsgHandlerFactory.java | 2 +-
.../helix/manager/zk/TestZNRecordSizeLimit.java | 2 +-
.../helix/manager/zk/TestZkHelixAdmin.java | 2 +-
.../helix/messaging/TestAsyncCallback.java | 2 +-
.../helix/messaging/TestAsyncCallbackSvc.java | 4 +-
.../messaging/TestDefaultMessagingService.java | 2 +-
.../handling/TestHelixTaskExecutor.java | 4 +-
.../helix/mock/controller/MockController.java | 6 +-
.../mock/controller/MockControllerProcess.java | 2 +-
.../helix/mock/participant/DummyProcess.java | 2 +-
.../helix/mock/participant/ErrTransition.java | 2 +-
.../StoreAccessOneNodeTransition.java | 2 +-
.../org/apache/helix/model/TestConstraint.java | 2 +-
.../org/apache/helix/model/TestIdealState.java | 6 +-
.../TestDistControllerStateModel.java | 4 +-
.../apache/helix/tools/TestHelixAdminCli.java | 4 +-
.../apache/helix/examples/BootstrapHandler.java | 2 +-
.../apache/helix/examples/DummyParticipant.java | 2 +-
.../apache/helix/examples/NewModelExample.java | 20 +-
.../org/apache/helix/examples/Quickstart.java | 2 +-
.../helix/taskexecution/TaskStateModel.java | 2 +-
183 files changed, 4577 insertions(+), 4201 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c070a765/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/SchedulerTasksResource.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/SchedulerTasksResource.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/SchedulerTasksResource.java
index 2d3966c..40c527a 100644
--- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/SchedulerTasksResource.java
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/SchedulerTasksResource.java
@@ -31,8 +31,8 @@ import org.apache.helix.HelixException;
import org.apache.helix.InstanceType;
import org.apache.helix.PropertyPathConfig;
import org.apache.helix.PropertyType;
-import org.apache.helix.api.MessageId;
-import org.apache.helix.api.SessionId;
+import org.apache.helix.api.id.MessageId;
+import org.apache.helix.api.id.SessionId;
import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.model.LiveInstance;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c070a765/helix-core/src/main/java/org/apache/helix/HelixProperty.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixProperty.java b/helix-core/src/main/java/org/apache/helix/HelixProperty.java
index f52d51c..9c0c25e 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixProperty.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixProperty.java
@@ -27,7 +27,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.helix.api.NamespacedConfig;
+import org.apache.helix.api.config.NamespacedConfig;
/**
* A wrapper class for ZNRecord. Used as a base class for IdealState, CurrentState, etc.
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c070a765/helix-core/src/main/java/org/apache/helix/api/Cluster.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Cluster.java b/helix-core/src/main/java/org/apache/helix/api/Cluster.java
index 9e71904..3d24498 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Cluster.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Cluster.java
@@ -22,6 +22,16 @@ package org.apache.helix.api;
import java.util.Collections;
import java.util.Map;
+import org.apache.helix.api.config.ClusterConfig;
+import org.apache.helix.api.config.ParticipantConfig;
+import org.apache.helix.api.config.ResourceConfig;
+import org.apache.helix.api.config.UserConfig;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ControllerId;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.api.id.SpectatorId;
+import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.model.ClusterConstraints;
import org.apache.helix.model.ClusterConstraints.ConstraintType;
import org.apache.helix.model.StateModelDefinition;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c070a765/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
deleted file mode 100644
index 6302e33..0000000
--- a/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
+++ /dev/null
@@ -1,502 +0,0 @@
-package org.apache.helix.api;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.AccessOption;
-import org.apache.helix.BaseDataAccessor;
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
-import org.apache.helix.model.ClusterConfiguration;
-import org.apache.helix.model.ClusterConstraints;
-import org.apache.helix.model.ClusterConstraints.ConstraintType;
-import org.apache.helix.model.CurrentState;
-import org.apache.helix.model.ExternalView;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.InstanceConfig;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.Message;
-import org.apache.helix.model.PauseSignal;
-import org.apache.helix.model.ResourceAssignment;
-import org.apache.helix.model.ResourceConfiguration;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.log4j.Logger;
-
-public class ClusterAccessor {
- private static Logger LOG = Logger.getLogger(ClusterAccessor.class);
-
- private final HelixDataAccessor _accessor;
- private final PropertyKey.Builder _keyBuilder;
- private final ClusterId _clusterId;
-
- public ClusterAccessor(ClusterId clusterId, HelixDataAccessor accessor) {
- _accessor = accessor;
- _keyBuilder = accessor.keyBuilder();
- _clusterId = clusterId;
- }
-
- /**
- * create a new cluster, fail if it already exists
- * @return true if created, false if creation failed
- */
- public boolean createCluster(ClusterConfig cluster) {
- boolean created = _accessor.createProperty(_keyBuilder.cluster(), null);
- if (!created) {
- LOG.error("Cluster already created. Aborting.");
- return false;
- }
- initClusterStructure();
- Map<StateModelDefId, StateModelDefinition> stateModelDefs = cluster.getStateModelMap();
- for (StateModelDefinition stateModelDef : stateModelDefs.values()) {
- addStateModelDefinitionToCluster(stateModelDef);
- }
- Map<ResourceId, ResourceConfig> resources = cluster.getResourceMap();
- for (ResourceConfig resource : resources.values()) {
- addResourceToCluster(resource);
- }
- Map<ParticipantId, ParticipantConfig> participants = cluster.getParticipantMap();
- for (ParticipantConfig participant : participants.values()) {
- addParticipantToCluster(participant);
- }
- _accessor.createProperty(_keyBuilder.constraints(), null);
- for (ClusterConstraints constraints : cluster.getConstraintMap().values()) {
- _accessor.createProperty(_keyBuilder.constraint(constraints.getType().toString()),
- constraints);
- }
- _accessor.createProperty(_keyBuilder.clusterConfig(),
- ClusterConfiguration.from(cluster.getUserConfig()));
- if (cluster.isPaused()) {
- pauseCluster();
- }
-
- return true;
- }
-
- public ClusterConfig updateCluster(ClusterConfig.Delta clusterDelta) {
- Cluster cluster = readCluster();
- if (cluster == null) {
- LOG.error("Cluster does not exist, cannot be updated");
- return null;
- }
- ClusterConfig config = clusterDelta.mergeInto(cluster.getConfig());
- // TODO: persist this
- return config;
- }
-
- /**
- * drop a cluster
- * @return true if the cluster was dropped, false if there was an error
- */
- public boolean dropCluster() {
- LOG.info("Dropping cluster: " + _clusterId);
- List<String> liveInstanceNames = _accessor.getChildNames(_keyBuilder.liveInstances());
- if (liveInstanceNames.size() > 0) {
- LOG.error("Can't drop cluster: " + _clusterId + " because there are running participant: "
- + liveInstanceNames + ", shutdown participants first.");
- return false;
- }
-
- LiveInstance leader = _accessor.getProperty(_keyBuilder.controllerLeader());
- if (leader != null) {
- LOG.error("Can't drop cluster: " + _clusterId + ", because leader: " + leader.getId()
- + " are running, shutdown leader first.");
- return false;
- }
-
- return _accessor.removeProperty(_keyBuilder.cluster());
- }
-
- /**
- * read entire cluster data
- * @return cluster
- */
- public Cluster readCluster() {
- /**
- * map of instance-id to instance-config
- */
- Map<String, InstanceConfig> instanceConfigMap =
- _accessor.getChildValuesMap(_keyBuilder.instanceConfigs());
-
- /**
- * map of resource-id to ideal-state
- */
- Map<String, IdealState> idealStateMap = _accessor.getChildValuesMap(_keyBuilder.idealStates());
-
- /**
- * map of instance-id to live-instance
- */
- Map<String, LiveInstance> liveInstanceMap =
- _accessor.getChildValuesMap(_keyBuilder.liveInstances());
-
- /**
- * map of participant-id to map of message-id to message
- */
- Map<String, Map<String, Message>> messageMap = new HashMap<String, Map<String, Message>>();
- for (String instanceName : liveInstanceMap.keySet()) {
- Map<String, Message> instanceMsgMap =
- _accessor.getChildValuesMap(_keyBuilder.messages(instanceName));
- messageMap.put(instanceName, instanceMsgMap);
- }
-
- /**
- * map of participant-id to map of resource-id to current-state
- */
- Map<String, Map<String, CurrentState>> currentStateMap =
- new HashMap<String, Map<String, CurrentState>>();
- for (String participantName : liveInstanceMap.keySet()) {
- LiveInstance liveInstance = liveInstanceMap.get(participantName);
- SessionId sessionId = liveInstance.getSessionId();
- Map<String, CurrentState> instanceCurStateMap =
- _accessor.getChildValuesMap(_keyBuilder.currentStates(participantName,
- sessionId.stringify()));
-
- currentStateMap.put(participantName, instanceCurStateMap);
- }
-
- LiveInstance leader = _accessor.getProperty(_keyBuilder.controllerLeader());
-
- /**
- * map of constraint-type to constraints
- */
- Map<String, ClusterConstraints> constraintMap =
- _accessor.getChildValuesMap(_keyBuilder.constraints());
-
- /**
- * Map of resource id to external view
- */
- Map<String, ExternalView> externalViewMap =
- _accessor.getChildValuesMap(_keyBuilder.externalViews());
-
- /**
- * Map of resource id to user configuration
- */
- Map<String, ResourceConfiguration> resourceConfigMap =
- _accessor.getChildValuesMap(_keyBuilder.resourceConfigs());
-
- /**
- * Map of resource id to resource assignment
- */
- Map<String, ResourceAssignment> resourceAssignmentMap =
- _accessor.getChildValuesMap(_keyBuilder.resourceAssignments());
-
- // read all the resources
- Map<ResourceId, Resource> resourceMap = new HashMap<ResourceId, Resource>();
- for (String resourceName : idealStateMap.keySet()) {
- ResourceId resourceId = ResourceId.from(resourceName);
- resourceMap.put(resourceId, ResourceAccessor.createResource(resourceId,
- resourceConfigMap.get(resourceName), idealStateMap.get(resourceName),
- externalViewMap.get(resourceName), resourceAssignmentMap.get(resourceName)));
- }
-
- // read all the participants
- Map<ParticipantId, Participant> participantMap = new HashMap<ParticipantId, Participant>();
- for (String participantName : instanceConfigMap.keySet()) {
- InstanceConfig instanceConfig = instanceConfigMap.get(participantName);
- UserConfig userConfig = UserConfig.from(instanceConfig);
- LiveInstance liveInstance = liveInstanceMap.get(participantName);
- Map<String, Message> instanceMsgMap = messageMap.get(participantName);
-
- ParticipantId participantId = ParticipantId.from(participantName);
-
- participantMap.put(participantId, ParticipantAccessor.createParticipant(participantId,
- instanceConfig, userConfig, liveInstance, instanceMsgMap,
- currentStateMap.get(participantName)));
- }
-
- // read the controllers
- Map<ControllerId, Controller> controllerMap = new HashMap<ControllerId, Controller>();
- ControllerId leaderId = null;
- if (leader != null) {
- leaderId = ControllerId.from(leader.getId());
- controllerMap.put(leaderId, new Controller(leaderId, leader, true));
- }
-
- // read the constraints
- Map<ConstraintType, ClusterConstraints> clusterConstraintMap =
- new HashMap<ConstraintType, ClusterConstraints>();
- for (String constraintType : constraintMap.keySet()) {
- clusterConstraintMap.put(ConstraintType.valueOf(constraintType),
- constraintMap.get(constraintType));
- }
-
- // read the pause status
- PauseSignal pauseSignal = _accessor.getProperty(_keyBuilder.pause());
- boolean isPaused = pauseSignal != null;
-
- ClusterConfiguration clusterUserConfig = _accessor.getProperty(_keyBuilder.clusterConfig());
- UserConfig userConfig;
- if (clusterUserConfig != null) {
- userConfig = UserConfig.from(clusterUserConfig);
- } else {
- userConfig = new UserConfig(Scope.cluster(_clusterId));
- }
-
- // read the state model definitions
- StateModelDefinitionAccessor stateModelDefAccessor =
- new StateModelDefinitionAccessor(_accessor);
- Map<StateModelDefId, StateModelDefinition> stateModelMap =
- stateModelDefAccessor.readStateModelDefinitions();
-
- // create the cluster snapshot object
- return new Cluster(_clusterId, resourceMap, participantMap, controllerMap, leaderId,
- clusterConstraintMap, stateModelMap, userConfig, isPaused);
- }
-
- /**
- * pause controller of cluster
- */
- public void pauseCluster() {
- _accessor.createProperty(_keyBuilder.pause(), new PauseSignal("pause"));
- }
-
- /**
- * resume controller of cluster
- */
- public void resumeCluster() {
- _accessor.removeProperty(_keyBuilder.pause());
- }
-
- /**
- * add a resource to cluster
- * @param resource
- * @return true if resource added, false if there was an error
- */
- public boolean addResourceToCluster(ResourceConfig resource) {
- if (!isClusterStructureValid()) {
- LOG.error("Cluster: " + _clusterId + " structure is not valid");
- return false;
- }
- RebalancerContext context =
- resource.getRebalancerConfig().getRebalancerContext(RebalancerContext.class);
- StateModelDefId stateModelDefId = context.getStateModelDefId();
- if (_accessor.getProperty(_keyBuilder.stateModelDef(stateModelDefId.stringify())) == null) {
- LOG.error("State model: " + stateModelDefId + " not found in cluster: " + _clusterId);
- return false;
- }
-
- ResourceId resourceId = resource.getId();
- if (_accessor.getProperty(_keyBuilder.idealState(resourceId.stringify())) != null) {
- LOG.error("Skip adding resource: " + resourceId
- + ", because resource ideal state already exists in cluster: " + _clusterId);
- return false;
- }
- if (_accessor.getProperty(_keyBuilder.resourceConfig(resourceId.stringify())) != null) {
- LOG.error("Skip adding resource: " + resourceId
- + ", because resource config already exists in cluster: " + _clusterId);
- return false;
- }
-
- // Add resource user config
- if (resource.getUserConfig() != null) {
- ResourceConfiguration configuration = new ResourceConfiguration(resourceId);
- configuration.setType(resource.getType());
- configuration.addNamespacedConfig(resource.getUserConfig());
- configuration.addNamespacedConfig(resource.getRebalancerConfig().toNamespacedConfig());
- configuration.setBucketSize(resource.getBucketSize());
- configuration.setBatchMessageMode(resource.getBatchMessageMode());
- _accessor.createProperty(_keyBuilder.resourceConfig(resourceId.stringify()), configuration);
- }
-
- // Create an IdealState from a RebalancerConfig (if the resource is partitioned)
- RebalancerConfig rebalancerConfig = resource.getRebalancerConfig();
- IdealState idealState =
- ResourceAccessor.rebalancerConfigToIdealState(rebalancerConfig, resource.getBucketSize(),
- resource.getBatchMessageMode());
- if (idealState != null) {
- _accessor.createProperty(_keyBuilder.idealState(resourceId.stringify()), idealState);
- }
- return true;
- }
-
- /**
- * drop a resource from cluster
- * @param resourceId
- * @return true if removal succeeded, false otherwise
- */
- public boolean dropResourceFromCluster(ResourceId resourceId) {
- if (_accessor.getProperty(_keyBuilder.idealState(resourceId.stringify())) == null) {
- LOG.error("Skip removing resource: " + resourceId
- + ", because resource ideal state already removed from cluster: " + _clusterId);
- return false;
- }
- _accessor.removeProperty(_keyBuilder.idealState(resourceId.stringify()));
- _accessor.removeProperty(_keyBuilder.resourceConfig(resourceId.stringify()));
- return true;
- }
-
- /**
- * check if cluster structure is valid
- * @return true if valid or false otherwise
- */
- public boolean isClusterStructureValid() {
- List<String> paths = getRequiredPaths();
- BaseDataAccessor<?> baseAccessor = _accessor.getBaseDataAccessor();
- boolean[] existsResults = baseAccessor.exists(paths, 0);
- for (boolean exists : existsResults) {
- if (!exists) {
- return false;
- }
- }
- return true;
- }
-
- /**
- * Create empty persistent properties to ensure that there is a valid cluster structure
- */
- private void initClusterStructure() {
- BaseDataAccessor<?> baseAccessor = _accessor.getBaseDataAccessor();
- List<String> paths = getRequiredPaths();
- for (String path : paths) {
- boolean status = baseAccessor.create(path, null, AccessOption.PERSISTENT);
- if (!status && LOG.isDebugEnabled()) {
- LOG.debug(path + " already exists");
- }
- }
- }
-
- /**
- * Get all property paths that must be set for a cluster structure to be valid
- * @return list of paths as strings
- */
- private List<String> getRequiredPaths() {
- List<String> paths = new ArrayList<String>();
- paths.add(_keyBuilder.cluster().getPath());
- paths.add(_keyBuilder.idealStates().getPath());
- paths.add(_keyBuilder.clusterConfigs().getPath());
- paths.add(_keyBuilder.instanceConfigs().getPath());
- paths.add(_keyBuilder.resourceConfigs().getPath());
- paths.add(_keyBuilder.propertyStore().getPath());
- paths.add(_keyBuilder.liveInstances().getPath());
- paths.add(_keyBuilder.instances().getPath());
- paths.add(_keyBuilder.externalViews().getPath());
- paths.add(_keyBuilder.controller().getPath());
- paths.add(_keyBuilder.stateModelDefs().getPath());
- paths.add(_keyBuilder.controllerMessages().getPath());
- paths.add(_keyBuilder.controllerTaskErrors().getPath());
- paths.add(_keyBuilder.controllerTaskStatuses().getPath());
- paths.add(_keyBuilder.controllerLeaderHistory().getPath());
- return paths;
- }
-
- /**
- * add a participant to cluster
- * @param participant
- * @return true if participant added, false otherwise
- */
- public boolean addParticipantToCluster(ParticipantConfig participant) {
- if (!isClusterStructureValid()) {
- LOG.error("Cluster: " + _clusterId + " structure is not valid");
- return false;
- }
-
- ParticipantId participantId = participant.getId();
- if (_accessor.getProperty(_keyBuilder.instanceConfig(participantId.stringify())) != null) {
- LOG.error("Config for participant: " + participantId + " already exists in cluster: "
- + _clusterId);
- return false;
- }
-
- // add empty root ZNodes
- List<PropertyKey> createKeys = new ArrayList<PropertyKey>();
- createKeys.add(_keyBuilder.messages(participantId.stringify()));
- createKeys.add(_keyBuilder.currentStates(participantId.stringify()));
- createKeys.add(_keyBuilder.participantErrors(participantId.stringify()));
- createKeys.add(_keyBuilder.statusUpdates(participantId.stringify()));
- for (PropertyKey key : createKeys) {
- _accessor.createProperty(key, null);
- }
-
- // add the config
- InstanceConfig instanceConfig = new InstanceConfig(participant.getId());
- instanceConfig.setHostName(participant.getHostName());
- instanceConfig.setPort(Integer.toString(participant.getPort()));
- instanceConfig.setInstanceEnabled(participant.isEnabled());
- UserConfig userConfig = participant.getUserConfig();
- instanceConfig.addNamespacedConfig(userConfig);
- Set<String> tags = participant.getTags();
- for (String tag : tags) {
- instanceConfig.addTag(tag);
- }
- Set<PartitionId> disabledPartitions = participant.getDisabledPartitions();
- for (PartitionId partitionId : disabledPartitions) {
- instanceConfig.setInstanceEnabledForPartition(partitionId, false);
- }
- _accessor.createProperty(_keyBuilder.instanceConfig(participantId.stringify()), instanceConfig);
- _accessor.createProperty(_keyBuilder.messages(participantId.stringify()), null);
- return true;
- }
-
- /**
- * drop a participant from cluster
- * @param participantId
- * @return true if participant dropped, false if there was an error
- */
- public boolean dropParticipantFromCluster(ParticipantId participantId) {
- if (_accessor.getProperty(_keyBuilder.instanceConfig(participantId.stringify())) == null) {
- LOG.error("Config for participant: " + participantId + " does NOT exist in cluster: "
- + _clusterId);
- return false;
- }
-
- if (_accessor.getProperty(_keyBuilder.instance(participantId.stringify())) == null) {
- LOG.error("Participant: " + participantId + " structure does NOT exist in cluster: "
- + _clusterId);
- return false;
- }
-
- // delete participant config path
- _accessor.removeProperty(_keyBuilder.instanceConfig(participantId.stringify()));
-
- // delete participant path
- _accessor.removeProperty(_keyBuilder.instance(participantId.stringify()));
- return true;
- }
-
- /**
- * Add a state model definition. Updates the existing state model definition if it already exists.
- * @param stateModelDef fully initialized state model definition
- * @return true if the model is persisted, false otherwise
- */
- public boolean addStateModelDefinitionToCluster(StateModelDefinition stateModelDef) {
- if (!isClusterStructureValid()) {
- LOG.error("Cluster: " + _clusterId + " structure is not valid");
- return false;
- }
-
- StateModelDefinitionAccessor smdAccessor = new StateModelDefinitionAccessor(_accessor);
- return smdAccessor.setStateModelDefinition(stateModelDef);
- }
-
- /**
- * Remove a state model definition if it exists
- * @param stateModelDefId state model definition id
- * @return true if removed, false if it did not exist
- */
- public boolean dropStateModelDefinitionFromCluster(StateModelDefId stateModelDefId) {
- return _accessor.removeProperty(_keyBuilder.stateModelDef(stateModelDefId.stringify()));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c070a765/helix-core/src/main/java/org/apache/helix/api/ClusterConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/api/ClusterConfig.java
deleted file mode 100644
index 590fb01..0000000
--- a/helix-core/src/main/java/org/apache/helix/api/ClusterConfig.java
+++ /dev/null
@@ -1,704 +0,0 @@
-package org.apache.helix.api;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.model.ClusterConstraints;
-import org.apache.helix.model.ClusterConstraints.ConstraintAttribute;
-import org.apache.helix.model.ClusterConstraints.ConstraintType;
-import org.apache.helix.model.ClusterConstraints.ConstraintValue;
-import org.apache.helix.model.ConstraintItem;
-import org.apache.helix.model.Message.MessageType;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.helix.model.Transition;
-import org.apache.helix.model.builder.ConstraintItemBuilder;
-import org.apache.log4j.Logger;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * Configuration properties of a cluster
- */
-public class ClusterConfig {
- private static final Logger LOG = Logger.getLogger(ClusterConfig.class);
-
- private final ClusterId _id;
- private final Map<ResourceId, ResourceConfig> _resourceMap;
- private final Map<ParticipantId, ParticipantConfig> _participantMap;
- private final Map<ConstraintType, ClusterConstraints> _constraintMap;
- private final Map<StateModelDefId, StateModelDefinition> _stateModelMap;
- private final UserConfig _userConfig;
- private final boolean _isPaused;
-
- /**
- * Initialize a cluster configuration. Also see ClusterConfig.Builder
- * @param id cluster id
- * @param resourceMap map of resource id to resource config
- * @param participantMap map of participant id to participant config
- * @param constraintMap map of constraint type to all constraints of that type
- * @param stateModelMap map of state model id to state model definition
- * @param userConfig user-defined cluster properties
- * @param isPaused true if paused, false if active
- */
- private ClusterConfig(ClusterId id, Map<ResourceId, ResourceConfig> resourceMap,
- Map<ParticipantId, ParticipantConfig> participantMap,
- Map<ConstraintType, ClusterConstraints> constraintMap,
- Map<StateModelDefId, StateModelDefinition> stateModelMap, UserConfig userConfig,
- boolean isPaused) {
- _id = id;
- _resourceMap = ImmutableMap.copyOf(resourceMap);
- _participantMap = ImmutableMap.copyOf(participantMap);
- _constraintMap = ImmutableMap.copyOf(constraintMap);
- _stateModelMap = ImmutableMap.copyOf(stateModelMap);
- _userConfig = userConfig;
- _isPaused = isPaused;
- }
-
- /**
- * Get cluster id
- * @return cluster id
- */
- public ClusterId getId() {
- return _id;
- }
-
- /**
- * Get resources in the cluster
- * @return a map of resource id to resource, or empty map if none
- */
- public Map<ResourceId, ResourceConfig> getResourceMap() {
- return _resourceMap;
- }
-
- /**
- * Get all the constraints on the cluster
- * @return map of constraint type to constraints
- */
- public Map<ConstraintType, ClusterConstraints> getConstraintMap() {
- return _constraintMap;
- }
-
- /**
- * Get the maximum number of participants that can be in a state
- * @param scope the scope for the bound
- * @param stateModelDefId the state model of the state
- * @param state the constrained state
- * @return The upper bound, which can be "-1" if unspecified, a numerical upper bound, "R" for
- * number of replicas, or "N" for number of participants
- */
- public String getStateUpperBoundConstraint(Scope<?> scope, StateModelDefId stateModelDefId,
- State state) {
- // set up attributes to match based on the scope
- ClusterConstraints stateConstraints = getConstraintMap().get(ConstraintType.STATE_CONSTRAINT);
- Map<ConstraintAttribute, String> matchAttributes = Maps.newHashMap();
- matchAttributes.put(ConstraintAttribute.STATE, state.toString());
- matchAttributes.put(ConstraintAttribute.STATE_MODEL, stateModelDefId.toString());
- switch (scope.getType()) {
- case CLUSTER:
- // cluster is implicit
- break;
- case RESOURCE:
- matchAttributes.put(ConstraintAttribute.RESOURCE, scope.getScopedId().stringify());
- break;
- default:
- LOG.error("Unsupported scope for state constraint: " + scope);
- return "-1";
- }
- Set<ConstraintItem> matches = stateConstraints.match(matchAttributes);
- int value = -1;
- for (ConstraintItem item : matches) {
- // match: if an R or N is found, always choose that one
- // otherwise, take the minimum of the counts specified in the constraints
- String constraintValue = item.getConstraintValue();
- if (constraintValue != null) {
- if (constraintValue.equals(ConstraintValue.N.toString())
- || constraintValue.equals(ConstraintValue.R.toString())) {
- return constraintValue;
- } else {
- try {
- int current = Integer.parseInt(constraintValue);
- if (value == -1 || current < value) {
- value = current;
- }
- } catch (NumberFormatException e) {
- LOG.error("Invalid state upper bound: " + constraintValue);
- }
- }
- }
- }
- return Integer.toString(value);
- }
-
- /**
- * Get the limit of simultaneous execution of a transition
- * @param scope the scope under which the transition is constrained
- * @param stateModelDefId the state model of which the transition is a part
- * @param transition the constrained transition
- * @return the limit, or Integer.MAX_VALUE if there is no limit
- */
- public int getTransitionConstraint(Scope<?> scope, StateModelDefId stateModelDefId,
- Transition transition) {
- // set up attributes to match based on the scope
- ClusterConstraints transitionConstraints =
- getConstraintMap().get(ConstraintType.MESSAGE_CONSTRAINT);
- Map<ConstraintAttribute, String> matchAttributes = Maps.newHashMap();
- matchAttributes.put(ConstraintAttribute.STATE_MODEL, stateModelDefId.toString());
- matchAttributes.put(ConstraintAttribute.MESSAGE_TYPE, MessageType.STATE_TRANSITION.toString());
- matchAttributes.put(ConstraintAttribute.TRANSITION, transition.toString());
- switch (scope.getType()) {
- case CLUSTER:
- // cluster is implicit
- break;
- case RESOURCE:
- matchAttributes.put(ConstraintAttribute.RESOURCE, scope.getScopedId().stringify());
- break;
- case PARTICIPANT:
- matchAttributes.put(ConstraintAttribute.INSTANCE, scope.getScopedId().stringify());
- break;
- default:
- LOG.error("Unsupported scope for transition constraints: " + scope);
- return Integer.MAX_VALUE;
- }
- Set<ConstraintItem> matches = transitionConstraints.match(matchAttributes);
- int value = Integer.MAX_VALUE;
- for (ConstraintItem item : matches) {
- String constraintValue = item.getConstraintValue();
- if (constraintValue != null) {
- try {
- int current = Integer.parseInt(constraintValue);
- if (current < value) {
- value = current;
- }
- } catch (NumberFormatException e) {
- LOG.error("Invalid in-flight transition cap: " + constraintValue);
- }
- }
- }
- return value;
- }
-
- /**
- * Get participants of the cluster
- * @return a map of participant id to participant, or empty map if none
- */
- public Map<ParticipantId, ParticipantConfig> getParticipantMap() {
- return _participantMap;
- }
-
- /**
- * Get all the state model definitions on the cluster
- * @return map of state model definition id to state model definition
- */
- public Map<StateModelDefId, StateModelDefinition> getStateModelMap() {
- return _stateModelMap;
- }
-
- /**
- * Get user-specified configuration properties of this cluster
- * @return UserConfig properties
- */
- public UserConfig getUserConfig() {
- return _userConfig;
- }
-
- /**
- * Check the paused status of the cluster
- * @return true if paused, false otherwise
- */
- public boolean isPaused() {
- return _isPaused;
- }
-
- /**
- * Update context for a ClusterConfig
- */
- public static class Delta {
- private enum Fields {
- PAUSE_STATUS,
- USER_CONFIG
- }
-
- private Set<Fields> _updateFields;
- private Map<ConstraintType, Set<ConstraintId>> _removedConstraints;
- private Builder _builder;
-
- /**
- * Instantiate the delta for a cluster config
- * @param clusterId the cluster to update
- */
- public Delta(ClusterId clusterId) {
- _updateFields = Sets.newHashSet();
- _removedConstraints = Maps.newHashMap();
- for (ConstraintType type : ConstraintType.values()) {
- Set<ConstraintId> constraints = Sets.newHashSet();
- _removedConstraints.put(type, constraints);
- }
- _builder = new Builder(clusterId);
- }
-
- /**
- * Add a state upper bound constraint
- * @param scope scope under which the constraint is valid
- * @param stateModelDefId identifier of the state model that owns the state
- * @param state the state to constrain
- * @param upperBound maximum number of replicas per partition in the state
- * @return Delta
- */
- public Delta addStateUpperBoundConstraint(Scope<?> scope, StateModelDefId stateModelDefId,
- State state, int upperBound) {
- return addStateUpperBoundConstraint(scope, stateModelDefId, state,
- Integer.toString(upperBound));
- }
-
- /**
- * Add a state upper bound constraint
- * @param scope scope under which the constraint is valid
- * @param stateModelDefId identifier of the state model that owns the state
- * @param state the state to constrain
- * @param dynamicUpperBound the upper bound of replicas per partition in the state, can be a
- * number, or the currently supported special bound values:<br />
- * "R" - Refers to the number of replicas specified during resource
- * creation. This allows having different replication factor for each
- * resource without having to create a different state machine. <br />
- * "N" - Refers to all nodes in the cluster. Useful for resources that need
- * to exist on all nodes. This way one can add/remove nodes without having
- * the change the bounds.
- * @return Delta
- */
- public Delta addStateUpperBoundConstraint(Scope<?> scope, StateModelDefId stateModelDefId,
- State state, String dynamicUpperBound) {
- _builder.addStateUpperBoundConstraint(scope, stateModelDefId, state, dynamicUpperBound);
- return this;
- }
-
- /**
- * Remove state upper bound constraint
- * @param scope scope under which the constraint is valid
- * @param stateModelDefId identifier of the state model that owns the state
- * @param state the state to constrain
- * @return Delta
- */
- public Delta removeStateUpperBoundConstraint(Scope<?> scope, StateModelDefId stateModelDefId,
- State state) {
- _removedConstraints.get(ConstraintType.STATE_CONSTRAINT).add(
- ConstraintId.from(scope, stateModelDefId, state));
- return this;
- }
-
- /**
- * Add a constraint on the maximum number of in-flight transitions of a certain type
- * @param scope scope of the constraint
- * @param stateModelDefId identifies the state model containing the transition
- * @param transition the transition to constrain
- * @param maxInFlightTransitions number of allowed in-flight transitions in the scope
- * @return Delta
- */
- public Delta addTransitionConstraint(Scope<?> scope, StateModelDefId stateModelDefId,
- Transition transition, int maxInFlightTransitions) {
- _builder.addTransitionConstraint(scope, stateModelDefId, transition, maxInFlightTransitions);
- return this;
- }
-
- /**
- * Remove a constraint on the maximum number of in-flight transitions of a certain type
- * @param scope scope of the constraint
- * @param stateModelDefId identifies the state model containing the transition
- * @param transition the transition to constrain
- * @return Delta
- */
- public Delta removeTransitionConstraint(Scope<?> scope, StateModelDefId stateModelDefId,
- Transition transition) {
- _removedConstraints.get(ConstraintType.MESSAGE_CONSTRAINT).add(
- ConstraintId.from(scope, stateModelDefId, transition));
- return this;
- }
-
- /**
- * Add a single constraint item
- * @param type type of the constraint item
- * @param constraintId unique constraint id
- * @param item instantiated ConstraintItem
- * @return Delta
- */
- public Delta addConstraintItem(ConstraintType type, ConstraintId constraintId,
- ConstraintItem item) {
- _builder.addConstraint(type, constraintId, item);
- return this;
- }
-
- /**
- * Remove a single constraint item
- * @param type type of the constraint item
- * @param constraintId unique constraint id
- * @return Delta
- */
- public Delta removeConstraintItem(ConstraintType type, ConstraintId constraintId) {
- _removedConstraints.get(type).add(constraintId);
- return this;
- }
-
- /**
- * Set the paused status of the cluster
- * @param isPaused true if paused, false otherwise
- * @return Delta
- */
- public Delta setPausedStatus(boolean isPaused) {
- _builder.pausedStatus(isPaused);
- _updateFields.add(Fields.PAUSE_STATUS);
- return this;
- }
-
- /**
- * Set the user configuration
- * @param userConfig user-specified properties
- * @return Builder
- */
- public Delta setUserConfig(UserConfig userConfig) {
- _builder.userConfig(userConfig);
- _updateFields.add(Fields.USER_CONFIG);
- return this;
- }
-
- /**
- * Create a ClusterConfig that is the combination of an existing ClusterConfig and this delta
- * @param orig the original ClusterConfig
- * @return updated ClusterConfig
- */
- public ClusterConfig mergeInto(ClusterConfig orig) {
- // copy in original and updated fields
- ClusterConfig deltaConfig = _builder.build();
- Builder builder =
- new Builder(orig.getId()).addResources(orig.getResourceMap().values())
- .addParticipants(orig.getParticipantMap().values())
- .addStateModelDefinitions(orig.getStateModelMap().values())
- .userConfig(orig.getUserConfig()).pausedStatus(orig.isPaused());
- for (Fields field : _updateFields) {
- switch (field) {
- case PAUSE_STATUS:
- _builder.pausedStatus(deltaConfig.isPaused());
- break;
- case USER_CONFIG:
- _builder.userConfig(deltaConfig.getUserConfig());
- break;
- }
- }
- // add constraint deltas
- for (ConstraintType type : ConstraintType.values()) {
- ClusterConstraints constraints;
- if (orig.getConstraintMap().containsKey(type)) {
- constraints = orig.getConstraintMap().get(type);
- } else {
- constraints = new ClusterConstraints(type);
- }
- // add new constraints
- if (deltaConfig.getConstraintMap().containsKey(type)) {
- ClusterConstraints deltaConstraints = deltaConfig.getConstraintMap().get(type);
- for (ConstraintId constraintId : deltaConstraints.getConstraintItems().keySet()) {
- ConstraintItem constraintItem = deltaConstraints.getConstraintItem(constraintId);
- constraints.addConstraintItem(constraintId, constraintItem);
- }
- }
- // remove constraints
- for (ConstraintId constraintId : _removedConstraints.get(type)) {
- constraints.removeConstraintItem(constraintId);
- }
- builder.addConstraint(constraints);
- }
- return builder.build();
- }
- }
-
- /**
- * Assembles a cluster configuration
- */
- public static class Builder {
- private final ClusterId _id;
- private final Map<ResourceId, ResourceConfig> _resourceMap;
- private final Map<ParticipantId, ParticipantConfig> _participantMap;
- private final Map<ConstraintType, ClusterConstraints> _constraintMap;
- private final Map<StateModelDefId, StateModelDefinition> _stateModelMap;
- private UserConfig _userConfig;
- private boolean _isPaused;
-
- /**
- * Initialize builder for a cluster
- * @param id cluster id
- */
- public Builder(ClusterId id) {
- _id = id;
- _resourceMap = new HashMap<ResourceId, ResourceConfig>();
- _participantMap = new HashMap<ParticipantId, ParticipantConfig>();
- _constraintMap = new HashMap<ConstraintType, ClusterConstraints>();
- _stateModelMap = new HashMap<StateModelDefId, StateModelDefinition>();
- _isPaused = false;
- _userConfig = new UserConfig(Scope.cluster(id));
- }
-
- /**
- * Add a resource to the cluster
- * @param resource resource configuration
- * @return Builder
- */
- public Builder addResource(ResourceConfig resource) {
- _resourceMap.put(resource.getId(), resource);
- return this;
- }
-
- /**
- * Add multiple resources to the cluster
- * @param resources resource configurations
- * @return Builder
- */
- public Builder addResources(Collection<ResourceConfig> resources) {
- for (ResourceConfig resource : resources) {
- addResource(resource);
- }
- return this;
- }
-
- /**
- * Add a participant to the cluster
- * @param participant participant configuration
- * @return Builder
- */
- public Builder addParticipant(ParticipantConfig participant) {
- _participantMap.put(participant.getId(), participant);
- return this;
- }
-
- /**
- * Add multiple participants to the cluster
- * @param participants participant configurations
- * @return Builder
- */
- public Builder addParticipants(Collection<ParticipantConfig> participants) {
- for (ParticipantConfig participant : participants) {
- addParticipant(participant);
- }
- return this;
- }
-
- /**
- * Add a constraint to the cluster
- * @param constraint cluster constraint of a specific type
- * @return Builder
- */
- public Builder addConstraint(ClusterConstraints constraint) {
- ClusterConstraints existConstraints = getConstraintsInstance(constraint.getType());
- for (ConstraintId constraintId : constraint.getConstraintItems().keySet()) {
- existConstraints
- .addConstraintItem(constraintId, constraint.getConstraintItem(constraintId));
- }
- return this;
- }
-
- /**
- * Add a single constraint item
- * @param type type of the constraint
- * @param constraintId unique constraint identifier
- * @param item instantiated ConstraintItem
- * @return Builder
- */
- public Builder addConstraint(ConstraintType type, ConstraintId constraintId, ConstraintItem item) {
- ClusterConstraints existConstraints = getConstraintsInstance(type);
- existConstraints.addConstraintItem(constraintId, item);
- return this;
- }
-
- /**
- * Add multiple constraints to the cluster
- * @param constraints cluster constraints of multiple distinct types
- * @return Builder
- */
- public Builder addConstraints(Collection<ClusterConstraints> constraints) {
- for (ClusterConstraints constraint : constraints) {
- addConstraint(constraint);
- }
- return this;
- }
-
- /**
- * Add a constraint on the maximum number of in-flight transitions of a certain type
- * @param scope scope of the constraint
- * @param stateModelDefId identifies the state model containing the transition
- * @param transition the transition to constrain
- * @param maxInFlightTransitions number of allowed in-flight transitions in the scope
- * @return Builder
- */
- public Builder addTransitionConstraint(Scope<?> scope, StateModelDefId stateModelDefId,
- Transition transition, int maxInFlightTransitions) {
- Map<String, String> attributes = Maps.newHashMap();
- attributes.put(ConstraintAttribute.MESSAGE_TYPE.toString(),
- MessageType.STATE_TRANSITION.toString());
- attributes.put(ConstraintAttribute.CONSTRAINT_VALUE.toString(),
- Integer.toString(maxInFlightTransitions));
- attributes.put(ConstraintAttribute.TRANSITION.toString(), transition.toString());
- attributes.put(ConstraintAttribute.STATE_MODEL.toString(), stateModelDefId.stringify());
- switch (scope.getType()) {
- case CLUSTER:
- // cluster is implicit
- break;
- case RESOURCE:
- attributes.put(ConstraintAttribute.RESOURCE.toString(), scope.getScopedId().stringify());
- break;
- case PARTICIPANT:
- attributes.put(ConstraintAttribute.INSTANCE.toString(), scope.getScopedId().stringify());
- break;
- default:
- LOG.error("Unsupported scope for adding a transition constraint: " + scope);
- return this;
- }
- ConstraintItem item = new ConstraintItemBuilder().addConstraintAttributes(attributes).build();
- ClusterConstraints constraints = getConstraintsInstance(ConstraintType.MESSAGE_CONSTRAINT);
- constraints.addConstraintItem(ConstraintId.from(scope, stateModelDefId, transition), item);
- return this;
- }
-
- /**
- * Add a state upper bound constraint
- * @param scope scope under which the constraint is valid
- * @param stateModelDefId identifier of the state model that owns the state
- * @param state the state to constrain
- * @param upperBound maximum number of replicas per partition in the state
- * @return Builder
- */
- public Builder addStateUpperBoundConstraint(Scope<?> scope, StateModelDefId stateModelDefId,
- State state, int upperBound) {
- return addStateUpperBoundConstraint(scope, stateModelDefId, state,
- Integer.toString(upperBound));
- }
-
- /**
- * Add a state upper bound constraint
- * @param scope scope under which the constraint is valid
- * @param stateModelDefId identifier of the state model that owns the state
- * @param state the state to constrain
- * @param dynamicUpperBound the upper bound of replicas per partition in the state, can be a
- * number, or the currently supported special bound values:<br />
- * "R" - Refers to the number of replicas specified during resource
- * creation. This allows having different replication factor for each
- * resource without having to create a different state machine. <br />
- * "N" - Refers to all nodes in the cluster. Useful for resources that need
- * to exist on all nodes. This way one can add/remove nodes without having
- * the change the bounds.
- * @return Builder
- */
- public Builder addStateUpperBoundConstraint(Scope<?> scope, StateModelDefId stateModelDefId,
- State state, String dynamicUpperBound) {
- Map<String, String> attributes = Maps.newHashMap();
- attributes.put(ConstraintAttribute.STATE.toString(), state.toString());
- attributes.put(ConstraintAttribute.STATE_MODEL.toString(), stateModelDefId.stringify());
- attributes.put(ConstraintAttribute.CONSTRAINT_VALUE.toString(), dynamicUpperBound);
- switch (scope.getType()) {
- case CLUSTER:
- // cluster is implicit
- break;
- case RESOURCE:
- attributes.put(ConstraintAttribute.RESOURCE.toString(), scope.getScopedId().stringify());
- break;
- default:
- LOG.error("Unsupported scope for adding a state constraint: " + scope);
- return this;
- }
- ConstraintItem item = new ConstraintItemBuilder().addConstraintAttributes(attributes).build();
- ClusterConstraints constraints = getConstraintsInstance(ConstraintType.STATE_CONSTRAINT);
- constraints.addConstraintItem(ConstraintId.from(scope, stateModelDefId, state), item);
- return this;
- }
-
- /**
- * Add a state model definition to the cluster
- * @param stateModelDef state model definition of the cluster
- * @return Builder
- */
- public Builder addStateModelDefinition(StateModelDefinition stateModelDef) {
- _stateModelMap.put(stateModelDef.getStateModelDefId(), stateModelDef);
- // add state constraints from the state model definition
- for (State state : stateModelDef.getStatesPriorityList()) {
- if (!stateModelDef.getNumParticipantsPerState(state).equals("-1")) {
- addStateUpperBoundConstraint(Scope.cluster(_id), stateModelDef.getStateModelDefId(),
- state, stateModelDef.getNumParticipantsPerState(state));
- }
- }
- return this;
- }
-
- /**
- * Add multiple state model definitions
- * @param stateModelDefs collection of state model definitions for the cluster
- * @return Builder
- */
- public Builder addStateModelDefinitions(Collection<StateModelDefinition> stateModelDefs) {
- for (StateModelDefinition stateModelDef : stateModelDefs) {
- addStateModelDefinition(stateModelDef);
- }
- return this;
- }
-
- /**
- * Set the paused status of the cluster
- * @param isPaused true if paused, false otherwise
- * @return Builder
- */
- public Builder pausedStatus(boolean isPaused) {
- _isPaused = isPaused;
- return this;
- }
-
- /**
- * Set the user configuration
- * @param userConfig user-specified properties
- * @return Builder
- */
- public Builder userConfig(UserConfig userConfig) {
- _userConfig = userConfig;
- return this;
- }
-
- /**
- * Create the cluster configuration
- * @return ClusterConfig
- */
- public ClusterConfig build() {
- return new ClusterConfig(_id, _resourceMap, _participantMap, _constraintMap, _stateModelMap,
- _userConfig, _isPaused);
- }
-
- /**
- * Get a valid instance of ClusterConstraints for a type
- * @param type the type
- * @return ClusterConstraints
- */
- private ClusterConstraints getConstraintsInstance(ConstraintType type) {
- ClusterConstraints constraints = _constraintMap.get(type);
- if (constraints == null) {
- constraints = new ClusterConstraints(type);
- _constraintMap.put(type, constraints);
- }
- return constraints;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c070a765/helix-core/src/main/java/org/apache/helix/api/ClusterId.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ClusterId.java b/helix-core/src/main/java/org/apache/helix/api/ClusterId.java
deleted file mode 100644
index 4e8b382..0000000
--- a/helix-core/src/main/java/org/apache/helix/api/ClusterId.java
+++ /dev/null
@@ -1,57 +0,0 @@
-package org.apache.helix.api;
-
-import org.codehaus.jackson.annotate.JsonCreator;
-import org.codehaus.jackson.annotate.JsonProperty;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * Identifies a cluster
- */
-public class ClusterId extends Id {
- @JsonProperty("id")
- final private String _id;
-
- /**
- * Create a cluster id
- * @param id string representation of the id
- */
- @JsonCreator
- public ClusterId(@JsonProperty("id") String id) {
- _id = id;
- }
-
- @Override
- public String stringify() {
- return _id;
- }
-
- /**
- * Get a concrete cluster id for a string name
- * @param clusterId string cluster identifier
- * @return ClusterId
- */
- public static ClusterId from(String clusterId) {
- if (clusterId == null) {
- return null;
- }
- return new ClusterId(clusterId);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c070a765/helix-core/src/main/java/org/apache/helix/api/ConstraintId.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ConstraintId.java b/helix-core/src/main/java/org/apache/helix/api/ConstraintId.java
deleted file mode 100644
index 7da6714..0000000
--- a/helix-core/src/main/java/org/apache/helix/api/ConstraintId.java
+++ /dev/null
@@ -1,78 +0,0 @@
-package org.apache.helix.api;
-
-import org.apache.helix.model.Transition;
-import org.codehaus.jackson.annotate.JsonCreator;
-import org.codehaus.jackson.annotate.JsonProperty;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * Identifies a constraint item on the cluster
- */
-public class ConstraintId extends Id {
- @JsonProperty("id")
- private final String _id;
-
- /**
- * Create a constraint id
- * @param constraintId string representing the constraint id
- */
- @JsonCreator
- public ConstraintId(@JsonProperty("id") String id) {
- _id = id;
- }
-
- @Override
- public String stringify() {
- return _id;
- }
-
- /**
- * Get a constraint id from a string
- * @param constraintId string representing the constraint id
- * @return ConstraintId
- */
- public static ConstraintId from(String constraintId) {
- return new ConstraintId(constraintId);
- }
-
- /**
- * Get a state constraint id based on the state model definition and state
- * @param scope the scope of the constraint
- * @param stateModelDefId the state model
- * @param state the constrained state
- * @return ConstraintId
- */
- public static ConstraintId from(Scope<?> scope, StateModelDefId stateModelDefId, State state) {
- return new ConstraintId(scope + "|" + stateModelDefId + "|" + state);
- }
-
- /**
- * Get a state constraint id based on the state model definition and transition
- * @param scope the scope of the constraint
- * @param stateModelDefId the state model
- * @param transition the constrained transition
- * @return ConstraintId
- */
- public static ConstraintId from(Scope<?> scope, StateModelDefId stateModelDefId,
- Transition transition) {
- return new ConstraintId(scope + "|" + stateModelDefId + "|" + transition);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c070a765/helix-core/src/main/java/org/apache/helix/api/Controller.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Controller.java b/helix-core/src/main/java/org/apache/helix/api/Controller.java
index c47d603..33e85ed 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Controller.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Controller.java
@@ -19,6 +19,7 @@ package org.apache.helix.api;
* under the License.
*/
+import org.apache.helix.api.id.ControllerId;
import org.apache.helix.model.LiveInstance;
/**
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c070a765/helix-core/src/main/java/org/apache/helix/api/ControllerAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ControllerAccessor.java b/helix-core/src/main/java/org/apache/helix/api/ControllerAccessor.java
deleted file mode 100644
index b835a4c..0000000
--- a/helix-core/src/main/java/org/apache/helix/api/ControllerAccessor.java
+++ /dev/null
@@ -1,47 +0,0 @@
-package org.apache.helix.api;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.model.LiveInstance;
-
-public class ControllerAccessor {
- private final HelixDataAccessor _accessor;
- private final PropertyKey.Builder _keyBuilder;
-
- public ControllerAccessor(HelixDataAccessor accessor) {
- _accessor = accessor;
- _keyBuilder = accessor.keyBuilder();
- }
-
- /**
- * Read the leader controller if it is live
- * @return Controller snapshot, or null
- */
- public Controller readLeader() {
- LiveInstance leader = _accessor.getProperty(_keyBuilder.controllerLeader());
- if (leader != null) {
- ControllerId leaderId = ControllerId.from(leader.getId());
- return new Controller(leaderId, leader, true);
- }
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c070a765/helix-core/src/main/java/org/apache/helix/api/ControllerId.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ControllerId.java b/helix-core/src/main/java/org/apache/helix/api/ControllerId.java
deleted file mode 100644
index e55f37a..0000000
--- a/helix-core/src/main/java/org/apache/helix/api/ControllerId.java
+++ /dev/null
@@ -1,54 +0,0 @@
-package org.apache.helix.api;
-
-import org.codehaus.jackson.annotate.JsonCreator;
-import org.codehaus.jackson.annotate.JsonProperty;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * Identifies Helix nodes that take on the CONTROLLER role
- */
-public class ControllerId extends Id {
- @JsonProperty("id")
- private final String _id;
-
- /**
- * Create a controller id
- * @param id string representation of a controller id
- */
- @JsonCreator
- public ControllerId(@JsonProperty("id") String id) {
- _id = id;
- }
-
- @Override
- public String stringify() {
- return _id;
- }
-
- /**
- * Get a ControllerId from a string
- * @param controllerId string representing the id
- * @return ControllerId
- */
- public static ControllerId from(String controllerId) {
- return new ControllerId(controllerId);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c070a765/helix-core/src/main/java/org/apache/helix/api/Id.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Id.java b/helix-core/src/main/java/org/apache/helix/api/Id.java
deleted file mode 100644
index ce5d2e4..0000000
--- a/helix-core/src/main/java/org/apache/helix/api/Id.java
+++ /dev/null
@@ -1,55 +0,0 @@
-package org.apache.helix.api;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * Generic identifier for Helix constructs
- */
-public abstract class Id implements Comparable<Id> {
- public abstract String stringify();
-
- @Override
- public String toString() {
- return stringify();
- }
-
- @Override
- public boolean equals(Object that) {
- if (that instanceof Id) {
- return this.stringify().equals(((Id) that).stringify());
- } else if (that instanceof String) {
- return this.stringify().equals(that);
- }
- return false;
- }
-
- @Override
- public int hashCode() {
- return this.stringify().hashCode();
- }
-
- @Override
- public int compareTo(Id that) {
- if (that instanceof Id) {
- return this.stringify().compareTo(that.stringify());
- }
- return -1;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c070a765/helix-core/src/main/java/org/apache/helix/api/MessageId.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/MessageId.java b/helix-core/src/main/java/org/apache/helix/api/MessageId.java
deleted file mode 100644
index c5d4002..0000000
--- a/helix-core/src/main/java/org/apache/helix/api/MessageId.java
+++ /dev/null
@@ -1,54 +0,0 @@
-package org.apache.helix.api;
-
-import org.codehaus.jackson.annotate.JsonCreator;
-import org.codehaus.jackson.annotate.JsonProperty;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-public class MessageId extends Id {
- @JsonProperty("id")
- private final String _id;
-
- /**
- * Create a message id
- * @param id string representation of a message id
- */
- @JsonCreator
- public MessageId(@JsonProperty("id") String id) {
- _id = id;
- }
-
- @Override
- public String stringify() {
- return _id;
- }
-
- /**
- * Get a concrete message id
- * @param messageId string message identifier
- * @return MsgId
- */
- public static MessageId from(String messageId) {
- if (messageId == null) {
- return null;
- }
- return new MessageId(messageId);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c070a765/helix-core/src/main/java/org/apache/helix/api/NamespacedConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/NamespacedConfig.java b/helix-core/src/main/java/org/apache/helix/api/NamespacedConfig.java
deleted file mode 100644
index 675b144..0000000
--- a/helix-core/src/main/java/org/apache/helix/api/NamespacedConfig.java
+++ /dev/null
@@ -1,227 +0,0 @@
-package org.apache.helix.api;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.helix.HelixProperty;
-import org.apache.helix.ZNRecord;
-
-import com.google.common.base.Predicate;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * Generic configuration of Helix components prefixed with a namespace
- */
-public class NamespacedConfig extends ZNRecord {
- private static final char PREFIX_CHAR = '!';
- private final String _prefix;
-
- /**
- * Instantiate a NamespacedConfig. It is intended for use only by entities that can be identified
- * @param scope scope object
- */
- public NamespacedConfig(Scope<?> scope, String prefix) {
- super(scope.getScopedId().stringify());
- _prefix = prefix + PREFIX_CHAR;
- }
-
- /**
- * Instantiate a NamespacedConfig from an existing HelixProperty
- * @param property property wrapping a configuration
- */
- public NamespacedConfig(HelixProperty property, String prefix) {
- super(property.getRecord());
- _prefix = prefix + PREFIX_CHAR;
- filterNonPrefixedFields();
- }
-
- /**
- * Instantiate a NamespacedConfig as a copy of another NamedspacedConfig
- * @param config populated NamespacedConfig
- */
- public NamespacedConfig(NamespacedConfig config) {
- super(config.getId());
- _prefix = config.getPrefix() + PREFIX_CHAR;
- if (config.getRawPayload() != null && config.getRawPayload().length > 0) {
- setRawPayload(config.getRawPayload());
- setPayloadSerializer(config.getPayloadSerializer());
- }
- super.setSimpleFields(config.getPrefixedSimpleFields());
- super.setListFields(config.getPrefixedListFields());
- super.setMapFields(config.getPrefixedMapFields());
- }
-
- @Override
- public void setMapField(String k, Map<String, String> v) {
- super.setMapField(_prefix + k, v);
- }
-
- @Override
- public Map<String, String> getMapField(String k) {
- return super.getMapField(_prefix + k);
- }
-
- @Override
- public void setMapFields(Map<String, Map<String, String>> mapFields) {
- for (String k : mapFields.keySet()) {
- super.setMapField(_prefix + k, mapFields.get(k));
- }
- }
-
- /**
- * Returns an immutable map of map fields
- */
- @Override
- public Map<String, Map<String, String>> getMapFields() {
- return convertToPrefixlessMap(super.getMapFields(), _prefix);
- }
-
- @Override
- public void setListField(String k, List<String> v) {
- super.setListField(_prefix + k, v);
- }
-
- @Override
- public List<String> getListField(String k) {
- return super.getListField(_prefix + k);
- }
-
- @Override
- public void setListFields(Map<String, List<String>> listFields) {
- for (String k : listFields.keySet()) {
- super.setListField(_prefix + k, listFields.get(k));
- }
- }
-
- /**
- * Returns an immutable map of list fields
- */
- @Override
- public Map<String, List<String>> getListFields() {
- return convertToPrefixlessMap(super.getListFields(), _prefix);
- }
-
- @Override
- public void setSimpleField(String k, String v) {
- super.setSimpleField(_prefix + k, v);
- }
-
- @Override
- public String getSimpleField(String k) {
- return super.getSimpleField(_prefix + k);
- }
-
- @Override
- public void setSimpleFields(Map<String, String> simpleFields) {
- for (String k : simpleFields.keySet()) {
- super.setSimpleField(_prefix + k, simpleFields.get(k));
- }
- }
-
- /**
- * Returns an immutable map of simple fields
- */
- @Override
- public Map<String, String> getSimpleFields() {
- return convertToPrefixlessMap(super.getSimpleFields(), _prefix);
- }
-
- /**
- * Get the prefix used to distinguish these config properties
- * @return string prefix, not including the underscore
- */
- public String getPrefix() {
- return _prefix.substring(0, _prefix.indexOf(PREFIX_CHAR));
- }
-
- /**
- * Remove all fields from this config that are not prefixed
- */
- private void filterNonPrefixedFields() {
- // filter out any configuration that isn't user-defined
- Predicate<String> keyFilter = new Predicate<String>() {
- @Override
- public boolean apply(String key) {
- return key.contains(_prefix);
- }
- };
- super.setMapFields(Maps.filterKeys(super.getMapFields(), keyFilter));
- super.setListFields(Maps.filterKeys(super.getListFields(), keyFilter));
- super.setSimpleFields(Maps.filterKeys(super.getSimpleFields(), keyFilter));
- }
-
- /**
- * Get all map fields with prefixed keys
- * @return prefixed map fields
- */
- private Map<String, Map<String, String>> getPrefixedMapFields() {
- return super.getMapFields();
- }
-
- /**
- * Get all list fields with prefixed keys
- * @return prefixed list fields
- */
- private Map<String, List<String>> getPrefixedListFields() {
- return super.getListFields();
- }
-
- /**
- * Get all simple fields with prefixed keys
- * @return prefixed simple fields
- */
- private Map<String, String> getPrefixedSimpleFields() {
- return super.getSimpleFields();
- }
-
- /**
- * Add user configuration to an existing helix property.
- * @param property the property to update
- * @param config the user config
- */
- public static void addConfigToProperty(HelixProperty property, NamespacedConfig config) {
- ZNRecord record = property.getRecord();
- record.getMapFields().putAll(config.getPrefixedMapFields());
- record.getListFields().putAll(config.getPrefixedListFields());
- record.getSimpleFields().putAll(config.getPrefixedSimpleFields());
- if (config.getRawPayload() != null && config.getRawPayload().length > 0) {
- record.setPayloadSerializer(config.getPayloadSerializer());
- record.setRawPayload(config.getRawPayload());
- }
- }
-
- /**
- * Get a copy of a map with the key prefix stripped. The resulting map is immutable
- * @param rawMap map of key, value pairs where the key is prefixed
- * @return map of key, value pairs where the key is not prefixed
- */
- private static <T> Map<String, T> convertToPrefixlessMap(Map<String, T> rawMap, String prefix) {
- Map<String, T> convertedMap = new HashMap<String, T>();
- for (String rawKey : rawMap.keySet()) {
- String k = rawKey.substring(prefix.length());
- convertedMap.put(k, rawMap.get(rawKey));
- }
- return ImmutableMap.copyOf(convertedMap);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c070a765/helix-core/src/main/java/org/apache/helix/api/Participant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Participant.java b/helix-core/src/main/java/org/apache/helix/api/Participant.java
index 8b02f0e..0e0de9d 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Participant.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Participant.java
@@ -22,6 +22,12 @@ package org.apache.helix.api;
import java.util.Map;
import java.util.Set;
+import org.apache.helix.api.config.ParticipantConfig;
+import org.apache.helix.api.config.UserConfig;
+import org.apache.helix.api.id.MessageId;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.Message;