You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2013/11/07 02:19:47 UTC
[39/53] [abbrv] [HELIX-209] Backward compatible function naming in
the model package
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index 75d564f..59bae9f 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -334,7 +334,7 @@ public class ZKHelixAdmin implements HelixAdmin {
}
// check partition is in ERROR state
- SessionId sessionId = liveInstance.getSessionId();
+ SessionId sessionId = liveInstance.getTypedSessionId();
CurrentState curState =
accessor.getProperty(keyBuilder.currentState(instanceName, sessionId.stringify(),
resourceName));
@@ -358,7 +358,7 @@ public class ZKHelixAdmin implements HelixAdmin {
List<Message> messages = accessor.getChildValues(keyBuilder.messages(instanceName));
for (Message message : messages) {
if (!MessageType.STATE_TRANSITION.toString().equalsIgnoreCase(message.getMsgType())
- || !sessionId.equals(message.getTgtSessionId())
+ || !sessionId.equals(message.getTypedTgtSessionId())
|| !resourceName.equals(message.getResourceId().stringify())
|| !resetPartitionNames.contains(message.getPartitionId().stringify())) {
continue;
@@ -391,7 +391,7 @@ public class ZKHelixAdmin implements HelixAdmin {
message.setTgtSessionId(sessionId);
message.setStateModelDef(stateModelDef);
message.setFromState(State.from(HelixDefinedState.ERROR.toString()));
- message.setToState(stateModel.getInitialState());
+ message.setToState(stateModel.getTypedInitialState());
message.setStateModelFactoryId(idealState.getStateModelFactoryId());
resetMessages.add(message);
@@ -993,7 +993,7 @@ public class ZKHelixAdmin implements HelixAdmin {
}
// StateModelDefinition def = new StateModelDefinition(stateModDef);
- List<String> statePriorityList = stateModDef.getStatesPriorityStringList();
+ List<String> statePriorityList = stateModDef.getStatesPriorityList();
String masterStateValue = null;
String slaveStateValue = null;
@@ -1151,7 +1151,7 @@ public class ZKHelixAdmin implements HelixAdmin {
@Override
public void rebalance(String clusterName, IdealState currentIdealState, List<String> instanceNames) {
Set<String> activeInstances = new HashSet<String>();
- for (PartitionId partition : currentIdealState.getPartitionSet()) {
+ for (PartitionId partition : currentIdealState.getPartitionIdSet()) {
activeInstances.addAll(IdealState.stringListFromPreferenceList(currentIdealState
.getPreferenceList(partition)));
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index 7d37b68..afd35e6 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -837,7 +837,7 @@ public class ZKHelixManager implements HelixManager {
keyBuilder.currentState(_instanceName, _sessionId, lastCurState.getResourceName())
.getPath();
_helixAccessor.getBaseDataAccessor().update(curStatePath,
- new CurStateCarryOverUpdater(_sessionId, stateModel.getInitialStateString(), lastCurState),
+ new CurStateCarryOverUpdater(_sessionId, stateModel.getInitialState(), lastCurState),
AccessOption.PERSISTENT);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-core/src/main/java/org/apache/helix/messaging/AsyncCallback.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/AsyncCallback.java b/helix-core/src/main/java/org/apache/helix/messaging/AsyncCallback.java
index f9743a4..9fea0c8 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/AsyncCallback.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/AsyncCallback.java
@@ -71,7 +71,7 @@ public abstract class AsyncCallback {
}
public synchronized final void onReply(Message message) {
- _logger.info("OnReply msg " + message.getMsgId());
+ _logger.info("OnReply msg " + message.getMessageId());
if (!isDone()) {
_messageReplied.add(message);
try {
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java b/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
index a207b0c..73b69a8 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
@@ -161,7 +161,7 @@ public class DefaultMessagingService implements ClusterMessagingService {
List<LiveInstance> liveInstances = accessor.getChildValues(keyBuilder.liveInstances());
for (LiveInstance liveInstance : liveInstances) {
- sessionIdMap.put(liveInstance.getInstanceName(), liveInstance.getSessionId()
+ sessionIdMap.put(liveInstance.getInstanceName(), liveInstance.getTypedSessionId()
.stringify());
}
}
@@ -194,7 +194,7 @@ public class DefaultMessagingService implements ClusterMessagingService {
List<Message> messages = new ArrayList<Message>();
MessageId id = MessageId.from(UUID.randomUUID().toString());
Message newMessage = new Message(message.getRecord(), id);
- newMessage.setMsgId(id);
+ newMessage.setMessageId(id);
newMessage.setSrcName(_manager.getInstanceName());
newMessage.setTgtName("Controller");
messages.add(newMessage);
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-core/src/main/java/org/apache/helix/messaging/handling/AsyncCallbackService.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/AsyncCallbackService.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/AsyncCallbackService.java
index 46c595d..17fc67d 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/AsyncCallbackService.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/AsyncCallbackService.java
@@ -47,14 +47,14 @@ public class AsyncCallbackService implements MessageHandlerFactory {
void verifyMessage(Message message) {
if (!message.getMsgType().toString().equalsIgnoreCase(MessageType.TASK_REPLY.toString())) {
String errorMsg =
- "Unexpected msg type for message " + message.getMsgId() + " type:" + message.getMsgType()
+ "Unexpected msg type for message " + message.getMessageId() + " type:" + message.getMsgType()
+ " Expected : " + MessageType.TASK_REPLY;
_logger.error(errorMsg);
throw new HelixException(errorMsg);
}
String correlationId = message.getCorrelationId();
if (correlationId == null) {
- String errorMsg = "Message " + message.getMsgId() + " does not have correlation id";
+ String errorMsg = "Message " + message.getMessageId() + " does not have correlation id";
_logger.error(errorMsg);
throw new HelixException(errorMsg);
}
@@ -62,13 +62,13 @@ public class AsyncCallbackService implements MessageHandlerFactory {
if (!_callbackMap.containsKey(correlationId)) {
String errorMsg =
"Message "
- + message.getMsgId()
+ + message.getMessageId()
+ " does not have correponding callback. Probably timed out already. Correlation id: "
+ correlationId;
_logger.error(errorMsg);
throw new HelixException(errorMsg);
}
- _logger.info("Verified reply message " + message.getMsgId() + " correlation:" + correlationId);
+ _logger.info("Verified reply message " + message.getMessageId() + " correlation:" + correlationId);
}
@Override
@@ -101,7 +101,7 @@ public class AsyncCallbackService implements MessageHandlerFactory {
verifyMessage(_message);
HelixTaskResult result = new HelixTaskResult();
assert (_correlationId.equalsIgnoreCase(_message.getCorrelationId()));
- _logger.info("invoking reply message " + _message.getMsgId() + ", correlationid:"
+ _logger.info("invoking reply message " + _message.getMessageId() + ", correlationid:"
+ _correlationId);
AsyncCallback callback = _callbackMap.get(_correlationId);
@@ -118,7 +118,7 @@ public class AsyncCallbackService implements MessageHandlerFactory {
@Override
public void onError(Exception e, ErrorCode code, ErrorType type) {
- _logger.error("Message handling pipeline get an exception. MsgId:" + _message.getMsgId(), e);
+ _logger.error("Message handling pipeline get an exception. MsgId:" + _message.getMessageId(), e);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
index c6eaa65..8381f4a 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
@@ -96,7 +96,7 @@ public class HelixStateTransitionHandler extends MessageHandler {
HelixDataAccessor accessor = _manager.getHelixDataAccessor();
PartitionId partitionId = _message.getPartitionId();
- State fromState = _message.getFromState();
+ State fromState = _message.getTypedFromState();
// Verify the fromState and current state of the stateModel
String state = _currentStateDelta.getState(partitionId.stringify());
@@ -122,7 +122,7 @@ public class HelixStateTransitionHandler extends MessageHandler {
PartitionId partitionId = _message.getPartitionId();
ResourceId resource = _message.getResourceId();
- SessionId sessionId = _message.getTgtSessionId();
+ SessionId sessionId = _message.getTypedTgtSessionId();
String instanceName = _manager.getInstanceName();
HelixDataAccessor accessor = _manager.getHelixDataAccessor();
@@ -135,15 +135,15 @@ public class HelixStateTransitionHandler extends MessageHandler {
// new session
// sessionId might change when we update the state model state.
// for zk current state it is OK as we have the per-session current state node
- if (!_message.getTgtSessionId().stringify().equals(_manager.getSessionId())) {
+ if (!_message.getTypedTgtSessionId().stringify().equals(_manager.getSessionId())) {
logger.warn("Session id has changed. Skip postExecutionMessage. Old session "
- + _message.getExecutionSessionId() + " , new session : " + _manager.getSessionId());
+ + _message.getTypedExecutionSessionId() + " , new session : " + _manager.getSessionId());
return;
}
if (taskResult.isSuccess()) {
// String fromState = message.getFromState();
- State toState = _message.getToState();
+ State toState = _message.getTypedToState();
_currentStateDelta.setState(partitionId, toState);
if (toState.toString().equalsIgnoreCase(HelixDefinedState.DROPPED.toString())) {
@@ -181,7 +181,7 @@ public class HelixStateTransitionHandler extends MessageHandler {
// state in this case
logger
.error("State transition interrupted but not timeout. Not updating state. Partition : "
- + _message.getPartitionId() + " MsgId : " + _message.getMsgId());
+ + _message.getPartitionId() + " MsgId : " + _message.getMessageId());
return;
}
}
@@ -190,7 +190,7 @@ public class HelixStateTransitionHandler extends MessageHandler {
_stateModel.updateState(HelixDefinedState.ERROR.toString());
// if we have errors transit from ERROR state, disable the partition
- if (_message.getFromState().toString().equalsIgnoreCase(HelixDefinedState.ERROR.toString())) {
+ if (_message.getTypedFromState().toString().equalsIgnoreCase(HelixDefinedState.ERROR.toString())) {
disablePartition();
}
}
@@ -229,7 +229,7 @@ public class HelixStateTransitionHandler extends MessageHandler {
HelixAdmin admin = _manager.getClusterManagmentTool();
admin.enablePartition(false, clusterName, instanceName, resourceId.stringify(),
Arrays.asList(partitionId.stringify()));
- logger.info("error in transit from ERROR to " + _message.getToState() + " for partition: "
+ logger.info("error in transit from ERROR to " + _message.getTypedToState() + " for partition: "
+ partitionId + ". disable it on " + instanceName);
}
@@ -288,8 +288,8 @@ public class HelixStateTransitionHandler extends MessageHandler {
// by default, we invoke state transition function in state model
Method methodToInvoke = null;
- State fromState = message.getFromState();
- State toState = message.getToState();
+ State fromState = message.getTypedFromState();
+ State toState = message.getTypedToState();
methodToInvoke =
_transitionMethodFinder.getMethodForTransition(_stateModel.getClass(),
fromState.toString(), toState.toString(), new Class[] {
@@ -335,10 +335,10 @@ public class HelixStateTransitionHandler extends MessageHandler {
_stateModel.updateState(HelixDefinedState.ERROR.toString());
// if transit from ERROR state, disable the partition
- if (_message.getFromState().toString().equalsIgnoreCase(HelixDefinedState.ERROR.toString())) {
+ if (_message.getTypedFromState().toString().equalsIgnoreCase(HelixDefinedState.ERROR.toString())) {
disablePartition();
}
- accessor.updateProperty(keyBuilder.currentState(instanceName, _message.getTgtSessionId()
+ accessor.updateProperty(keyBuilder.currentState(instanceName, _message.getTypedTgtSessionId()
.stringify(), resourceId.stringify()), currentStateDelta);
}
} finally {
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
index 39da1aa..3bcc260 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
@@ -90,14 +90,14 @@ public class HelixTask implements MessageTask {
_statusUpdateUtil.logError(_message, HelixTask.class, e,
"State transition interrupted, timeout:" + _isTimeout, accessor);
- logger.info("Message " + _message.getMsgId() + " is interrupted");
+ logger.info("Message " + _message.getMessageId() + " is interrupted");
} catch (Exception e) {
taskResult = new HelixTaskResult();
taskResult.setException(e);
taskResult.setMessage(e.getMessage());
String errorMessage =
- "Exception while executing a message. " + e + " msgId: " + _message.getMsgId()
+ "Exception while executing a message. " + e + " msgId: " + _message.getMessageId()
+ " type: " + _message.getMsgType();
logger.error(errorMessage, e);
_statusUpdateUtil.logError(_message, HelixTask.class, e, errorMessage, accessor);
@@ -111,17 +111,17 @@ public class HelixTask implements MessageTask {
if (taskResult.isSuccess()) {
_statusUpdateUtil.logInfo(_message, _handler.getClass(),
"Message handling task completed successfully", accessor);
- logger.info("Message " + _message.getMsgId() + " completed.");
+ logger.info("Message " + _message.getMessageId() + " completed.");
} else {
type = ErrorType.INTERNAL;
if (taskResult.isInterrupted()) {
- logger.info("Message " + _message.getMsgId() + " is interrupted");
+ logger.info("Message " + _message.getMessageId() + " is interrupted");
code = _isTimeout ? ErrorCode.TIMEOUT : ErrorCode.CANCEL;
if (_isTimeout) {
int retryCount = _message.getRetryCount();
logger.info("Message timeout, retry count: " + retryCount + " msgId:"
- + _message.getMsgId());
+ + _message.getMessageId());
_statusUpdateUtil.logInfo(_message, _handler.getClass(),
"Message handling task timeout, retryCount:" + retryCount, accessor);
// Notify the handler that timeout happens, and the number of retries left
@@ -158,12 +158,12 @@ public class HelixTask implements MessageTask {
code = ErrorCode.ERROR;
String errorMessage =
- "Exception after executing a message, msgId: " + _message.getMsgId() + e;
+ "Exception after executing a message, msgId: " + _message.getMessageId() + e;
logger.error(errorMessage, e);
_statusUpdateUtil.logError(_message, HelixTask.class, errorMessage, accessor);
} finally {
long end = System.currentTimeMillis();
- logger.info("msg: " + _message.getMsgId() + " handling task completed, results:"
+ logger.info("msg: " + _message.getMessageId() + " handling task completed, results:"
+ taskResult.isSuccess() + ", at: " + end + ", took:" + (end - start));
// Notify the handler about any error happened in the handling procedure, so that
@@ -182,9 +182,9 @@ public class HelixTask implements MessageTask {
Builder keyBuilder = accessor.keyBuilder();
if (message.getTgtName().equalsIgnoreCase("controller")) {
// TODO: removeProperty returns boolean
- accessor.removeProperty(keyBuilder.controllerMessage(message.getMsgId().stringify()));
+ accessor.removeProperty(keyBuilder.controllerMessage(message.getMessageId().stringify()));
} else {
- accessor.removeProperty(keyBuilder.message(_manager.getInstanceName(), message.getMsgId()
+ accessor.removeProperty(keyBuilder.message(_manager.getInstanceName(), message.getMessageId()
.stringify()));
}
}
@@ -208,11 +208,11 @@ public class HelixTask implements MessageTask {
if (message.getSrcInstanceType() == InstanceType.PARTICIPANT) {
Builder keyBuilder = accessor.keyBuilder();
accessor.setProperty(
- keyBuilder.message(message.getMsgSrc(), replyMessage.getMsgId().stringify()),
+ keyBuilder.message(message.getMsgSrc(), replyMessage.getMessageId().stringify()),
replyMessage);
} else if (message.getSrcInstanceType() == InstanceType.CONTROLLER) {
Builder keyBuilder = accessor.keyBuilder();
- accessor.setProperty(keyBuilder.controllerMessage(replyMessage.getMsgId().stringify()),
+ accessor.setProperty(keyBuilder.controllerMessage(replyMessage.getMessageId().stringify()),
replyMessage);
}
_statusUpdateUtil.logInfo(message, HelixTask.class,
@@ -232,8 +232,8 @@ public class HelixTask implements MessageTask {
long totalDelay = now - msgReadTime;
long executionDelay = now - msgExecutionStartTime;
if (totalDelay > 0 && executionDelay > 0) {
- State fromState = message.getFromState();
- State toState = message.getToState();
+ State fromState = message.getTypedFromState();
+ State toState = message.getTypedToState();
String transition = fromState + "--" + toState;
StateTransitionContext cxt =
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index 8cf1aa7..8da53ea 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -432,14 +432,14 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
continue;
}
- SessionId tgtSessionId = message.getTgtSessionId();
+ SessionId tgtSessionId = message.getTypedTgtSessionId();
// sessionId mismatch normally means message comes from expired session, just remove it
if (!sessionId.equals(tgtSessionId.toString()) && !tgtSessionId.toString().equals("*")) {
String warningMessage =
"SessionId does NOT match. expected sessionId: " + sessionId
+ ", tgtSessionId in message: " + tgtSessionId + ", messageId: "
- + message.getMsgId();
+ + message.getMessageId();
LOG.warn(warningMessage);
accessor.removeProperty(message.getKey(keyBuilder, instanceName));
_statusUpdateUtil.logWarning(message, HelixStateMachineEngine.class, warningMessage,
@@ -454,7 +454,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
// We will read the message again if there is a new message but we
// check for the status and ignore if its already read
if (LOG.isTraceEnabled()) {
- LOG.trace("Message already read. msgId: " + message.getMsgId());
+ LOG.trace("Message already read. msgId: " + message.getMessageId());
}
continue;
}
@@ -467,9 +467,9 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
}
handlers.add(createHandler);
} catch (Exception e) {
- LOG.error("Failed to create message handler for " + message.getMsgId(), e);
+ LOG.error("Failed to create message handler for " + message.getMessageId(), e);
String error =
- "Failed to create message handler for " + message.getMsgId() + ", exception: " + e;
+ "Failed to create message handler for " + message.getMessageId() + ", exception: " + e;
_statusUpdateUtil.logError(message, HelixStateMachineEngine.class, e, error, accessor);
@@ -547,7 +547,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
// the corresponding MessageHandlerFactory is registered
if (handlerFactory == null) {
LOG.warn("Fail to find message handler factory for type: " + msgType + " msgId: "
- + message.getMsgId());
+ + message.getMessageId());
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageTimeoutTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageTimeoutTask.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageTimeoutTask.java
index e1b4f0f..17fc041 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageTimeoutTask.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageTimeoutTask.java
@@ -40,7 +40,7 @@ public class MessageTimeoutTask extends TimerTask {
Message message = _task.getMessage();
// NotificationContext context = _task.getNotificationContext();
// System.out.println("msg: " + message.getMsgId() + " timeouot.");
- LOG.warn("Message time out, canceling. id:" + message.getMsgId() + " timeout : "
+ LOG.warn("Message time out, canceling. id:" + message.getMessageId() + " timeout : "
+ message.getExecutionTimeout());
_task.onTimeout();
_executor.cancelTask(_task);
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-core/src/main/java/org/apache/helix/model/AlertStatus.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/AlertStatus.java b/helix-core/src/main/java/org/apache/helix/model/AlertStatus.java
index d90ec1a..d5ee44c 100644
--- a/helix-core/src/main/java/org/apache/helix/model/AlertStatus.java
+++ b/helix-core/src/main/java/org/apache/helix/model/AlertStatus.java
@@ -23,6 +23,7 @@ import java.util.Map;
import org.apache.helix.HelixProperty;
import org.apache.helix.ZNRecord;
+import org.apache.helix.api.id.SessionId;
import org.apache.helix.model.Alerts.AlertsProperty;
/**
@@ -67,6 +68,16 @@ public class AlertStatus extends HelixProperty {
}
/**
+ * Set the session that the alerts correspond to
+ * @param sessionId the session for which alerts occurred
+ */
+ public void setSessionId(SessionId sessionId) {
+ if (sessionId != null) {
+ setSessionId(sessionId.stringify());
+ }
+ }
+
+ /**
* Get the session that these alerts correspond to
* @return session identifier
*/
@@ -75,6 +86,14 @@ public class AlertStatus extends HelixProperty {
}
/**
+ * Get the session that the alerts correspond to
+ * @return session identifier
+ */
+ public SessionId getTypedSessionId() {
+ return SessionId.from(getSessionId());
+ }
+
+ /**
* Get the instance that these alerts correspond to
* @return name of the instance
*/
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-core/src/main/java/org/apache/helix/model/Alerts.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/Alerts.java b/helix-core/src/main/java/org/apache/helix/model/Alerts.java
index 506e3d5..f30f39c 100644
--- a/helix-core/src/main/java/org/apache/helix/model/Alerts.java
+++ b/helix-core/src/main/java/org/apache/helix/model/Alerts.java
@@ -23,6 +23,7 @@ import java.util.Map;
import org.apache.helix.HelixProperty;
import org.apache.helix.ZNRecord;
+import org.apache.helix.api.id.SessionId;
/**
* Describe alerts and corresponding metrics. An alert is triggered when cluster health
@@ -78,6 +79,16 @@ public class Alerts extends HelixProperty {
}
/**
+ * Set the session that the alerts correspond to
+ * @param sessionId the session for which alerts occurred
+ */
+ public void setSessionId(SessionId sessionId) {
+ if (sessionId != null) {
+ setSessionId(sessionId.stringify());
+ }
+ }
+
+ /**
* Get the session that the alerts correspond to
* @return session identifier
*/
@@ -86,6 +97,14 @@ public class Alerts extends HelixProperty {
}
/**
+ * Get the session that the alerts correspond to
+ * @return session identifier
+ */
+ public SessionId getTypedSessionId() {
+ return SessionId.from(getSessionId());
+ }
+
+ /**
* Get the instance that the alerts correspond to
* @return the name of the instance
*/
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-core/src/main/java/org/apache/helix/model/ClusterConstraints.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConstraints.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConstraints.java
index 8e37b18..daefe6e 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConstraints.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConstraints.java
@@ -126,11 +126,20 @@ public class ClusterConstraints extends HelixProperty {
}
/**
+ * add the constraint, overwrite existing one if constraint with same constraint-id already exists
+ * @param constraintId unique constraint identifier
+ * @param item the constraint as a {@link ConstraintItem}
+ */
+ public void addConstraintItem(String constraintId, ConstraintItem item) {
+ addConstraintItem(ConstraintId.from(constraintId), item);
+ }
+
+ /**
* Add multiple constraint items.
* @param items (constraint identifier, {@link ConstrantItem}) pairs
*/
- public void addConstraintItems(Map<ConstraintId, ConstraintItem> items) {
- for (ConstraintId constraintId : items.keySet()) {
+ public void addConstraintItems(Map<String, ConstraintItem> items) {
+ for (String constraintId : items.keySet()) {
addConstraintItem(constraintId, items.get(constraintId));
}
}
@@ -145,6 +154,14 @@ public class ClusterConstraints extends HelixProperty {
}
/**
+ * remove a constraint-item
+ * @param constraintId unique constraint identifier
+ */
+ public void removeConstraintItem(String constraintId) {
+ removeConstraintItem(ConstraintId.from(constraintId));
+ }
+
+ /**
* get a constraint-item
* @param constraintId unique constraint identifier
* @return {@link ConstraintItem} or null if not present
@@ -154,6 +171,15 @@ public class ClusterConstraints extends HelixProperty {
}
/**
+ * get a constraint-item
+ * @param constraintId unique constraint identifier
+ * @return {@link ConstraintItem} or null if not present
+ */
+ public ConstraintItem getConstraintItem(String constraintId) {
+ return getConstraintItem(ConstraintId.from(constraintId));
+ }
+
+ /**
* return a set of constraints that match the attribute pairs
* @param attributes (constraint scope, constraint string) pairs
* @return a set of {@link ConstraintItem}s with matching attributes
@@ -186,9 +212,9 @@ public class ClusterConstraints extends HelixProperty {
String msgType = msg.getMsgType();
attributes.put(ConstraintAttribute.MESSAGE_TYPE, msgType);
if (MessageType.STATE_TRANSITION.toString().equals(msgType)) {
- if (msg.getFromState() != null && msg.getToState() != null) {
+ if (msg.getTypedFromState() != null && msg.getTypedToState() != null) {
attributes.put(ConstraintAttribute.TRANSITION,
- Transition.from(msg.getFromState(), msg.getToState()).toString());
+ Transition.from(msg.getTypedFromState(), msg.getTypedToState()).toString());
}
if (msg.getResourceId() != null) {
attributes.put(ConstraintAttribute.RESOURCE, msg.getResourceId().stringify());
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-core/src/main/java/org/apache/helix/model/CurrentState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/CurrentState.java b/helix-core/src/main/java/org/apache/helix/model/CurrentState.java
index 2fe37ce..5c9bcbc 100644
--- a/helix-core/src/main/java/org/apache/helix/model/CurrentState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/CurrentState.java
@@ -95,7 +95,7 @@ public class CurrentState extends HelixProperty {
* Get the partitions on this instance and the state that each partition is currently in.
* @return (partition, state) pairs
*/
- public Map<String, String> getPartitionStateStringMap() {
+ public Map<String, String> getPartitionStateMap() {
Map<String, String> map = new HashMap<String, String>();
Map<String, Map<String, String>> mapFields = _record.getMapFields();
for (String partitionName : mapFields.keySet()) {
@@ -111,7 +111,7 @@ public class CurrentState extends HelixProperty {
* Get the partitions on this instance and the state that each partition is currently in
* @return (partition id, state) pairs
*/
- public Map<PartitionId, State> getPartitionStateMap() {
+ public Map<PartitionId, State> getTypedPartitionStateMap() {
Map<PartitionId, State> map = new HashMap<PartitionId, State>();
for (String partitionName : _record.getMapFields().keySet()) {
Map<String, String> stateMap = _record.getMapField(partitionName);
@@ -127,8 +127,16 @@ public class CurrentState extends HelixProperty {
* Get the session that this current state corresponds to
* @return session identifier
*/
- public SessionId getSessionId() {
- return SessionId.from(_record.getSimpleField(CurrentStateProperty.SESSION_ID.toString()));
+ public SessionId getTypedSessionId() {
+ return SessionId.from(getSessionId());
+ }
+
+ /**
+ * Get the session that this current state corresponds to
+ * @return session identifier
+ */
+ public String getSessionId() {
+ return _record.getSimpleField(CurrentStateProperty.SESSION_ID.toString());
}
/**
@@ -136,7 +144,15 @@ public class CurrentState extends HelixProperty {
* @param sessionId session identifier
*/
public void setSessionId(SessionId sessionId) {
- _record.setSimpleField(CurrentStateProperty.SESSION_ID.toString(), sessionId.stringify());
+ setSessionId(sessionId.stringify());
+ }
+
+ /**
+ * Set the session that this current state corresponds to
+ * @param sessionId session identifier
+ */
+ public void setSessionId(String sessionId) {
+ _record.setSimpleField(CurrentStateProperty.SESSION_ID.toString(), sessionId);
}
/**
@@ -197,7 +213,7 @@ public class CurrentState extends HelixProperty {
/**
* Set the state that a partition is currently in on this instance
- * @param partitionName the name of the partition
+ * @param partitionId the id of the partition
* @param state the state of the partition
*/
public void setState(PartitionId partitionId, State state) {
@@ -210,6 +226,19 @@ public class CurrentState extends HelixProperty {
}
/**
+ * Set the state that a partition is currently in on this instance
+ * @param partitionName the name of the partition
+ * @param state the state of the partition
+ */
+ public void setState(String partitionName, String state) {
+ Map<String, Map<String, String>> mapFields = _record.getMapFields();
+ if (mapFields.get(partitionName) == null) {
+ mapFields.put(partitionName, new TreeMap<String, String>());
+ }
+ mapFields.get(partitionName).put(CurrentStateProperty.CURRENT_STATE.toString(), state);
+ }
+
+ /**
* Set the state model factory
* @param factoryName the name of the factory
*/
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-core/src/main/java/org/apache/helix/model/ExternalView.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ExternalView.java b/helix-core/src/main/java/org/apache/helix/model/ExternalView.java
index 15a22ca..0f7b6db 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ExternalView.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ExternalView.java
@@ -114,7 +114,7 @@ public class ExternalView extends HelixProperty {
* Get all the partitions of the resource
* @return a set of partition names
*/
- public Set<String> getPartitionStringSet() {
+ public Set<String> getPartitionSet() {
return _record.getMapFields().keySet();
}
@@ -122,9 +122,9 @@ public class ExternalView extends HelixProperty {
* Get all the partitions of the resource
* @return a set of partition ids
*/
- public Set<PartitionId> getPartitionSet() {
+ public Set<PartitionId> getPartitionIdSet() {
Set<PartitionId> partitionSet = Sets.newHashSet();
- for (String partitionName : getPartitionStringSet()) {
+ for (String partitionName : getPartitionSet()) {
partitionSet.add(PartitionId.from(partitionName));
}
return partitionSet;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-core/src/main/java/org/apache/helix/model/IdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index 8f579ec..7d84258 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -126,7 +126,6 @@ public class IdealState extends HelixProperty {
* Get the associated resource
* @return the name of the resource
*/
- @Deprecated
public String getResourceName() {
return _record.getId();
}
@@ -173,7 +172,6 @@ public class IdealState extends HelixProperty {
* Define a custom rebalancer that implements {@link Rebalancer}
* @param rebalancerClassName the name of the custom rebalancing class
*/
- @Deprecated
public void setRebalancerClassName(String rebalancerClassName) {
_record
.setSimpleField(IdealStateProperty.REBALANCER_CLASS_NAME.toString(), rebalancerClassName);
@@ -183,7 +181,6 @@ public class IdealState extends HelixProperty {
* Get the name of the user-defined rebalancer associated with this resource
* @return the rebalancer class name, or null if none is being used
*/
- @Deprecated
public String getRebalancerClassName() {
return _record.getSimpleField(IdealStateProperty.REBALANCER_CLASS_NAME.toString());
}
@@ -277,8 +274,7 @@ public class IdealState extends HelixProperty {
* Get all of the partitions
* @return a set of partition names
*/
- @Deprecated
- public Set<String> getPartitionStringSet() {
+ public Set<String> getPartitionSet() {
if (getRebalanceMode() == RebalanceMode.SEMI_AUTO
|| getRebalanceMode() == RebalanceMode.FULL_AUTO) {
return _record.getListFields().keySet();
@@ -295,9 +291,9 @@ public class IdealState extends HelixProperty {
* Get all of the partitions
* @return a set of partitions
*/
- public Set<PartitionId> getPartitionSet() {
+ public Set<PartitionId> getPartitionIdSet() {
Set<PartitionId> partitionSet = Sets.newHashSet();
- for (String partitionName : getPartitionStringSet()) {
+ for (String partitionName : getPartitionSet()) {
partitionSet.add(PartitionId.from(partitionName));
}
return partitionSet;
@@ -308,7 +304,6 @@ public class IdealState extends HelixProperty {
* @param partitionName the name of the partition
* @return the instances where the replicas live and the state of each
*/
- @Deprecated
public Map<String, String> getInstanceStateMap(String partitionName) {
return _record.getMapField(partitionName);
}
@@ -350,7 +345,6 @@ public class IdealState extends HelixProperty {
* @param partitionName the partition to look up
* @return set of instance names
*/
- @Deprecated
public Set<String> getInstanceSet(String partitionName) {
if (getRebalanceMode() == RebalanceMode.SEMI_AUTO
|| getRebalanceMode() == RebalanceMode.FULL_AUTO) {
@@ -407,7 +401,6 @@ public class IdealState extends HelixProperty {
* @param partitionName the name of the partition
* @return a list of instances that can serve replicas of the partition
*/
- @Deprecated
public List<String> getPreferenceList(String partitionName) {
List<String> instanceStateList = _record.getListField(partitionName);
@@ -439,7 +432,6 @@ public class IdealState extends HelixProperty {
* Get the state model associated with this resource
* @return an identifier of the state model
*/
- @Deprecated
public String getStateModelDefRef() {
return _record.getSimpleField(IdealStateProperty.STATE_MODEL_DEF_REF.toString());
}
@@ -456,7 +448,6 @@ public class IdealState extends HelixProperty {
* Set the state model associated with this resource
* @param stateModel state model identifier
*/
- @Deprecated
public void setStateModelDefRef(String stateModel) {
_record.setSimpleField(IdealStateProperty.STATE_MODEL_DEF_REF.toString(), stateModel);
}
@@ -546,7 +537,6 @@ public class IdealState extends HelixProperty {
* Set the state model factory associated with this resource
* @param name state model factory name
*/
- @Deprecated
public void setStateModelFactoryName(String name) {
_record.setSimpleField(IdealStateProperty.STATE_MODEL_FACTORY_NAME.toString(), name);
}
@@ -565,7 +555,6 @@ public class IdealState extends HelixProperty {
* Get the state model factory associated with this resource
* @return state model factory name
*/
- @Deprecated
public String getStateModelFactoryName() {
return _record.getStringField(IdealStateProperty.STATE_MODEL_FACTORY_NAME.toString(),
HelixConstants.DEFAULT_STATE_MODEL_FACTORY);
@@ -609,7 +598,7 @@ public class IdealState extends HelixProperty {
if (!replicaStr.equals(HelixConstants.StateModelToken.ANY_LIVEINSTANCE.toString())) {
int replica = Integer.parseInt(replicaStr);
- Set<String> partitionSet = getPartitionStringSet();
+ Set<String> partitionSet = getPartitionSet();
for (String partition : partitionSet) {
List<String> preferenceList = getPreferenceList(partition);
if (preferenceList == null || preferenceList.size() != replica) {
@@ -655,7 +644,7 @@ public class IdealState extends HelixProperty {
_record.getListFields().clear();
// assign a partition at a time
- for (PartitionId partition : assignment.getMappedPartitions()) {
+ for (PartitionId partition : assignment.getMappedPartitionIds()) {
List<ParticipantId> preferenceList = new ArrayList<ParticipantId>();
Map<ParticipantId, State> participantStateMap = new HashMap<ParticipantId, State>();
@@ -665,7 +654,7 @@ public class IdealState extends HelixProperty {
Multimaps.invertFrom(Multimaps.forMap(replicaMap), inverseMap);
// update the ideal state in order of state priorities
- for (State state : stateModelDef.getStatesPriorityList()) {
+ for (State state : stateModelDef.getTypedStatesPriorityList()) {
if (!state.equals(State.from(HelixDefinedState.DROPPED))
&& !state.equals(State.from(HelixDefinedState.ERROR))) {
List<ParticipantId> stateParticipants = inverseMap.get(state);
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
index d2e1187..8f776a0 100644
--- a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
@@ -228,7 +228,7 @@ public class InstanceConfig extends HelixProperty {
* @param partitionId the partition to set
* @param enabled true to enable, false to disable
*/
- public void setInstanceEnabledForPartition(PartitionId partitionId, boolean enabled) {
+ public void setParticipantEnabledForPartition(PartitionId partitionId, boolean enabled) {
setInstanceEnabledForPartition(partitionId.stringify(), enabled);
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-core/src/main/java/org/apache/helix/model/LiveInstance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/LiveInstance.java b/helix-core/src/main/java/org/apache/helix/model/LiveInstance.java
index fda144a..e9348ec 100644
--- a/helix-core/src/main/java/org/apache/helix/model/LiveInstance.java
+++ b/helix-core/src/main/java/org/apache/helix/model/LiveInstance.java
@@ -79,8 +79,16 @@ public class LiveInstance extends HelixProperty {
* Get the session that this participant corresponds to
* @return session identifier
*/
- public SessionId getSessionId() {
- return SessionId.from(_record.getSimpleField(LiveInstanceProperty.SESSION_ID.toString()));
+ public SessionId getTypedSessionId() {
+ return SessionId.from(getSessionId());
+ }
+
+ /**
+ * Get the session that this participant corresponds to
+ * @return session identifier
+ */
+ public String getSessionId() {
+ return _record.getSimpleField(LiveInstanceProperty.SESSION_ID.toString());
}
/**
@@ -103,8 +111,16 @@ public class LiveInstance extends HelixProperty {
* Get the version of Helix that this participant is running
* @return the version
*/
- public HelixVersion getHelixVersion() {
- return HelixVersion.from(_record.getSimpleField(LiveInstanceProperty.HELIX_VERSION.toString()));
+ public HelixVersion getTypedHelixVersion() {
+ return HelixVersion.from(getHelixVersion());
+ }
+
+ /**
+ * Get the version of Helix that this participant is running
+ * @return the version
+ */
+ public String getHelixVersion() {
+ return _record.getSimpleField(LiveInstanceProperty.HELIX_VERSION.toString());
}
/**
@@ -165,11 +181,11 @@ public class LiveInstance extends HelixProperty {
@Override
public boolean isValid() {
- if (getSessionId() == null) {
+ if (getTypedSessionId() == null) {
_logger.error("liveInstance does not have session id. id:" + _record.getId());
return false;
}
- if (getHelixVersion() == null) {
+ if (getTypedHelixVersion() == null) {
_logger.error("liveInstance does not have CLM verion. id:" + _record.getId());
return false;
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-core/src/main/java/org/apache/helix/model/Message.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/Message.java b/helix-core/src/main/java/org/apache/helix/model/Message.java
index 86319e3..2bd313a 100644
--- a/helix-core/src/main/java/org/apache/helix/model/Message.java
+++ b/helix-core/src/main/java/org/apache/helix/model/Message.java
@@ -129,25 +129,52 @@ public class Message extends HelixProperty {
/**
* Instantiate a message
+ * @param type the message category
+ * @param msgId unique message identifier
+ */
+ public Message(MessageType type, String msgId) {
+ this(type, MessageId.from(msgId));
+ }
+
+ /**
+ * Instantiate a message
* @param type {@link MessageType} as a string or a custom message type
* @param msgId unique message identifier
*/
public Message(String type, MessageId msgId) {
super(new ZNRecord(msgId.stringify()));
_record.setSimpleField(Attributes.MSG_TYPE.toString(), type);
- setMsgId(msgId);
+ setMessageId(msgId);
setMsgState(MessageState.NEW);
_record.setLongField(Attributes.CREATE_TIMESTAMP.toString(), new Date().getTime());
}
/**
+ * Instantiate a message
+ * @param type {@link MessageType} as a string or a custom message type
+ * @param msgId unique message identifier
+ */
+ public Message(String type, String msgId) {
+ this(type, MessageId.from(msgId));
+ }
+
+ /**
* Instantiate a message with a new id
* @param record a ZNRecord corresponding to a message
* @param id unique message identifier
*/
public Message(ZNRecord record, MessageId id) {
super(new ZNRecord(record, id.stringify()));
- setMsgId(id);
+ setMessageId(id);
+ }
+
+ /**
+ * Instantiate a message with a new id
+ * @param record a ZNRecord corresponding to a message
+ * @param id unique message identifier
+ */
+ public Message(ZNRecord record, String id) {
+ this(record, MessageId.from(id));
}
/**
@@ -224,8 +251,16 @@ public class Message extends HelixProperty {
* Get the session identifier of the destination node
* @return session identifier
*/
- public SessionId getTgtSessionId() {
- return SessionId.from(_record.getSimpleField(Attributes.TGT_SESSION_ID.toString()));
+ public SessionId getTypedTgtSessionId() {
+ return SessionId.from(getTgtSessionId());
+ }
+
+ /**
+ * Get the session identifier of the destination node
+ * @return session identifier
+ */
+ public String getTgtSessionId() {
+ return _record.getSimpleField(Attributes.TGT_SESSION_ID.toString());
}
/**
@@ -233,15 +268,33 @@ public class Message extends HelixProperty {
* @param tgtSessionId session identifier
*/
public void setTgtSessionId(SessionId tgtSessionId) {
- _record.setSimpleField(Attributes.TGT_SESSION_ID.toString(), tgtSessionId.stringify());
+ if (tgtSessionId != null) {
+ setTgtSessionId(tgtSessionId.stringify());
+ }
+ }
+
+ /**
+ * Set the session identifier of the destination node
+ * @param tgtSessionId session identifier
+ */
+ public void setTgtSessionId(String tgtSessionId) {
+ _record.setSimpleField(Attributes.TGT_SESSION_ID.toString(), tgtSessionId);
}
/**
* Get the session identifier of the source node
* @return session identifier
*/
- public SessionId getSrcSessionId() {
- return SessionId.from(_record.getSimpleField(Attributes.SRC_SESSION_ID.toString()));
+ public SessionId getTypedSrcSessionId() {
+ return SessionId.from(getSrcSessionId());
+ }
+
+ /**
+ * Get the session identifier of the source node
+ * @return session identifier
+ */
+ public String getSrcSessionId() {
+ return _record.getSimpleField(Attributes.SRC_SESSION_ID.toString());
}
/**
@@ -249,15 +302,33 @@ public class Message extends HelixProperty {
* @param srcSessionId session identifier
*/
public void setSrcSessionId(SessionId srcSessionId) {
- _record.setSimpleField(Attributes.SRC_SESSION_ID.toString(), srcSessionId.stringify());
+ if (srcSessionId != null) {
+ setSrcSessionId(srcSessionId.stringify());
+ }
+ }
+
+ /**
+ * Set the session identifier of the source node
+ * @param srcSessionId session identifier
+ */
+ public void setSrcSessionId(String srcSessionId) {
+ _record.setSimpleField(Attributes.SRC_SESSION_ID.toString(), srcSessionId);
+ }
+
+ /**
+ * Get the session identifier of the node that executes the message
+ * @return session identifier
+ */
+ public SessionId getTypedExecutionSessionId() {
+ return SessionId.from(getExecutionSessionId());
}
/**
* Get the session identifier of the node that executes the message
* @return session identifier
*/
- public SessionId getExecutionSessionId() {
- return SessionId.from(_record.getSimpleField(Attributes.EXE_SESSION_ID.toString()));
+ public String getExecutionSessionId() {
+ return _record.getSimpleField(Attributes.EXE_SESSION_ID.toString());
}
/**
@@ -265,7 +336,17 @@ public class Message extends HelixProperty {
* @param exeSessionId session identifier
*/
public void setExecuteSessionId(SessionId exeSessionId) {
- _record.setSimpleField(Attributes.EXE_SESSION_ID.toString(), exeSessionId.stringify());
+ if (exeSessionId != null) {
+ setExecuteSessionId(exeSessionId.stringify());
+ }
+ }
+
+ /**
+ * Set the session identifier of the node that executes the message
+ * @param exeSessionId session identifier
+ */
+ public void setExecuteSessionId(String exeSessionId) {
+ _record.setSimpleField(Attributes.EXE_SESSION_ID.toString(), exeSessionId);
}
/**
@@ -333,23 +414,51 @@ public class Message extends HelixProperty {
* @param partitionId
*/
public void setPartitionId(PartitionId partitionId) {
- _record.setSimpleField(Attributes.PARTITION_NAME.toString(), partitionId.stringify());
+ if (partitionId != null) {
+ setPartitionName(partitionId.stringify());
+ }
+ }
+
+ /**
+ * Set the id of the partition this message concerns
+ * @param partitionId
+ */
+ public void setPartitionName(String partitionName) {
+ _record.setSimpleField(Attributes.PARTITION_NAME.toString(), partitionName);
}
/**
* Get the unique identifier of this message
* @return message identifier
*/
- public MessageId getMsgId() {
- return MessageId.from(_record.getSimpleField(Attributes.MSG_ID.toString()));
+ public MessageId getMessageId() {
+ return MessageId.from(getMsgId());
+ }
+
+ /**
+ * Get the unique identifier of this message
+ * @return message identifier
+ */
+ public String getMsgId() {
+ return _record.getSimpleField(Attributes.MSG_ID.toString());
}
/**
* Set the unique identifier of this message
* @param msgId message identifier
*/
- public void setMsgId(MessageId msgId) {
- _record.setSimpleField(Attributes.MSG_ID.toString(), msgId.toString());
+ public void setMessageId(MessageId msgId) {
+ if (msgId != null) {
+ setMsgId(msgId.stringify());
+ }
+ }
+
+ /**
+ * Set the unique identifier of this message
+ * @param msgId message identifier
+ */
+ public void setMsgId(String msgId) {
+ _record.setSimpleField(Attributes.MSG_ID.toString(), msgId);
}
/**
@@ -357,6 +466,16 @@ public class Message extends HelixProperty {
* @param state the state
*/
public void setFromState(State state) {
+ if (state != null) {
+ setFromState(state.toString());
+ }
+ }
+
+ /**
+ * Set the "from state" for transition-related messages
+ * @param state the state
+ */
+ public void setFromState(String state) {
_record.setSimpleField(Attributes.FROM_STATE.toString(), state.toString());
}
@@ -364,8 +483,16 @@ public class Message extends HelixProperty {
* Get the "from-state" for transition-related messages
* @return state, or null for other message types
*/
- public State getFromState() {
- return State.from(_record.getSimpleField(Attributes.FROM_STATE.toString()));
+ public State getTypedFromState() {
+ return State.from(getFromState());
+ }
+
+ /**
+ * Get the "from-state" for transition-related messages
+ * @return state, or null for other message types
+ */
+ public String getFromState() {
+ return _record.getSimpleField(Attributes.FROM_STATE.toString());
}
/**
@@ -373,6 +500,16 @@ public class Message extends HelixProperty {
* @param state the state
*/
public void setToState(State state) {
+ if (state != null) {
+ setToState(state.toString());
+ }
+ }
+
+ /**
+ * Set the "to state" for transition-related messages
+ * @param state the state
+ */
+ public void setToState(String state) {
_record.setSimpleField(Attributes.TO_STATE.toString(), state.toString());
}
@@ -380,8 +517,16 @@ public class Message extends HelixProperty {
* Get the "to state" for transition-related messages
* @return state, or null for other message types
*/
- public State getToState() {
- return State.from(_record.getSimpleField(Attributes.TO_STATE.toString()));
+ public State getTypedToState() {
+ return State.from(getToState());
+ }
+
+ /**
+ * Get the "to state" for transition-related messages
+ * @return state, or null for other message types
+ */
+ public String getToState() {
+ return _record.getSimpleField(Attributes.TO_STATE.toString());
}
/**
@@ -413,7 +558,17 @@ public class Message extends HelixProperty {
* @param resourceId resource name to set
*/
public void setResourceId(ResourceId resourceId) {
- _record.setSimpleField(Attributes.RESOURCE_NAME.toString(), resourceId.stringify());
+ if (resourceId != null) {
+ setResourceName(resourceId.stringify());
+ }
+ }
+
+ /**
+ * Set the resource associated with this message
+ * @param resourceName resource name to set
+ */
+ public void setResourceName(String resourceName) {
+ _record.setSimpleField(Attributes.RESOURCE_NAME.toString(), resourceName);
}
/**
@@ -421,7 +576,15 @@ public class Message extends HelixProperty {
* @return resource name
*/
public ResourceId getResourceId() {
- return ResourceId.from(_record.getSimpleField(Attributes.RESOURCE_NAME.toString()));
+ return ResourceId.from(getResourceName());
+ }
+
+ /**
+ * Get the resource associated with this message
+ * @return resource name
+ */
+ public String getResourceName() {
+ return _record.getSimpleField(Attributes.RESOURCE_NAME.toString());
}
/**
@@ -429,7 +592,15 @@ public class Message extends HelixProperty {
* @return partition id
*/
public PartitionId getPartitionId() {
- return PartitionId.from(_record.getSimpleField(Attributes.PARTITION_NAME.toString()));
+ return PartitionId.from(getPartitionName());
+ }
+
+ /**
+ * Get the resource partition associated with this message
+ * @return partition id
+ */
+ public String getPartitionName() {
+ return _record.getSimpleField(Attributes.PARTITION_NAME.toString());
}
/**
@@ -453,7 +624,17 @@ public class Message extends HelixProperty {
* @param stateModelDefName a reference to the state model definition, e.g. "MasterSlave"
*/
public void setStateModelDef(StateModelDefId stateModelDefId) {
- _record.setSimpleField(Attributes.STATE_MODEL_DEF.toString(), stateModelDefId.stringify());
+ if (stateModelDefId != null) {
+ setStateModelDef(stateModelDefId.stringify());
+ }
+ }
+
+ /**
+ * Set the state model definition
+ * @param stateModelDefName a reference to the state model definition, e.g. "MasterSlave"
+ */
+ public void setStateModelDef(String stateModelDefId) {
+ _record.setSimpleField(Attributes.STATE_MODEL_DEF.toString(), stateModelDefId);
}
/**
@@ -629,7 +810,7 @@ public class Message extends HelixProperty {
public static Message createReplyMessage(Message srcMessage, String instanceName,
Map<String, String> taskResultMap) {
if (srcMessage.getCorrelationId() == null) {
- throw new HelixException("Message " + srcMessage.getMsgId()
+ throw new HelixException("Message " + srcMessage.getMessageId()
+ " does not contain correlation id");
}
Message replyMessage =
@@ -679,6 +860,14 @@ public class Message extends HelixProperty {
}
/**
+ * Get a list of partitions associated with this message
+ * @return list of partition ids
+ */
+ public List<String> getPartitionNames() {
+ return _record.getListField(Attributes.PARTITION_NAME.toString());
+ }
+
+ /**
* Check if this message is targetted for a controller
* @return true if this is a controller message, false otherwise
*/
@@ -762,9 +951,9 @@ public class Message extends HelixProperty {
boolean isNotValid =
isNullOrEmpty(getTgtName()) || isNullOrEmpty(getPartitionId().stringify())
|| isNullOrEmpty(getResourceId().stringify()) || isNullOrEmpty(getStateModelDef())
- || isNullOrEmpty(getToState().toString())
+ || isNullOrEmpty(getTypedToState().toString())
|| isNullOrEmpty(getStateModelFactoryName())
- || isNullOrEmpty(getFromState().toString());
+ || isNullOrEmpty(getTypedFromState().toString());
return !isNotValid;
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java b/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
index c91a655..96d0ca7 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
@@ -33,6 +33,8 @@ import org.apache.helix.api.id.ResourceId;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
/**
* Represents the assignments of replicas for an entire resource, keyed on partitions of the
@@ -75,7 +77,7 @@ public class ResourceAssignment extends HelixProperty {
* Get the currently mapped partitions
* @return list of Partition objects (immutable)
*/
- public List<? extends PartitionId> getMappedPartitions() {
+ public List<? extends PartitionId> getMappedPartitionIds() {
ImmutableList.Builder<PartitionId> builder = new ImmutableList.Builder<PartitionId>();
for (String partitionName : _record.getMapFields().keySet()) {
builder.add(PartitionId.from(partitionName));
@@ -84,6 +86,14 @@ public class ResourceAssignment extends HelixProperty {
}
/**
+ * Get the currently mapped partitions
+ * @return list of Partition objects (immutable)
+ */
+ public List<String> getMappedPartitions() {
+ return Lists.newArrayList(_record.getMapFields().keySet());
+ }
+
+ /**
* Get the entire map of a resource
* @return map of partition to participant to state
*/
@@ -111,13 +121,25 @@ public class ResourceAssignment extends HelixProperty {
}
/**
+ * Get the participant, state pairs for a partition
+ * @param partition the Partition to look up
+ * @return map of (participant id, state)
+ */
+ public Map<String, String> getReplicaMap(String partitionId) {
+ Map<String, String> rawReplicaMap = _record.getMapField(partitionId);
+ if (rawReplicaMap == null) {
+ return Collections.emptyMap();
+ }
+ return rawReplicaMap;
+ }
+
+ /**
* Add participant, state pairs for a partition
- * TODO: should be package-private, but builder can't see it
* @param partitionId the partition to set
* @param replicaMap map of (participant name, state)
*/
public void addReplicaMap(PartitionId partitionId, Map<ParticipantId, State> replicaMap) {
- Map<String, String> convertedMap = new HashMap<String, String>();
+ Map<String, String> convertedMap = Maps.newHashMap();
for (ParticipantId participantId : replicaMap.keySet()) {
convertedMap.put(participantId.stringify(), replicaMap.get(participantId).toString());
}
@@ -125,6 +147,15 @@ public class ResourceAssignment extends HelixProperty {
}
/**
+ * Add participant, state pairs for a partition
+ * @param partitionId the partition to set
+ * @param replicaMap map of (participant name, state)
+ */
+ public void addReplicaMap(String partitionId, Map<String, String> replicaMap) {
+ _record.setMapField(partitionId, replicaMap);
+ }
+
+ /**
* Helper for converting a map of strings to a concrete replica map
* @param rawMap map of participant name to state name
* @return map of participant id to state
@@ -133,7 +164,7 @@ public class ResourceAssignment extends HelixProperty {
if (rawMap == null) {
return Collections.emptyMap();
}
- Map<ParticipantId, State> replicaMap = new HashMap<ParticipantId, State>();
+ Map<ParticipantId, State> replicaMap = Maps.newHashMap();
for (String participantName : rawMap.keySet()) {
replicaMap.put(ParticipantId.from(participantName), State.from(rawMap.get(participantName)));
}
@@ -150,8 +181,7 @@ public class ResourceAssignment extends HelixProperty {
if (rawMaps == null) {
return Collections.emptyMap();
}
- Map<PartitionId, Map<ParticipantId, State>> participantStateMaps =
- new HashMap<PartitionId, Map<ParticipantId, State>>();
+ Map<PartitionId, Map<ParticipantId, State>> participantStateMaps = Maps.newHashMap();
for (String partitionId : rawMaps.keySet()) {
participantStateMaps.put(PartitionId.from(partitionId),
replicaMapFromStringMap(rawMaps.get(partitionId)));
@@ -185,7 +215,7 @@ public class ResourceAssignment extends HelixProperty {
if (replicaMaps == null) {
return Collections.emptyMap();
}
- Map<String, Map<String, String>> rawMaps = new HashMap<String, Map<String, String>>();
+ Map<String, Map<String, String>> rawMaps = Maps.newHashMap();
for (PartitionId partitionId : replicaMaps.keySet()) {
rawMaps.put(partitionId.stringify(), stringMapFromReplicaMap(replicaMaps.get(partitionId)));
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java b/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java
index 3a8542b..a9a6e49 100644
--- a/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java
+++ b/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java
@@ -175,7 +175,7 @@ public class StateModelDefinition extends HelixProperty {
* Get an ordered priority list of states
* @return state names, the first of which is highest priority
*/
- public List<String> getStatesPriorityStringList() {
+ public List<String> getStatesPriorityList() {
return _statesPriorityList;
}
@@ -183,9 +183,9 @@ public class StateModelDefinition extends HelixProperty {
* Get an ordered priority list of states
* @return immutable list of states, the first of which is highest priority (immutable)
*/
- public List<State> getStatesPriorityList() {
+ public List<State> getTypedStatesPriorityList() {
ImmutableList.Builder<State> builder = new ImmutableList.Builder<State>();
- for (String state : getStatesPriorityStringList()) {
+ for (String state : getStatesPriorityList()) {
builder.add(State.from(state));
}
return builder.build();
@@ -223,7 +223,7 @@ public class StateModelDefinition extends HelixProperty {
* Get the starting state in the model
* @return name of the initial state
*/
- public String getInitialStateString() {
+ public String getInitialState() {
// return _record.getSimpleField(StateModelDefinitionProperty.INITIAL_STATE
// .toString());
return _initialState;
@@ -233,7 +233,7 @@ public class StateModelDefinition extends HelixProperty {
* Get the starting state in the model
* @return name of the initial state
*/
- public State getInitialState() {
+ public State getTypedInitialState() {
// return _record.getSimpleField(StateModelDefinitionProperty.INITIAL_STATE
// .toString());
return State.from(_initialState);
@@ -259,7 +259,7 @@ public class StateModelDefinition extends HelixProperty {
@Override
public boolean isValid() {
- if (getInitialStateString() == null) {
+ if (getInitialState() == null) {
_logger.error("State model does not contain init state, statemodel:" + _record.getId());
return false;
}
@@ -299,7 +299,16 @@ public class StateModelDefinition extends HelixProperty {
* @param state
*/
public Builder initialState(State initialState) {
- this.initialState = initialState.toString();
+ return initialState(initialState.toString());
+ }
+
+ /**
+ * initial state of a replica when it starts, most commonly used initial
+ * state is OFFLINE
+ * @param state
+ */
+ public Builder initialState(String initialState) {
+ this.initialState = initialState;
return this;
}
@@ -312,7 +321,19 @@ public class StateModelDefinition extends HelixProperty {
* @param states
*/
public Builder addState(State state, int priority) {
- statesMap.put(state.toString(), priority);
+ return addState(state.toString(), priority);
+ }
+
+ /**
+ * Define all valid states using this method. Set the priority in which the
+ * constraints must be satisfied. Lets say STATE1 has a constraint of 1 and
+ * STATE2 has a constraint of 3 but only one node is up then Helix will uses
+ * the priority to see STATE constraint has to be given higher preference <br/>
+ * Use -1 to indicates states with no constraints, like OFFLINE
+ * @param states
+ */
+ public Builder addState(String state, int priority) {
+ statesMap.put(state, priority);
return this;
}
@@ -326,6 +347,15 @@ public class StateModelDefinition extends HelixProperty {
}
/**
+ * Sets the priority to Integer.MAX_VALUE
+ * @param state
+ */
+ public Builder addState(String state) {
+ addState(state, Integer.MAX_VALUE);
+ return this;
+ }
+
+ /**
* Define all legal transitions between states using this method. Priority
* is used to order the transitions. Helix tries to maximize the number of
* transitions that can be fired in parallel without violating the
@@ -343,6 +373,23 @@ public class StateModelDefinition extends HelixProperty {
}
/**
+ * Define all legal transitions between states using this method. Priority
+ * is used to order the transitions. Helix tries to maximize the number of
+ * transitions that can be fired in parallel without violating the
+ * constraint. The transitions are first sorted based on priority and
+ * transitions are selected in a greedy way until the constriants are not
+ * violated.
+ * @param fromState source
+ * @param toState destination
+ * @param priority priority, higher value is higher priority
+ * @return Builder
+ */
+ public Builder addTransition(String fromState, String toState, int priority) {
+ transitionMap.put(new Transition(fromState, toState), priority);
+ return this;
+ }
+
+ /**
* Add a state transition with maximal priority value
* @see #addTransition(String, String, int)
* @param fromState
@@ -355,13 +402,35 @@ public class StateModelDefinition extends HelixProperty {
}
/**
+ * Add a state transition with maximal priority value
+ * @see #addTransition(String, String, int)
+ * @param fromState
+ * @param toState
+ * @return Builder
+ */
+ public Builder addTransition(String fromState, String toState) {
+ addTransition(fromState, toState, Integer.MAX_VALUE);
+ return this;
+ }
+
+ /**
* Set a maximum for replicas in this state
* @param state state name
* @param upperBound maximum
* @return Builder
*/
public Builder upperBound(State state, int upperBound) {
- stateConstraintMap.put(state.toString(), String.valueOf(upperBound));
+ return upperBound(state.toString(), upperBound);
+ }
+
+ /**
+ * Set a maximum for replicas in this state
+ * @param state state name
+ * @param upperBound maximum
+ * @return Builder
+ */
+ public Builder upperBound(String state, int upperBound) {
+ stateConstraintMap.put(state, String.valueOf(upperBound));
return this;
}
@@ -380,7 +449,25 @@ public class StateModelDefinition extends HelixProperty {
* @return Builder
*/
public Builder dynamicUpperBound(State state, String bound) {
- stateConstraintMap.put(state.toString(), bound);
+ return dynamicUpperBound(state.toString(), bound);
+ }
+
+ /**
+ * You can use this to have the bounds dynamically change based on other
+ * parameters. <br/>
+ * Currently support 2 values <br/>
+ * R --> Refers to the number of replicas specified during resource
+ * creation. This allows having different replication factor for each
+ * resource without having to create a different state machine. <br/>
+ * N --> Refers to all nodes in the cluster. Useful for resources that need
+ * to exist on all nodes. This way one can add/remove nodes without having
+ * the change the bounds.
+ * @param state
+ * @param bound
+ * @return Builder
+ */
+ public Builder dynamicUpperBound(String state, String bound) {
+ stateConstraintMap.put(state, bound);
return this;
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-core/src/main/java/org/apache/helix/model/Transition.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/Transition.java b/helix-core/src/main/java/org/apache/helix/model/Transition.java
index 16fc937..70f8635 100644
--- a/helix-core/src/main/java/org/apache/helix/model/Transition.java
+++ b/helix-core/src/main/java/org/apache/helix/model/Transition.java
@@ -38,6 +38,15 @@ public class Transition {
_toState = toState;
}
+ /**
+ * Instantiate with a source and destination state
+ * @param fromState source name
+ * @param toState destination name
+ */
+ public Transition(String fromState, String toState) {
+ this(State.from(fromState), State.from(toState));
+ }
+
@Override
public String toString() {
return _fromState + "-" + _toState;
@@ -60,7 +69,7 @@ public class Transition {
* Get the source state
* @return source state name
*/
- public State getFromState() {
+ public State getTypedFromState() {
return _fromState;
}
@@ -68,11 +77,27 @@ public class Transition {
* Get the destination state
* @return destination state name
*/
- public State getToState() {
+ public State getTypedToState() {
return _toState;
}
/**
+ * Get the source state
+ * @return source state name
+ */
+ public String getFromState() {
+ return _fromState.toString();
+ }
+
+ /**
+ * Get the destination state
+ * @return destination state name
+ */
+ public String getToState() {
+ return _toState.toString();
+ }
+
+ /**
* Create a new transition
* @param fromState string source state
* @param toState string destination state
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-core/src/main/java/org/apache/helix/model/builder/StateTransitionTableBuilder.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/builder/StateTransitionTableBuilder.java b/helix-core/src/main/java/org/apache/helix/model/builder/StateTransitionTableBuilder.java
index 4c6edf7..779f220 100644
--- a/helix-core/src/main/java/org/apache/helix/model/builder/StateTransitionTableBuilder.java
+++ b/helix-core/src/main/java/org/apache/helix/model/builder/StateTransitionTableBuilder.java
@@ -126,8 +126,8 @@ public class StateTransitionTableBuilder {
}
for (Transition transition : transitions) {
- State fromState = transition.getFromState();
- State toState = transition.getToState();
+ State fromState = transition.getTypedFromState();
+ State toState = transition.getTypedToState();
setPathVal(path, fromState.toString(), toState.toString(), 1);
setNext(next, fromState.toString(), toState.toString(), toState.toString());
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
index 83b93d8..afd2886 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
@@ -89,7 +89,7 @@ public class ResourceMonitor implements ResourceMonitorMBean {
// TODO fix this; IdealState shall have either map fields (CUSTOM mode)
// or list fields (AUDO mode)
- for (PartitionId partitionId : idealState.getPartitionSet()) {
+ for (PartitionId partitionId : idealState.getPartitionIdSet()) {
Map<ParticipantId, State> idealRecord = idealState.getParticipantStateMap(partitionId);
Map<ParticipantId, State> externalViewRecord = externalView.getStateMap(partitionId);
@@ -113,7 +113,7 @@ public class ResourceMonitor implements ResourceMonitorMBean {
}
_numOfErrorPartitions = numOfErrorPartitions;
_externalViewIdealStateDiff = numOfDiff;
- _numOfPartitionsInExternalView = externalView.getPartitionSet().size();
+ _numOfPartitionsInExternalView = externalView.getPartitionIdSet().size();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerElection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerElection.java b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerElection.java
index 6ae8a1b..0e8c6fd 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerElection.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerElection.java
@@ -142,7 +142,7 @@ public class DistClusterControllerElection implements ControllerChangeListener {
leader = accessor.getProperty(keyBuilder.controllerLeader());
if (leader != null) {
- String leaderSessionId = leader.getSessionId().stringify();
+ String leaderSessionId = leader.getTypedSessionId().stringify();
LOG.info("Leader exists for cluster: " + manager.getClusterName() + ", currentLeader: "
+ leader.getInstanceName() + ", leaderSessionId: " + leaderSessionId);
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java b/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
index 2258b95..4e4fdf6 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
@@ -166,13 +166,13 @@ public class HelixStateMachineEngine implements StateMachineEngine {
if (!type.equals(MessageType.STATE_TRANSITION.toString())) {
throw new HelixException("Expect state-transition message type, but was "
- + message.getMsgType() + ", msgId: " + message.getMsgId());
+ + message.getMsgType() + ", msgId: " + message.getMessageId());
}
PartitionId partitionKey = message.getPartitionId();
StateModelDefId stateModelId = message.getStateModelDefId();
ResourceId resourceId = message.getResourceId();
- SessionId sessionId = message.getTgtSessionId();
+ SessionId sessionId = message.getTypedTgtSessionId();
int bucketSize = message.getBucketSize();
if (stateModelId == null) {
@@ -210,7 +210,7 @@ public class HelixStateMachineEngine implements StateMachineEngine {
if (message.getBatchMessageMode() == false) {
// create currentStateDelta for this partition
- String initState = _stateModelDefs.get(message.getStateModelDef()).getInitialStateString();
+ String initState = _stateModelDefs.get(message.getStateModelDef()).getInitialState();
StateModel stateModel = stateModelFactory.getStateModel(partitionKey.stringify());
if (stateModel == null) {
stateModel = stateModelFactory.createAndAddStateModel(partitionKey.stringify());
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
index ed411d1..9bba660 100644
--- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
@@ -133,7 +133,7 @@ public class RoutingTableProvider implements ExternalViewChangeListener, ConfigC
if (externalViewList != null) {
for (ExternalView extView : externalViewList) {
String resourceName = extView.getId();
- for (String partitionName : extView.getPartitionStringSet()) {
+ for (String partitionName : extView.getPartitionSet()) {
Map<String, String> stateMap = extView.getStateMap(partitionName);
for (String instanceName : stateMap.keySet()) {
String currentState = stateMap.get(instanceName);
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
index ea03d58..f591a24 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
@@ -260,7 +260,7 @@ public class ClusterStateVerifier {
ResourceAssignment resourceAssignment = bestPossOutput.getResourceAssignment(resourceId);
ResourceAssignmentBuilder raBuilder = new ResourceAssignmentBuilder(resourceId);
- List<? extends PartitionId> mappedPartitions = resourceAssignment.getMappedPartitions();
+ List<? extends PartitionId> mappedPartitions = resourceAssignment.getMappedPartitionIds();
for (PartitionId partitionId : mappedPartitions) {
raBuilder.addAssignments(partitionId, resourceAssignment.getReplicaMap(partitionId));
}
@@ -315,7 +315,7 @@ public class ClusterStateVerifier {
int extViewSize = extView.getRecord().getMapFields().size();
int bestPossStateSize =
bestPossOutput.getResourceAssignment(ResourceId.from(resourceName))
- .getMappedPartitions().size();
+ .getMappedPartitionIds().size();
if (extViewSize != bestPossStateSize) {
LOG.info("exterView size (" + extViewSize + ") is different from bestPossState size ("
+ bestPossStateSize + ") for resource: " + resourceName);
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-core/src/main/java/org/apache/helix/tools/MessagePoster.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/MessagePoster.java b/helix-core/src/main/java/org/apache/helix/tools/MessagePoster.java
index f3ed88e..8120981 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/MessagePoster.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/MessagePoster.java
@@ -66,7 +66,7 @@ public class MessagePoster {
MessageId msgId = MessageId.from("TestMessageId-2");
Message message = new Message(MessageType.STATE_TRANSITION, msgId);
- message.setMsgId(msgId);
+ message.setMessageId(msgId);
message.setSrcName(msgSrc);
message.setTgtName(instanceName);
message.setMsgState(MessageState.NEW);
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-core/src/main/java/org/apache/helix/tools/ZkLogAnalyzer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ZkLogAnalyzer.java b/helix-core/src/main/java/org/apache/helix/tools/ZkLogAnalyzer.java
index 58186d5..8b32ddc 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ZkLogAnalyzer.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ZkLogAnalyzer.java
@@ -342,14 +342,14 @@ public class ZkLogAnalyzer {
// sendMessageLines.add(inputLine);
stats.msgSentCount++;
- if (msg.getFromState().toString().equals("OFFLINE")
- && msg.getToState().toString().equals("SLAVE")) {
+ if (msg.getTypedFromState().toString().equals("OFFLINE")
+ && msg.getTypedToState().toString().equals("SLAVE")) {
stats.msgSentCount_O2S++;
- } else if (msg.getFromState().toString().equals("SLAVE")
- && msg.getToState().toString().equals("MASTER")) {
+ } else if (msg.getTypedFromState().toString().equals("SLAVE")
+ && msg.getTypedToState().toString().equals("MASTER")) {
stats.msgSentCount_S2M++;
- } else if (msg.getFromState().toString().equals("MASTER")
- && msg.getToState().toString().equals("SLAVE")) {
+ } else if (msg.getTypedFromState().toString().equals("MASTER")
+ && msg.getTypedToState().toString().equals("SLAVE")) {
stats.msgSentCount_M2S++;
}
// System.out.println("Message create:"+new
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/142c9248/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java b/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
index e46ad13..d304a87 100644
--- a/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
@@ -41,7 +41,7 @@ public class RebalanceUtil {
Map<String, Integer> partitionIndex = new HashMap<String, Integer>();
Map<String, String> reversePartitionIndex = new HashMap<String, String>();
boolean indexInPartitionName = true;
- for (PartitionId partitionId : state.getPartitionSet()) {
+ for (PartitionId partitionId : state.getPartitionIdSet()) {
String partitionName = partitionId.stringify();
int lastPos = partitionName.lastIndexOf("_");
if (lastPos < 0) {
@@ -64,7 +64,7 @@ public class RebalanceUtil {
if (indexInPartitionName == false) {
List<String> partitions = new ArrayList<String>();
- partitions.addAll(Lists.transform(Lists.newArrayList(state.getPartitionSet()),
+ partitions.addAll(Lists.transform(Lists.newArrayList(state.getPartitionIdSet()),
Functions.toStringFunction()));
Collections.sort(partitions);
for (int i = 0; i < partitions.size(); i++) {
@@ -76,7 +76,7 @@ public class RebalanceUtil {
Map<String, List<Integer>> nodeMasterAssignmentMap = new TreeMap<String, List<Integer>>();
Map<String, Map<String, List<Integer>>> combinedNodeSlaveAssignmentMap =
new TreeMap<String, Map<String, List<Integer>>>();
- for (PartitionId partition : state.getPartitionSet()) {
+ for (PartitionId partition : state.getPartitionIdSet()) {
List<String> instances = state.getRecord().getListField(partition.stringify());
String master = instances.get(0);
if (!nodeMasterAssignmentMap.containsKey(master)) {
@@ -111,7 +111,7 @@ public class RebalanceUtil {
// StateModelDefinition def = new StateModelDefinition(stateModDef);
- List<String> statePriorityList = stateModDef.getStatesPriorityStringList();
+ List<String> statePriorityList = stateModDef.getStatesPriorityList();
for (String state : statePriorityList) {
String count = stateModDef.getNumInstancesPerState(state);