You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2013/11/07 02:19:48 UTC
[40/53] [abbrv] git commit: [HELIX-209] Backward compatible function
naming in the model package
[HELIX-209] Backward compatible function naming in the model package
Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/142c9248
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/142c9248
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/142c9248
Branch: refs/heads/master
Commit: 142c924850763695c6c5eeb0e337ea927561287e
Parents: e707a60
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Tue Oct 15 16:24:26 2013 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Wed Nov 6 13:17:37 2013 -0800
----------------------------------------------------------------------
.../resources/SchedulerTasksResource.java | 10 +-
.../helix/tools/TestResetPartitionState.java | 6 +-
.../org/apache/helix/agent/AgentStateModel.java | 8 +-
.../java/org/apache/helix/api/Controller.java | 2 +-
.../java/org/apache/helix/api/Resource.java | 2 +-
.../helix/api/accessor/ClusterAccessor.java | 4 +-
.../helix/api/accessor/ParticipantAccessor.java | 16 +-
.../helix/api/accessor/ResourceAccessor.java | 2 +-
.../apache/helix/api/config/ClusterConfig.java | 2 +-
.../controller/GenericHelixController.java | 2 +-
.../controller/rebalancer/AutoRebalancer.java | 2 +-
.../controller/rebalancer/CustomRebalancer.java | 2 +-
.../rebalancer/context/CustomRebalancer.java | 2 +-
.../context/CustomRebalancerContext.java | 2 +-
.../context/PartitionedRebalancerContext.java | 8 +-
.../context/SemiAutoRebalancerContext.java | 2 +-
.../util/ConstraintBasedAssignment.java | 6 +-
.../util/NewConstraintBasedAssignment.java | 8 +-
.../controller/stages/ClusterDataCache.java | 4 +-
.../stages/CompatibilityCheckStage.java | 2 +-
.../stages/CurrentStateComputationStage.java | 12 +-
.../stages/ExternalViewComputeStage.java | 4 +-
.../stages/MessageGenerationPhase.java | 7 +-
.../stages/MessageSelectionStage.java | 12 +-
.../stages/NewCurrentStateComputationStage.java | 10 +-
.../stages/NewExternalViewComputeStage.java | 4 +-
.../stages/NewMessageGenerationStage.java | 4 +-
.../stages/NewMessageSelectionStage.java | 12 +-
.../stages/NewResourceComputationStage.java | 6 +-
.../stages/NewTaskAssignmentStage.java | 8 +-
.../stages/RebalanceIdealStateStage.java | 2 +-
.../stages/ResourceComputationStage.java | 10 +-
.../controller/stages/TaskAssignmentStage.java | 10 +-
.../helix/manager/zk/ControllerManager.java | 2 +-
.../manager/zk/CurStateCarryOverUpdater.java | 2 +-
.../DefaultControllerMessageHandlerFactory.java | 8 +-
...ltParticipantErrorMessageHandlerFactory.java | 4 +-
.../DefaultSchedulerMessageHandlerFactory.java | 38 +--
.../zk/DistributedControllerManager.java | 2 +-
.../manager/zk/DistributedLeaderElection.java | 2 +-
.../manager/zk/ParticipantManagerHelper.java | 11 +-
.../apache/helix/manager/zk/ZKHelixAdmin.java | 10 +-
.../apache/helix/manager/zk/ZKHelixManager.java | 2 +-
.../apache/helix/messaging/AsyncCallback.java | 2 +-
.../messaging/DefaultMessagingService.java | 4 +-
.../handling/AsyncCallbackService.java | 12 +-
.../handling/HelixStateTransitionHandler.java | 24 +-
.../helix/messaging/handling/HelixTask.java | 26 +-
.../messaging/handling/HelixTaskExecutor.java | 12 +-
.../messaging/handling/MessageTimeoutTask.java | 2 +-
.../org/apache/helix/model/AlertStatus.java | 19 ++
.../java/org/apache/helix/model/Alerts.java | 19 ++
.../apache/helix/model/ClusterConstraints.java | 34 ++-
.../org/apache/helix/model/CurrentState.java | 41 +++-
.../org/apache/helix/model/ExternalView.java | 6 +-
.../java/org/apache/helix/model/IdealState.java | 23 +-
.../org/apache/helix/model/InstanceConfig.java | 2 +-
.../org/apache/helix/model/LiveInstance.java | 28 ++-
.../java/org/apache/helix/model/Message.java | 243 ++++++++++++++++---
.../apache/helix/model/ResourceAssignment.java | 44 +++-
.../helix/model/StateModelDefinition.java | 107 +++++++-
.../java/org/apache/helix/model/Transition.java | 29 ++-
.../builder/StateTransitionTableBuilder.java | 4 +-
.../monitoring/mbeans/ResourceMonitor.java | 4 +-
.../DistClusterControllerElection.java | 2 +-
.../participant/HelixStateMachineEngine.java | 6 +-
.../helix/spectator/RoutingTableProvider.java | 2 +-
.../helix/tools/ClusterStateVerifier.java | 4 +-
.../org/apache/helix/tools/MessagePoster.java | 2 +-
.../org/apache/helix/tools/ZkLogAnalyzer.java | 12 +-
.../org/apache/helix/util/RebalanceUtil.java | 8 +-
.../org/apache/helix/util/StatusUpdateUtil.java | 26 +-
.../org/apache/helix/TestHelixTaskExecutor.java | 2 +-
.../org/apache/helix/TestHelixTaskHandler.java | 4 +-
.../java/org/apache/helix/ZkUnitTestBase.java | 2 +-
.../org/apache/helix/api/TestNewStages.java | 6 +-
.../stages/TestMsgSelectionStage.java | 2 +-
.../stages/TestRebalancePipeline.java | 20 +-
.../stages/TestResourceComputationStage.java | 2 +-
.../strategy/TestNewAutoRebalanceStrategy.java | 2 +-
.../helix/healthcheck/TestAddDropAlert.java | 4 +-
.../helix/healthcheck/TestExpandAlert.java | 4 +-
.../helix/healthcheck/TestSimpleAlert.java | 4 +-
.../healthcheck/TestSimpleWildcardAlert.java | 4 +-
.../helix/healthcheck/TestStalenessAlert.java | 4 +-
.../helix/healthcheck/TestWildcardAlert.java | 4 +-
.../helix/integration/TestAutoRebalance.java | 2 +-
.../TestAutoRebalancePartitionLimit.java | 12 +-
.../integration/TestCleanupExternalView.java | 4 +-
.../TestCustomizedIdealStateRebalancer.java | 10 +-
.../org/apache/helix/integration/TestDrop.java | 2 +-
.../TestEnablePartitionDuringDisable.java | 4 +-
.../helix/integration/TestHelixInstanceTag.java | 4 +-
.../TestMessagePartitionStateMismatch.java | 6 +-
.../helix/integration/TestMessagingService.java | 18 +-
.../integration/TestResetPartitionState.java | 6 +-
.../helix/integration/TestSchedulerMessage.java | 18 +-
.../integration/TestSchedulerMsgContraints.java | 2 +-
.../integration/TestSchedulerMsgUsingQueue.java | 6 +-
.../integration/TestStateTransitionTimeout.java | 4 +-
.../helix/integration/TestStatusUpdate.java | 2 +-
.../helix/manager/zk/TestZkClusterManager.java | 10 +-
.../helix/messaging/TestAsyncCallbackSvc.java | 10 +-
.../handling/TestHelixTaskExecutor.java | 8 +-
.../helix/mock/controller/MockController.java | 2 +-
.../helix/mock/participant/ErrTransition.java | 4 +-
.../apache/helix/tools/TestHelixAdminCli.java | 6 +-
.../examples/MasterSlaveStateModelFactory.java | 16 +-
.../org/apache/helix/examples/Quickstart.java | 2 +-
.../helix/lockmanager/LockManagerDemo.java | 2 +-
.../helix/filestore/FileStoreStateModel.java | 24 +-
.../org/apache/helix/taskexecution/Task.java | 2 +-
112 files changed, 850 insertions(+), 421 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/SchedulerTasksResource.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/SchedulerTasksResource.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/SchedulerTasksResource.java
index 40c527a..942566c 100644
--- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/SchedulerTasksResource.java
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/SchedulerTasksResource.java
@@ -159,7 +159,7 @@ public class SchedulerTasksResource extends Resource {
schedulerMessage.getRecord().getMapFields().put(MESSAGETEMPLATE, messageTemplate);
- schedulerMessage.setTgtSessionId(SessionId.from(leader.getSessionId().stringify()));
+ schedulerMessage.setTgtSessionId(SessionId.from(leader.getTypedSessionId().stringify()));
schedulerMessage.setTgtName("CONTROLLER");
schedulerMessage.setSrcInstanceType(InstanceType.CONTROLLER);
String taskQueueName =
@@ -169,22 +169,22 @@ public class SchedulerTasksResource extends Resource {
DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, taskQueueName);
}
accessor.setProperty(
- accessor.keyBuilder().controllerMessage(schedulerMessage.getMsgId().stringify()),
+ accessor.keyBuilder().controllerMessage(schedulerMessage.getMessageId().stringify()),
schedulerMessage);
Map<String, String> resultMap = new HashMap<String, String>();
resultMap.put("StatusUpdatePath", PropertyPathConfig.getPath(
PropertyType.STATUSUPDATES_CONTROLLER, clusterName, MessageType.SCHEDULER_MSG.toString(),
- schedulerMessage.getMsgId().stringify()));
+ schedulerMessage.getMessageId().stringify()));
resultMap.put("MessageType", Message.MessageType.SCHEDULER_MSG.toString());
- resultMap.put("MsgId", schedulerMessage.getMsgId().stringify());
+ resultMap.put("MsgId", schedulerMessage.getMessageId().stringify());
// Assemble the rest URL for task status update
String ipAddress = InetAddress.getLocalHost().getCanonicalHostName();
String url =
"http://" + ipAddress + ":" + getContext().getAttributes().get(RestAdminApplication.PORT)
+ "/clusters/" + clusterName + "/Controller/statusUpdates/SCHEDULER_MSG/"
- + schedulerMessage.getMsgId();
+ + schedulerMessage.getMessageId();
resultMap.put("statusUpdateUrl", url);
getResponse().setEntity(ClusterRepresentationUtil.ObjectToJson(resultMap),
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-admin-webapp/src/test/java/org/apache/helix/tools/TestResetPartitionState.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/test/java/org/apache/helix/tools/TestResetPartitionState.java b/helix-admin-webapp/src/test/java/org/apache/helix/tools/TestResetPartitionState.java
index ec52151..9b07445 100644
--- a/helix-admin-webapp/src/test/java/org/apache/helix/tools/TestResetPartitionState.java
+++ b/helix-admin-webapp/src/test/java/org/apache/helix/tools/TestResetPartitionState.java
@@ -68,8 +68,8 @@ public class TestResetPartitionState extends AdminTestBase {
@Override
public void doTransition(Message message, NotificationContext context) {
super.doTransition(message, context);
- State fromState = message.getFromState();
- State toState = message.getToState();
+ State fromState = message.getTypedFromState();
+ State toState = message.getTypedToState();
if (fromState.toString().equals("ERROR") && toState.toString().equals("OFFLINE")) {
// System.err.println("doReset() invoked");
_errToOfflineInvoked.incrementAndGet();
@@ -192,7 +192,7 @@ public class TestResetPartitionState extends AdminTestBase {
Builder keyBuilder = accessor.keyBuilder();
LiveInstance liveInstance = accessor.getProperty(keyBuilder.liveInstance(instance));
- accessor.removeProperty(keyBuilder.stateTransitionStatus(instance, liveInstance.getSessionId()
+ accessor.removeProperty(keyBuilder.stateTransitionStatus(instance, liveInstance.getTypedSessionId()
.stringify(), resource, partition));
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-agent/src/main/java/org/apache/helix/agent/AgentStateModel.java
----------------------------------------------------------------------
diff --git a/helix-agent/src/main/java/org/apache/helix/agent/AgentStateModel.java b/helix-agent/src/main/java/org/apache/helix/agent/AgentStateModel.java
index b0106a0..d227ac3 100644
--- a/helix-agent/src/main/java/org/apache/helix/agent/AgentStateModel.java
+++ b/helix-agent/src/main/java/org/apache/helix/agent/AgentStateModel.java
@@ -72,8 +72,8 @@ public class AgentStateModel extends StateModel {
HelixManager manager = context.getManager();
String clusterName = manager.getClusterName();
- State fromState = message.getFromState();
- State toState = message.getToState();
+ State fromState = message.getTypedFromState();
+ State toState = message.getTypedToState();
// construct keys for command-config
String cmdKey = buildKey(fromState.toString(), toState.toString(), CommandAttribute.COMMAND);
@@ -116,8 +116,8 @@ public class AgentStateModel extends StateModel {
}
if (cmd == null) {
- throw new Exception("Unable to find command for transition from:" + message.getFromState()
- + " to:" + message.getToState());
+ throw new Exception("Unable to find command for transition from:" + message.getTypedFromState()
+ + " to:" + message.getTypedToState());
}
_logger.info("Executing command: " + cmd + ", using workingDir: " + workingDir + ", timeout: "
+ timeout + ", on " + manager.getInstanceName());
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-core/src/main/java/org/apache/helix/api/Controller.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Controller.java b/helix-core/src/main/java/org/apache/helix/api/Controller.java
index 33e85ed..1218287 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Controller.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Controller.java
@@ -39,7 +39,7 @@ public class Controller {
if (liveInstance != null) {
_runningInstance =
- new RunningInstance(liveInstance.getSessionId(), liveInstance.getHelixVersion(),
+ new RunningInstance(liveInstance.getTypedSessionId(), liveInstance.getTypedHelixVersion(),
liveInstance.getProcessId());
} else {
_runningInstance = null;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-core/src/main/java/org/apache/helix/api/Resource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Resource.java b/helix-core/src/main/java/org/apache/helix/api/Resource.java
index 589c8a6..79a1e09 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Resource.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Resource.java
@@ -96,7 +96,7 @@ public class Resource {
Map<PartitionId, Message> innerMsgMap = new HashMap<PartitionId, Message>();
if (idealState.getStateModelDefId().equalsIgnoreCase(StateModelDefId.SchedulerTaskQueue)) {
- for (PartitionId partitionId : idealState.getPartitionSet()) {
+ for (PartitionId partitionId : idealState.getPartitionIdSet()) {
// TODO refactor: scheduler-task-queue state model uses map-field to store inner-messages
// this is different from all other state-models
Map<String, String> innerMsgStrMap =
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
index abeb649..85b8432 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
@@ -372,7 +372,7 @@ public class ClusterAccessor {
new HashMap<String, Map<String, CurrentState>>();
for (String participantName : liveInstanceMap.keySet()) {
LiveInstance liveInstance = liveInstanceMap.get(participantName);
- SessionId sessionId = liveInstance.getSessionId();
+ SessionId sessionId = liveInstance.getTypedSessionId();
Map<String, CurrentState> instanceCurStateMap =
_accessor.getChildValuesMap(_keyBuilder.currentStates(participantName,
sessionId.stringify()));
@@ -794,7 +794,7 @@ public class ClusterAccessor {
}
Set<PartitionId> disabledPartitions = participant.getDisabledPartitions();
for (PartitionId partitionId : disabledPartitions) {
- instanceConfig.setInstanceEnabledForPartition(partitionId, false);
+ instanceConfig.setParticipantEnabledForPartition(partitionId, false);
}
_accessor.setProperty(_keyBuilder.instanceConfig(participantId.stringify()), instanceConfig);
return true;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
index ac8f79d..83dd53e 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
@@ -277,7 +277,7 @@ public class ParticipantAccessor {
for (ParticipantId participantId : resetParticipantIdSet) {
for (ExternalView extView : extViews) {
Set<PartitionId> resetPartitionIdSet = Sets.newHashSet();
- for (PartitionId partitionId : extView.getPartitionSet()) {
+ for (PartitionId partitionId : extView.getPartitionIdSet()) {
Map<ParticipantId, State> stateMap = extView.getStateMap(partitionId);
if (stateMap.containsKey(participantId)
&& stateMap.get(participantId).equals(State.from(HelixDefinedState.ERROR))) {
@@ -346,7 +346,7 @@ public class ParticipantAccessor {
// make sure that there are no pending transition messages
for (Message message : participant.getMessageMap().values()) {
if (!MessageType.STATE_TRANSITION.toString().equalsIgnoreCase(message.getMsgType())
- || !runningInstance.getSessionId().equals(message.getTgtSessionId())
+ || !runningInstance.getSessionId().equals(message.getTypedTgtSessionId())
|| !resourceId.equals(message.getResourceId())
|| !resetPartitionIdSet.contains(message.getPartitionId())) {
continue;
@@ -384,10 +384,10 @@ public class ParticipantAccessor {
message.setTgtSessionId(runningInstance.getSessionId());
message.setStateModelDef(stateModelDefId);
message.setFromState(State.from(HelixDefinedState.ERROR.toString()));
- message.setToState(stateModelDef.getInitialState());
+ message.setToState(stateModelDef.getTypedInitialState());
message.setStateModelFactoryId(context.getStateModelFactoryId());
- messageMap.put(message.getMsgId(), message);
+ messageMap.put(message.getMessageId(), message);
}
// send the messages
@@ -477,7 +477,7 @@ public class ParticipantAccessor {
instanceConfig.addTag(tag);
}
for (PartitionId partitionId : participantConfig.getDisabledPartitions()) {
- instanceConfig.setInstanceEnabledForPartition(partitionId, false);
+ instanceConfig.setParticipantEnabledForPartition(partitionId, false);
}
instanceConfig.setInstanceEnabled(participantConfig.isEnabled());
instanceConfig.addNamespacedConfig(participantConfig.getUserConfig());
@@ -528,7 +528,7 @@ public class ParticipantAccessor {
RunningInstance runningInstance = null;
if (liveInstance != null) {
runningInstance =
- new RunningInstance(liveInstance.getSessionId(), liveInstance.getHelixVersion(),
+ new RunningInstance(liveInstance.getTypedSessionId(), liveInstance.getTypedHelixVersion(),
liveInstance.getProcessId());
}
@@ -574,7 +574,7 @@ public class ParticipantAccessor {
Map<String, Message> instanceMsgMap = Collections.emptyMap();
Map<String, CurrentState> instanceCurStateMap = Collections.emptyMap();
if (liveInstance != null) {
- SessionId sessionId = liveInstance.getSessionId();
+ SessionId sessionId = liveInstance.getTypedSessionId();
instanceMsgMap = _accessor.getChildValuesMap(_keyBuilder.messages(participantName));
instanceCurStateMap =
@@ -681,7 +681,7 @@ public class ParticipantAccessor {
*/
protected void swapParticipantsInIdealState(IdealState idealState,
ParticipantId oldParticipantId, ParticipantId newParticipantId) {
- for (PartitionId partitionId : idealState.getPartitionSet()) {
+ for (PartitionId partitionId : idealState.getPartitionIdSet()) {
List<ParticipantId> oldPreferenceList = idealState.getPreferenceList(partitionId);
if (oldPreferenceList != null) {
List<ParticipantId> newPreferenceList = Lists.newArrayList();
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
index 58b226d..517c8c4 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
@@ -292,7 +292,7 @@ public class ResourceAccessor {
}
Map<ParticipantId, Set<PartitionId>> resetPartitionIds = Maps.newHashMap();
- for (PartitionId partitionId : extView.getPartitionSet()) {
+ for (PartitionId partitionId : extView.getPartitionIdSet()) {
Map<ParticipantId, State> stateMap = extView.getStateMap(partitionId);
for (ParticipantId participantId : stateMap.keySet()) {
State state = stateMap.get(participantId);
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-core/src/main/java/org/apache/helix/api/config/ClusterConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/config/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/api/config/ClusterConfig.java
index ed9750a..22a1528 100644
--- a/helix-core/src/main/java/org/apache/helix/api/config/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/config/ClusterConfig.java
@@ -767,7 +767,7 @@ public class ClusterConfig {
public Builder addStateModelDefinition(StateModelDefinition stateModelDef) {
_stateModelMap.put(stateModelDef.getStateModelDefId(), stateModelDef);
// add state constraints from the state model definition
- for (State state : stateModelDef.getStatesPriorityList()) {
+ for (State state : stateModelDef.getTypedStatesPriorityList()) {
if (!stateModelDef.getNumParticipantsPerState(state).equals("-1")) {
addStateUpperBoundConstraint(Scope.cluster(_id), stateModelDef.getStateModelDefId(),
state, stateModelDef.getNumParticipantsPerState(state));
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 3c118ce..b27398d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -469,7 +469,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
Map<String, LiveInstance> curSessions = new HashMap<String, LiveInstance>();
for (LiveInstance liveInstance : liveInstances) {
curInstances.put(liveInstance.getInstanceName(), liveInstance);
- curSessions.put(liveInstance.getSessionId().stringify(), liveInstance);
+ curSessions.put(liveInstance.getTypedSessionId().stringify(), liveInstance);
}
Map<String, LiveInstance> lastInstances = _lastSeenInstances.get();
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
index 6d65009..880f2c0 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
@@ -74,7 +74,7 @@ public class AutoRebalancer implements Rebalancer {
public ResourceAssignment computeResourceMapping(Resource resource, IdealState currentIdealState,
CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) {
// Compute a preference list based on the current ideal state
- List<String> partitions = new ArrayList<String>(currentIdealState.getPartitionStringSet());
+ List<String> partitions = new ArrayList<String>(currentIdealState.getPartitionSet());
String stateModelName = currentIdealState.getStateModelDefRef();
StateModelDefinition stateModelDef = clusterData.getStateModelDef(stateModelName);
Map<String, LiveInstance> liveInstance = clusterData.getLiveInstances();
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
index 512af80..f6ea60f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
@@ -108,7 +108,7 @@ public class CustomRebalancer implements Rebalancer {
HelixDefinedState.ERROR.toString()))
&& disabledInstancesForPartition.contains(instance)) {
// if disabled and not in ERROR state, transit to initial-state (e.g. OFFLINE)
- instanceStateMap.put(instance, stateModelDef.getInitialStateString());
+ instanceStateMap.put(instance, stateModelDef.getInitialState());
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancer.java
index d245fae..00219af 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancer.java
@@ -97,7 +97,7 @@ public class CustomRebalancer implements Rebalancer {
participantId).equals(State.from(HelixDefinedState.ERROR)))
&& disabledParticipantsForPartition.contains(participantId)) {
// if disabled and not in ERROR state, transit to initial-state (e.g. OFFLINE)
- participantStateMap.put(participantId, stateModelDef.getInitialState());
+ participantStateMap.put(participantId, stateModelDef.getTypedInitialState());
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancerContext.java
index 904907e..6e1485b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancerContext.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancerContext.java
@@ -93,7 +93,7 @@ public class CustomRebalancerContext extends PartitionedRebalancerContext {
Set<ParticipantId> participantSet) {
// compute default upper bounds
Map<State, String> upperBounds = Maps.newHashMap();
- for (State state : stateModelDef.getStatesPriorityList()) {
+ for (State state : stateModelDef.getTypedStatesPriorityList()) {
upperBounds.put(state, stateModelDef.getNumParticipantsPerState(state));
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/PartitionedRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/PartitionedRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/PartitionedRebalancerContext.java
index decc78d..768e40c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/PartitionedRebalancerContext.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/PartitionedRebalancerContext.java
@@ -180,7 +180,7 @@ public class PartitionedRebalancerContext extends BasicRebalancerContext impleme
case SEMI_AUTO:
SemiAutoRebalancerContext.Builder semiAutoBuilder =
new SemiAutoRebalancerContext.Builder(idealState.getResourceId());
- for (PartitionId partitionId : idealState.getPartitionSet()) {
+ for (PartitionId partitionId : idealState.getPartitionIdSet()) {
semiAutoBuilder.preferenceList(partitionId, idealState.getPreferenceList(partitionId));
}
populateContext(semiAutoBuilder, idealState);
@@ -189,7 +189,7 @@ public class PartitionedRebalancerContext extends BasicRebalancerContext impleme
case CUSTOMIZED:
CustomRebalancerContext.Builder customBuilder =
new CustomRebalancerContext.Builder(idealState.getResourceId());
- for (PartitionId partitionId : idealState.getPartitionSet()) {
+ for (PartitionId partitionId : idealState.getPartitionIdSet()) {
customBuilder.preferenceMap(partitionId, idealState.getParticipantStateMap(partitionId));
}
populateContext(customBuilder, idealState);
@@ -219,11 +219,11 @@ public class PartitionedRebalancerContext extends BasicRebalancerContext impleme
} else {
replicaCount = Integer.parseInt(replicas);
}
- if (idealState.getNumPartitions() > 0 && idealState.getPartitionSet().size() == 0) {
+ if (idealState.getNumPartitions() > 0 && idealState.getPartitionIdSet().size() == 0) {
// backwards compatibility: partition sets were based on pref lists/maps previously
builder.addPartitions(idealState.getNumPartitions());
} else {
- for (PartitionId partitionId : idealState.getPartitionSet()) {
+ for (PartitionId partitionId : idealState.getPartitionIdSet()) {
builder.addPartition(new Partition(partitionId));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancerContext.java
index 71b5076..72b3bc7 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancerContext.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancerContext.java
@@ -96,7 +96,7 @@ public final class SemiAutoRebalancerContext extends PartitionedRebalancerContex
Set<ParticipantId> participantSet) {
// compute default upper bounds
Map<State, String> upperBounds = Maps.newHashMap();
- for (State state : stateModelDef.getStatesPriorityList()) {
+ for (State state : stateModelDef.getTypedStatesPriorityList()) {
upperBounds.put(state, stateModelDef.getNumParticipantsPerState(state));
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
index dce0a07..323be34 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
@@ -87,7 +87,7 @@ public class ConstraintBasedAssignment {
HelixDefinedState.ERROR.toString()))
&& disabledInstancesForPartition.contains(instance)) {
// if disabled and not in ERROR state, transit to initial-state (e.g. OFFLINE)
- instanceStateMap.put(instance, stateModelDef.getInitialStateString());
+ instanceStateMap.put(instance, stateModelDef.getInitialState());
}
}
}
@@ -97,7 +97,7 @@ public class ConstraintBasedAssignment {
return instanceStateMap;
}
- List<String> statesPriorityList = stateModelDef.getStatesPriorityStringList();
+ List<String> statesPriorityList = stateModelDef.getStatesPriorityList();
boolean assigned[] = new boolean[instancePreferenceList.size()];
Map<String, LiveInstance> liveInstancesMap = cache.getLiveInstances();
@@ -152,7 +152,7 @@ public class ConstraintBasedAssignment {
public static LinkedHashMap<String, Integer> stateCount(StateModelDefinition stateModelDef,
int liveNodesNb, int totalReplicas) {
LinkedHashMap<String, Integer> stateCountMap = new LinkedHashMap<String, Integer>();
- List<String> statesPriorityList = stateModelDef.getStatesPriorityStringList();
+ List<String> statesPriorityList = stateModelDef.getStatesPriorityList();
int replicas = totalReplicas;
for (String state : statesPriorityList) {
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java
index d5531b1..f703073 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java
@@ -100,7 +100,7 @@ public class NewConstraintBasedAssignment {
public static Map<State, String> stateConstraints(StateModelDefinition stateModelDef,
ResourceId resourceId, ClusterConfig cluster) {
Map<State, String> stateMap = Maps.newHashMap();
- for (State state : stateModelDef.getStatesPriorityList()) {
+ for (State state : stateModelDef.getTypedStatesPriorityList()) {
String num =
cluster.getStateUpperBoundConstraint(Scope.resource(resourceId),
stateModelDef.getStateModelDefId(), state);
@@ -138,7 +138,7 @@ public class NewConstraintBasedAssignment {
participantId).equals(State.from(HelixDefinedState.ERROR)))
&& disabledParticipantsForPartition.contains(participantId)) {
// if disabled and not in ERROR state, transit to initial-state (e.g. OFFLINE)
- participantStateMap.put(participantId, stateModelDef.getInitialState());
+ participantStateMap.put(participantId, stateModelDef.getTypedInitialState());
}
}
}
@@ -148,7 +148,7 @@ public class NewConstraintBasedAssignment {
return participantStateMap;
}
- List<State> statesPriorityList = stateModelDef.getStatesPriorityList();
+ List<State> statesPriorityList = stateModelDef.getTypedStatesPriorityList();
boolean assigned[] = new boolean[participantPreferenceList.size()];
for (State state : statesPriorityList) {
@@ -204,7 +204,7 @@ public class NewConstraintBasedAssignment {
public static LinkedHashMap<State, Integer> stateCount(Map<State, String> upperBounds,
StateModelDefinition stateModelDef, int liveNodesNb, int totalReplicas) {
LinkedHashMap<State, Integer> stateCountMap = new LinkedHashMap<State, Integer>();
- List<State> statesPriorityList = stateModelDef.getStatesPriorityList();
+ List<State> statesPriorityList = stateModelDef.getTypedStatesPriorityList();
int replicas = totalReplicas;
for (State state : statesPriorityList) {
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index 2c4d8e1..ac1cef4 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -73,7 +73,7 @@ public class ClusterDataCache {
_liveInstanceMap = accessor.getChildValuesMap(keyBuilder.liveInstances());
for (LiveInstance instance : _liveInstanceMap.values()) {
- LOG.trace("live instance: " + instance.getParticipantId() + " " + instance.getSessionId());
+ LOG.trace("live instance: " + instance.getParticipantId() + " " + instance.getTypedSessionId());
}
_stateModelDefMap = accessor.getChildValuesMap(keyBuilder.stateModelDefs());
@@ -91,7 +91,7 @@ public class ClusterDataCache {
new HashMap<String, Map<String, Map<String, CurrentState>>>();
for (String instanceName : _liveInstanceMap.keySet()) {
LiveInstance liveInstance = _liveInstanceMap.get(instanceName);
- String sessionId = liveInstance.getSessionId().stringify();
+ String sessionId = liveInstance.getTypedSessionId().stringify();
if (!allCurStateMap.containsKey(instanceName)) {
allCurStateMap.put(instanceName, new HashMap<String, Map<String, CurrentState>>());
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-core/src/main/java/org/apache/helix/controller/stages/CompatibilityCheckStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CompatibilityCheckStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CompatibilityCheckStage.java
index 50adf97..64e881c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CompatibilityCheckStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CompatibilityCheckStage.java
@@ -47,7 +47,7 @@ public class CompatibilityCheckStage extends AbstractBaseStage {
HelixManagerProperties properties = manager.getProperties();
Map<String, LiveInstance> liveInstanceMap = cache.getLiveInstances();
for (LiveInstance liveInstance : liveInstanceMap.values()) {
- HelixVersion version = liveInstance.getHelixVersion();
+ HelixVersion version = liveInstance.getTypedHelixVersion();
String participantVersion = (version != null) ? version.toString() : null;
if (!properties.isParticipantCompatible(participantVersion)) {
String errorMsg =
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index 3f5682e..7036512 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -60,7 +60,7 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
if (!MessageType.STATE_TRANSITION.toString().equalsIgnoreCase(message.getMsgType())) {
continue;
}
- if (!instance.getSessionId().equals(message.getTgtSessionId())) {
+ if (!instance.getTypedSessionId().equals(message.getTypedTgtSessionId())) {
continue;
}
ResourceId resourceId = message.getResourceId();
@@ -74,7 +74,7 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
Partition partition = resource.getPartition(partitionId.stringify());
if (partition != null) {
currentStateOutput.setPendingState(resourceId.stringify(), partition, instanceName,
- message.getToState().toString());
+ message.getTypedToState().toString());
} else {
// log
}
@@ -85,7 +85,7 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
Partition partition = resource.getPartition(partitionId.stringify());
if (partition != null) {
currentStateOutput.setPendingState(resourceId.stringify(), partition, instanceName,
- message.getToState().toString());
+ message.getTypedToState().toString());
} else {
// log
}
@@ -97,12 +97,12 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
for (LiveInstance instance : liveInstances.values()) {
String instanceName = instance.getInstanceName();
- String clientSessionId = instance.getSessionId().stringify();
+ String clientSessionId = instance.getTypedSessionId().stringify();
Map<String, CurrentState> currentStateMap =
cache.getCurrentState(instanceName, clientSessionId);
for (CurrentState currentState : currentStateMap.values()) {
- if (!instance.getSessionId().equals(currentState.getSessionId())) {
+ if (!instance.getTypedSessionId().equals(currentState.getTypedSessionId())) {
continue;
}
String resourceName = currentState.getResourceName();
@@ -117,7 +117,7 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
currentStateOutput.setBucketSize(resourceName, currentState.getBucketSize());
- Map<String, String> partitionStateMap = currentState.getPartitionStateStringMap();
+ Map<String, String> partitionStateMap = currentState.getPartitionStateMap();
for (String partitionName : partitionStateMap.keySet()) {
Partition partition = resource.getPartition(partitionName);
if (partition != null) {
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
index 9e16568..3c3a9d9 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
@@ -168,7 +168,7 @@ public class ExternalViewComputeStage extends AbstractBaseStage {
Builder keyBuilder = accessor.keyBuilder();
- for (String taskPartitionName : ev.getPartitionStringSet()) {
+ for (String taskPartitionName : ev.getPartitionSet()) {
for (String taskState : ev.getStateMap(taskPartitionName).values()) {
if (taskState.equalsIgnoreCase(HelixDefinedState.ERROR.toString())
|| taskState.equalsIgnoreCase("COMPLETED")) {
@@ -193,7 +193,7 @@ public class ExternalViewComputeStage extends AbstractBaseStage {
}
}
// fill the controllerMsgIdCountMap
- for (String taskId : taskQueueIdealState.getPartitionStringSet()) {
+ for (String taskId : taskQueueIdealState.getPartitionSet()) {
String controllerMsgId =
taskQueueIdealState.getRecord().getMapField(taskId)
.get(DefaultSchedulerMessageHandlerFactory.CONTROLLER_MSG_ID);
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
index 9c24bd6..3056cd5 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
@@ -71,7 +71,8 @@ public class MessageGenerationPhase extends AbstractBaseStage {
Map<String, String> sessionIdMap = new HashMap<String, String>();
for (LiveInstance liveInstance : liveInstances.values()) {
- sessionIdMap.put(liveInstance.getInstanceName(), liveInstance.getSessionId().stringify());
+ sessionIdMap
+ .put(liveInstance.getInstanceName(), liveInstance.getTypedSessionId().stringify());
}
MessageGenerationOutput output = new MessageGenerationOutput();
@@ -96,7 +97,7 @@ public class MessageGenerationPhase extends AbstractBaseStage {
String currentState =
currentStateOutput.getCurrentState(resourceName, partition, instanceName);
if (currentState == null) {
- currentState = stateModelDef.getInitialStateString();
+ currentState = stateModelDef.getInitialState();
}
if (desiredState.equalsIgnoreCase(currentState)) {
@@ -177,7 +178,7 @@ public class MessageGenerationPhase extends AbstractBaseStage {
}
// add generated messages to output according to state priority
- List<String> statesPriorityList = stateModelDef.getStatesPriorityStringList();
+ List<String> statesPriorityList = stateModelDef.getStatesPriorityList();
for (String state : statesPriorityList) {
if (messageMap.containsKey(state)) {
for (Message message : messageMap.get(state)) {
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
index 9e430b7..1a3f37b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
@@ -106,7 +106,7 @@ public class MessageSelectionStage extends AbstractBaseStage {
selectMessages(cache.getLiveInstances(),
currentStateOutput.getCurrentStateMap(resourceName, partition),
currentStateOutput.getPendingStateMap(resourceName, partition), messages,
- stateConstraints, stateTransitionPriorities, stateModelDef.getInitialStateString());
+ stateConstraints, stateTransitionPriorities, stateModelDef.getInitialState());
output.addMessages(resourceName, partition, selectedMessages);
}
}
@@ -171,8 +171,8 @@ public class MessageSelectionStage extends AbstractBaseStage {
Map<Integer, List<Message>> messagesGroupByStateTransitPriority =
new TreeMap<Integer, List<Message>>();
for (Message message : messages) {
- State fromState = message.getFromState();
- State toState = message.getToState();
+ State fromState = message.getTypedFromState();
+ State toState = message.getTypedToState();
String transition = fromState + "-" + toState;
int priority = Integer.MAX_VALUE;
@@ -189,8 +189,8 @@ public class MessageSelectionStage extends AbstractBaseStage {
// select messages
for (List<Message> messageList : messagesGroupByStateTransitPriority.values()) {
for (Message message : messageList) {
- State fromState = message.getFromState();
- State toState = message.getToState();
+ State fromState = message.getTypedFromState();
+ State toState = message.getTypedToState();
if (!bounds.containsKey(fromState)) {
LOG.error("Message's fromState is not in currentState. message: " + message);
@@ -241,7 +241,7 @@ public class MessageSelectionStage extends AbstractBaseStage {
IdealState idealState, ClusterDataCache cache) {
Map<String, Bounds> stateConstraints = new HashMap<String, Bounds>();
- List<String> statePriorityList = stateModelDefinition.getStatesPriorityStringList();
+ List<String> statePriorityList = stateModelDefinition.getStatesPriorityList();
for (String state : statePriorityList) {
String numInstancesPerState = stateModelDefinition.getNumInstancesPerState(state);
int max = -1;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateComputationStage.java
index f1c9323..f7f2a5f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateComputationStage.java
@@ -68,7 +68,7 @@ public class NewCurrentStateComputationStage extends AbstractBaseStage {
continue;
}
- if (!liveParticipant.getRunningInstance().getSessionId().equals(message.getTgtSessionId())) {
+ if (!liveParticipant.getRunningInstance().getSessionId().equals(message.getTypedTgtSessionId())) {
continue;
}
@@ -83,7 +83,7 @@ public class NewCurrentStateComputationStage extends AbstractBaseStage {
Partition partition = resource.getSubUnit(partitionId);
if (partition != null) {
currentStateOutput.setPendingState(resourceId, partitionId, participantId,
- message.getToState());
+ message.getTypedToState());
} else {
// log
}
@@ -94,7 +94,7 @@ public class NewCurrentStateComputationStage extends AbstractBaseStage {
Partition partition = resource.getSubUnit(partitionId);
if (partition != null) {
currentStateOutput.setPendingState(resourceId, partitionId, participantId,
- message.getToState());
+ message.getTypedToState());
} else {
// log
}
@@ -107,7 +107,7 @@ public class NewCurrentStateComputationStage extends AbstractBaseStage {
SessionId sessionId = liveParticipant.getRunningInstance().getSessionId();
Map<ResourceId, CurrentState> curStateMap = liveParticipant.getCurrentStateMap();
for (CurrentState curState : curStateMap.values()) {
- if (!sessionId.equals(curState.getSessionId())) {
+ if (!sessionId.equals(curState.getTypedSessionId())) {
continue;
}
@@ -124,7 +124,7 @@ public class NewCurrentStateComputationStage extends AbstractBaseStage {
currentStateOutput.setBucketSize(resourceId, curState.getBucketSize());
- Map<PartitionId, State> partitionStateMap = curState.getPartitionStateMap();
+ Map<PartitionId, State> partitionStateMap = curState.getTypedPartitionStateMap();
for (PartitionId partitionId : partitionStateMap.keySet()) {
Partition partition = resource.getSubUnit(partitionId);
if (partition != null) {
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-core/src/main/java/org/apache/helix/controller/stages/NewExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewExternalViewComputeStage.java
index 8ff52bd..d67931d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewExternalViewComputeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewExternalViewComputeStage.java
@@ -184,7 +184,7 @@ public class NewExternalViewComputeStage extends AbstractBaseStage {
Map<String, Map<String, String>> controllerMsgUpdates =
new HashMap<String, Map<String, String>>();
- for (String taskPartitionName : ev.getPartitionStringSet()) {
+ for (String taskPartitionName : ev.getPartitionSet()) {
for (String taskState : ev.getStateMap(taskPartitionName).values()) {
if (taskState.equalsIgnoreCase(HelixDefinedState.ERROR.toString())
|| taskState.equalsIgnoreCase("COMPLETED")) {
@@ -235,7 +235,7 @@ public class NewExternalViewComputeStage extends AbstractBaseStage {
result.put("Result", controllerMsgUpdates.get(controllerMsgId).get(taskPartitionName));
controllerStatusUpdate.getRecord().setMapField(
"MessageResult " + innerMessage.getTgtName() + " " + taskPartitionName + " "
- + innerMessage.getMsgId(), result);
+ + innerMessage.getMessageId(), result);
}
// All done for the scheduled tasks that came from controllerMsgId, add summary for it
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java
index fb6dfe8..3d51bd0 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java
@@ -96,7 +96,7 @@ public class NewMessageGenerationStage extends AbstractBaseStage {
State currentState =
currentStateOutput.getCurrentState(resourceId, subUnitId, participantId);
if (currentState == null) {
- currentState = stateModelDef.getInitialState();
+ currentState = stateModelDef.getTypedInitialState();
}
if (desiredState.equals(currentState)) {
@@ -174,7 +174,7 @@ public class NewMessageGenerationStage extends AbstractBaseStage {
}
// add generated messages to output according to state priority
- List<State> statesPriorityList = stateModelDef.getStatesPriorityList();
+ List<State> statesPriorityList = stateModelDef.getTypedStatesPriorityList();
for (State state : statesPriorityList) {
if (messageMap.containsKey(state)) {
for (Message message : messageMap.get(state)) {
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java
index 4adfbcb..4a46a4c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java
@@ -126,7 +126,7 @@ public class NewMessageSelectionStage extends AbstractBaseStage {
selectMessages(cluster.getLiveParticipantMap(),
currentStateOutput.getCurrentStateMap(resourceId, partitionId),
currentStateOutput.getPendingStateMap(resourceId, partitionId), messages,
- stateConstraints, stateTransitionPriorities, stateModelDef.getInitialState());
+ stateConstraints, stateTransitionPriorities, stateModelDef.getTypedInitialState());
output.setMessages(resourceId, partitionId, selectedMessages);
}
}
@@ -191,8 +191,8 @@ public class NewMessageSelectionStage extends AbstractBaseStage {
Map<Integer, List<Message>> messagesGroupByStateTransitPriority =
new TreeMap<Integer, List<Message>>();
for (Message message : messages) {
- State fromState = message.getFromState();
- State toState = message.getToState();
+ State fromState = message.getTypedFromState();
+ State toState = message.getTypedToState();
String transition = fromState.toString() + "-" + toState.toString();
int priority = Integer.MAX_VALUE;
@@ -209,8 +209,8 @@ public class NewMessageSelectionStage extends AbstractBaseStage {
// select messages
for (List<Message> messageList : messagesGroupByStateTransitPriority.values()) {
for (Message message : messageList) {
- State fromState = message.getFromState();
- State toState = message.getToState();
+ State fromState = message.getTypedFromState();
+ State toState = message.getTypedToState();
if (!bounds.containsKey(fromState)) {
LOG.error("Message's fromState is not in currentState. message: " + message);
@@ -268,7 +268,7 @@ public class NewMessageSelectionStage extends AbstractBaseStage {
.getRebalancerContext(ReplicatedRebalancerContext.class) : null;
Map<State, Bounds> stateConstraints = new HashMap<State, Bounds>();
- List<State> statePriorityList = stateModelDefinition.getStatesPriorityList();
+ List<State> statePriorityList = stateModelDefinition.getTypedStatesPriorityList();
for (State state : statePriorityList) {
String numInstancesPerState =
cluster.getStateUpperBoundConstraint(Scope.cluster(cluster.getId()),
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
index e839a98..b531bd7 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
@@ -98,8 +98,8 @@ public class NewResourceComputationStage extends AbstractBaseStage {
if (currentState.getStateModelDefRef() == null) {
LOG.error("state model def is null." + "resource:" + currentState.getResourceId()
- + ", partitions: " + currentState.getPartitionStateStringMap().keySet()
- + ", states: " + currentState.getPartitionStateStringMap().values());
+ + ", partitions: " + currentState.getPartitionStateMap().keySet()
+ + ", states: " + currentState.getPartitionStateMap().values());
throw new StageException("State model def is null for resource:"
+ currentState.getResourceId());
}
@@ -119,7 +119,7 @@ public class NewResourceComputationStage extends AbstractBaseStage {
}
PartitionedRebalancerContext.Builder rebCtxBuilder = rebCtxBuilderMap.get(resourceId);
- for (PartitionId partitionId : currentState.getPartitionStateMap().keySet()) {
+ for (PartitionId partitionId : currentState.getTypedPartitionStateMap().keySet()) {
rebCtxBuilder.addPartition(new Partition(partitionId));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java
index d8fa81c..51c9284 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java
@@ -109,9 +109,9 @@ public class NewTaskAssignmentStage extends AbstractBaseStage {
}
String key =
- keyBuilder.currentState(message.getTgtName(), message.getTgtSessionId().stringify(),
+ keyBuilder.currentState(message.getTgtName(), message.getTypedTgtSessionId().stringify(),
message.getResourceId().stringify()).getPath()
- + "/" + message.getFromState() + "/" + message.getToState();
+ + "/" + message.getTypedFromState() + "/" + message.getTypedToState();
if (!batchMessages.containsKey(key)) {
Message batchMessage = new Message(message.getRecord());
@@ -134,9 +134,9 @@ public class NewTaskAssignmentStage extends AbstractBaseStage {
List<PropertyKey> keys = new ArrayList<PropertyKey>();
for (Message message : messages) {
- logger.info("Sending Message " + message.getMsgId() + " to " + message.getTgtName()
+ logger.info("Sending Message " + message.getMessageId() + " to " + message.getTgtName()
+ " transit " + message.getPartitionId() + "|" + message.getPartitionIds() + " from:"
- + message.getFromState() + " to:" + message.getToState());
+ + message.getTypedFromState() + " to:" + message.getTypedToState());
// System.out.println("[dbg] Sending Message " + message.getMsgId() + " to "
// + message.getTgtName() + " transit " + message.getPartitionId() + "|"
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-core/src/main/java/org/apache/helix/controller/stages/RebalanceIdealStateStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/RebalanceIdealStateStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/RebalanceIdealStateStage.java
index ffc14d6..949cfca 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/RebalanceIdealStateStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/RebalanceIdealStateStage.java
@@ -62,7 +62,7 @@ public class RebalanceIdealStateStage extends AbstractBaseStage {
(Rebalancer) (HelixUtil.loadClass(getClass(), rebalancerClassName).newInstance());
balancer.init(manager);
Resource resource = new Resource(resourceName);
- for (String partitionName : currentIdealState.getPartitionStringSet()) {
+ for (String partitionName : currentIdealState.getPartitionSet()) {
resource.addPartition(partitionName);
}
ResourceAssignment resourceAssignment =
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
index cb2a7ed..da38ee2 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
@@ -54,7 +54,7 @@ public class ResourceComputationStage extends AbstractBaseStage {
if (idealStates != null && idealStates.size() > 0) {
for (IdealState idealState : idealStates.values()) {
- Set<String> partitionSet = idealState.getPartitionStringSet();
+ Set<String> partitionSet = idealState.getPartitionSet();
String resourceName = idealState.getResourceName();
for (String partition : partitionSet) {
@@ -75,7 +75,7 @@ public class ResourceComputationStage extends AbstractBaseStage {
if (availableInstances != null && availableInstances.size() > 0) {
for (LiveInstance instance : availableInstances.values()) {
String instanceName = instance.getInstanceName();
- String clientSessionId = instance.getSessionId().stringify();
+ String clientSessionId = instance.getTypedSessionId().stringify();
Map<String, CurrentState> currentStateMap =
cache.getCurrentState(instanceName, clientSessionId);
@@ -85,7 +85,7 @@ public class ResourceComputationStage extends AbstractBaseStage {
for (CurrentState currentState : currentStateMap.values()) {
String resourceName = currentState.getResourceName();
- Map<String, String> resourceStateMap = currentState.getPartitionStateStringMap();
+ Map<String, String> resourceStateMap = currentState.getPartitionStateMap();
// don't overwrite ideal state settings
if (!resourceMap.containsKey(resourceName)) {
@@ -99,8 +99,8 @@ public class ResourceComputationStage extends AbstractBaseStage {
if (currentState.getStateModelDefRef() == null) {
LOG.error("state model def is null." + "resource:" + currentState.getResourceName()
- + ", partitions: " + currentState.getPartitionStateStringMap().keySet()
- + ", states: " + currentState.getPartitionStateStringMap().values());
+ + ", partitions: " + currentState.getPartitionStateMap().keySet()
+ + ", states: " + currentState.getPartitionStateMap().values());
throw new StageException("State model def is null for resource:"
+ currentState.getResourceName());
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
index a50f76b..c942db9 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
@@ -98,7 +98,7 @@ public class TaskAssignmentStage extends AbstractBaseStage {
LiveInstance liveInstance = liveInstanceMap.get(instanceName);
String participantVersion = null;
if (liveInstance != null) {
- participantVersion = liveInstance.getHelixVersion().toString();
+ participantVersion = liveInstance.getTypedHelixVersion().toString();
}
if (resource == null || !resource.getBatchMessageMode() || participantVersion == null
@@ -108,9 +108,9 @@ public class TaskAssignmentStage extends AbstractBaseStage {
}
String key =
- keyBuilder.currentState(message.getTgtName(), message.getTgtSessionId().stringify(),
+ keyBuilder.currentState(message.getTgtName(), message.getTypedTgtSessionId().stringify(),
message.getResourceId().stringify()).getPath()
- + "/" + message.getFromState() + "/" + message.getToState();
+ + "/" + message.getTypedFromState() + "/" + message.getTypedToState();
if (!batchMessages.containsKey(key)) {
Message batchMessage = new Message(message.getRecord());
@@ -133,9 +133,9 @@ public class TaskAssignmentStage extends AbstractBaseStage {
List<PropertyKey> keys = new ArrayList<PropertyKey>();
for (Message message : messages) {
- logger.info("Sending Message " + message.getMsgId() + " to " + message.getTgtName()
+ logger.info("Sending Message " + message.getMessageId() + " to " + message.getTgtName()
+ " transit " + message.getPartitionId() + "|" + message.getPartitionIds() + " from:"
- + message.getFromState() + " to:" + message.getToState());
+ + message.getTypedFromState() + " to:" + message.getTypedToState());
// System.out.println("[dbg] Sending Message " + message.getMsgId() + " to " +
// message.getTgtName()
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManager.java
index 55ace7a..dd8e9be 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManager.java
@@ -149,7 +149,7 @@ public class ControllerManager extends AbstractManager {
LiveInstance leader = _dataAccessor.getProperty(_keyBuilder.controllerLeader());
if (leader != null) {
String leaderName = leader.getInstanceName();
- String sessionId = leader.getSessionId().stringify();
+ String sessionId = leader.getTypedSessionId().stringify();
if (leaderName != null && leaderName.equals(_instanceName) && sessionId != null
&& sessionId.equals(_sessionId)) {
return true;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-core/src/main/java/org/apache/helix/manager/zk/CurStateCarryOverUpdater.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/CurStateCarryOverUpdater.java b/helix-core/src/main/java/org/apache/helix/manager/zk/CurStateCarryOverUpdater.java
index bd98632..11222e1 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/CurStateCarryOverUpdater.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CurStateCarryOverUpdater.java
@@ -60,7 +60,7 @@ class CurStateCarryOverUpdater implements DataUpdater<ZNRecord> {
curState = new CurrentState(currentData);
}
- for (PartitionId partitionId : _lastCurState.getPartitionStateMap().keySet()) {
+ for (PartitionId partitionId : _lastCurState.getTypedPartitionStateMap().keySet()) {
// carry-over only when current-state not exist
if (curState.getState(partitionId) == null) {
curState.setState(partitionId, State.from(_initState));
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultControllerMessageHandlerFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultControllerMessageHandlerFactory.java b/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultControllerMessageHandlerFactory.java
index 5f6d083..33d271b 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultControllerMessageHandlerFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultControllerMessageHandlerFactory.java
@@ -36,7 +36,7 @@ public class DefaultControllerMessageHandlerFactory implements MessageHandlerFac
String type = message.getMsgType();
if (!type.equals(getMessageType())) {
- throw new HelixException("Unexpected msg type for message " + message.getMsgId() + " type:"
+ throw new HelixException("Unexpected msg type for message " + message.getMessageId() + " type:"
+ message.getMsgType());
}
@@ -63,18 +63,18 @@ public class DefaultControllerMessageHandlerFactory implements MessageHandlerFac
String type = _message.getMsgType();
HelixTaskResult result = new HelixTaskResult();
if (!type.equals(MessageType.CONTROLLER_MSG.toString())) {
- throw new HelixException("Unexpected msg type for message " + _message.getMsgId()
+ throw new HelixException("Unexpected msg type for message " + _message.getMessageId()
+ " type:" + _message.getMsgType());
}
result.getTaskResultMap().put("ControllerResult",
- "msg " + _message.getMsgId() + " from " + _message.getMsgSrc() + " processed");
+ "msg " + _message.getMessageId() + " from " + _message.getMsgSrc() + " processed");
result.setSuccess(true);
return result;
}
@Override
public void onError(Exception e, ErrorCode code, ErrorType type) {
- _logger.error("Message handling pipeline get an exception. MsgId:" + _message.getMsgId(), e);
+ _logger.error("Message handling pipeline get an exception. MsgId:" + _message.getMessageId(), e);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultParticipantErrorMessageHandlerFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultParticipantErrorMessageHandlerFactory.java b/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultParticipantErrorMessageHandlerFactory.java
index f297252..5e3a7ea 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultParticipantErrorMessageHandlerFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultParticipantErrorMessageHandlerFactory.java
@@ -99,7 +99,7 @@ public class DefaultParticipantErrorMessageHandlerFactory implements MessageHand
@Override
public void onError(Exception e, ErrorCode code, ErrorType type) {
- _logger.error("Message handling pipeline get an exception. MsgId:" + _message.getMsgId(), e);
+ _logger.error("Message handling pipeline get an exception. MsgId:" + _message.getMessageId(), e);
}
}
@@ -109,7 +109,7 @@ public class DefaultParticipantErrorMessageHandlerFactory implements MessageHand
String type = message.getMsgType();
if (!type.equals(getMessageType())) {
- throw new HelixException("Unexpected msg type for message " + message.getMsgId() + " type:"
+ throw new HelixException("Unexpected msg type for message " + message.getMessageId() + " type:"
+ message.getMsgType());
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java b/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
index 38a067b..e494507 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
@@ -78,7 +78,7 @@ public class DefaultSchedulerMessageHandlerFactory implements MessageHandlerFact
@Override
public void onTimeOut() {
- _logger.info("Scheduler msg timeout " + _originalMessage.getMsgId() + " timout with "
+ _logger.info("Scheduler msg timeout " + _originalMessage.getMessageId() + " timout with "
+ _timeout + " Ms");
_statusUpdateUtil.logError(_originalMessage, SchedulerAsyncCallback.class, "Task timeout",
@@ -88,13 +88,13 @@ public class DefaultSchedulerMessageHandlerFactory implements MessageHandlerFact
@Override
public void onReplyMessage(Message message) {
- _logger.info("Update for scheduler msg " + _originalMessage.getMsgId() + " Message "
+ _logger.info("Update for scheduler msg " + _originalMessage.getMessageId() + " Message "
+ message.getMsgSrc() + " id " + message.getCorrelationId() + " completed");
String key = "MessageResult " + message.getMsgSrc() + " " + UUID.randomUUID();
_resultSummaryMap.put(key, message.getResultMap());
if (this.isDone()) {
- _logger.info("Scheduler msg " + _originalMessage.getMsgId() + " completed");
+ _logger.info("Scheduler msg " + _originalMessage.getMessageId() + " completed");
_statusUpdateUtil.logInfo(_originalMessage, SchedulerAsyncCallback.class,
"Scheduler task completed", _manager.getHelixDataAccessor());
addSummary(_resultSummaryMap, _originalMessage, _manager, false);
@@ -113,11 +113,11 @@ public class DefaultSchedulerMessageHandlerFactory implements MessageHandlerFact
ZNRecord statusUpdate =
accessor.getProperty(
keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), originalMessage
- .getMsgId().stringify())).getRecord();
+ .getMessageId().stringify())).getRecord();
statusUpdate.getMapFields().putAll(_resultSummaryMap);
accessor.setProperty(keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(),
- originalMessage.getMsgId().stringify()), new StatusUpdate(statusUpdate));
+ originalMessage.getMessageId().stringify()), new StatusUpdate(statusUpdate));
}
}
@@ -134,7 +134,7 @@ public class DefaultSchedulerMessageHandlerFactory implements MessageHandlerFact
String type = message.getMsgType();
if (!type.equals(getMessageType())) {
- throw new HelixException("Unexpected msg type for message " + message.getMsgId() + " type:"
+ throw new HelixException("Unexpected msg type for message " + message.getMessageId() + " type:"
+ message.getMsgType());
}
@@ -208,7 +208,7 @@ public class DefaultSchedulerMessageHandlerFactory implements MessageHandlerFact
newAddedScheduledTasks.getRecord().setMapField(partitionId,
task.getRecord().getSimpleFields());
_logger.info("Scheduling for controllerMsg " + controllerMsgId + " , sending task "
- + partitionId + " " + task.getMsgId() + " to " + instanceName);
+ + partitionId + " " + task.getMessageId() + " to " + instanceName);
if (_logger.isDebugEnabled()) {
_logger.debug(task.getRecord().getSimpleFields());
@@ -225,16 +225,16 @@ public class DefaultSchedulerMessageHandlerFactory implements MessageHandlerFact
ZNRecord statusUpdate =
accessor.getProperty(
keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), _message
- .getMsgId().stringify())).getRecord();
+ .getMessageId().stringify())).getRecord();
statusUpdate.getMapFields().put("SentMessageCount", sendSummary);
accessor.updateProperty(keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(),
- _message.getMsgId().stringify()), new StatusUpdate(statusUpdate));
+ _message.getMessageId().stringify()), new StatusUpdate(statusUpdate));
}
private int findTopPartitionId(IdealState currentTaskQueue) {
int topId = 0;
- for (PartitionId partitionId : currentTaskQueue.getPartitionSet()) {
+ for (PartitionId partitionId : currentTaskQueue.getPartitionIdSet()) {
try {
String partitionName = partitionId.stringify();
String partitionNumStr = partitionName.substring(partitionName.lastIndexOf('_') + 1);
@@ -254,7 +254,7 @@ public class DefaultSchedulerMessageHandlerFactory implements MessageHandlerFact
String type = _message.getMsgType();
HelixTaskResult result = new HelixTaskResult();
if (!type.equals(MessageType.SCHEDULER_MSG.toString())) {
- throw new HelixException("Unexpected msg type for message " + _message.getMsgId()
+ throw new HelixException("Unexpected msg type for message " + _message.getMessageId()
+ " type:" + _message.getMsgType());
}
// Parse timeout value
@@ -301,11 +301,11 @@ public class DefaultSchedulerMessageHandlerFactory implements MessageHandlerFact
if (InstanceType.PARTICIPANT == recipientCriteria.getRecipientInstanceType()
&& hasSchedulerTaskQueue) {
handleMessageUsingScheduledTaskQueue(recipientCriteria, messageTemplate,
- _message.getMsgId());
+ _message.getMessageId());
result.setSuccess(true);
- result.getTaskResultMap().put(SCHEDULER_MSG_ID, _message.getMsgId().stringify());
+ result.getTaskResultMap().put(SCHEDULER_MSG_ID, _message.getMessageId().stringify());
result.getTaskResultMap().put("ControllerResult",
- "msg " + _message.getMsgId() + " from " + _message.getMsgSrc() + " processed");
+ "msg " + _message.getMessageId() + " from " + _message.getMsgSrc() + " processed");
return result;
}
@@ -331,23 +331,23 @@ public class DefaultSchedulerMessageHandlerFactory implements MessageHandlerFact
ZNRecord statusUpdate =
accessor.getProperty(
keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), _message
- .getMsgId().stringify())).getRecord();
+ .getMessageId().stringify())).getRecord();
statusUpdate.getMapFields().put("SentMessageCount", sendSummary);
accessor.setProperty(keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(),
- _message.getMsgId().stringify()), new StatusUpdate(statusUpdate));
+ _message.getMessageId().stringify()), new StatusUpdate(statusUpdate));
result.getTaskResultMap().put("ControllerResult",
- "msg " + _message.getMsgId() + " from " + _message.getMsgSrc() + " processed");
- result.getTaskResultMap().put(SCHEDULER_MSG_ID, _message.getMsgId().stringify());
+ "msg " + _message.getMessageId() + " from " + _message.getMsgSrc() + " processed");
+ result.getTaskResultMap().put(SCHEDULER_MSG_ID, _message.getMessageId().stringify());
result.setSuccess(true);
return result;
}
@Override
public void onError(Exception e, ErrorCode code, ErrorType type) {
- _logger.error("Message handling pipeline get an exception. MsgId:" + _message.getMsgId(), e);
+ _logger.error("Message handling pipeline get an exception. MsgId:" + _message.getMessageId(), e);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedControllerManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedControllerManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedControllerManager.java
index 0a005f2..f169317 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedControllerManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedControllerManager.java
@@ -175,7 +175,7 @@ public class DistributedControllerManager extends AbstractManager {
LiveInstance leader = _dataAccessor.getProperty(_keyBuilder.controllerLeader());
if (leader != null) {
String leaderName = leader.getInstanceName();
- String sessionId = leader.getSessionId().stringify();
+ String sessionId = leader.getTypedSessionId().stringify();
if (leaderName != null && leaderName.equals(_instanceName) && sessionId != null
&& sessionId.equals(_sessionId)) {
return true;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java b/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java
index 3cf9244..caf4dae 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java
@@ -138,7 +138,7 @@ public class DistributedLeaderElection implements ControllerChangeListener {
leader = accessor.getProperty(keyBuilder.controllerLeader());
if (leader != null) {
- String leaderSessionId = leader.getSessionId().stringify();
+ String leaderSessionId = leader.getTypedSessionId().stringify();
LOG.info("Leader exists for cluster: " + manager.getClusterName() + ", currentLeader: "
+ leader.getInstanceName() + ", leaderSessionId: " + leaderSessionId);
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java
index 743fcc3..aa84c4d 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java
@@ -149,14 +149,14 @@ public class ParticipantManagerHelper {
* update sessionId field in live-instance if necessary
*/
LiveInstance curLiveInstance = new LiveInstance(record);
- if (!curLiveInstance.getSessionId().stringify().equals(_sessionId)) {
+ if (!curLiveInstance.getTypedSessionId().stringify().equals(_sessionId)) {
/**
* in last handle-new-session,
* live-instance is created by new zkconnection with stale session-id inside
* just update session-id field
*/
LOG.info("overwriting session-id by ephemeralOwner: " + ephemeralOwner
- + ", old-sessionId: " + curLiveInstance.getSessionId() + ", new-sessionId: "
+ + ", old-sessionId: " + curLiveInstance.getTypedSessionId() + ", new-sessionId: "
+ _sessionId);
curLiveInstance.setSessionId(_sessionId);
@@ -228,10 +228,9 @@ public class ParticipantManagerHelper {
String curStatePath =
_keyBuilder.currentState(_instanceName, _sessionId, lastCurState.getResourceName())
.getPath();
- _dataAccessor.getBaseDataAccessor().update(
- curStatePath,
- new CurStateCarryOverUpdater(_sessionId, stateModel.getInitialStateString(),
- lastCurState), AccessOption.PERSISTENT);
+ _dataAccessor.getBaseDataAccessor().update(curStatePath,
+ new CurStateCarryOverUpdater(_sessionId, stateModel.getInitialState(), lastCurState),
+ AccessOption.PERSISTENT);
}
}