You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2020/10/08 00:15:56 UTC
[helix] branch master updated: Move ST message to state validation
from executing phase to scheduling phase. (#1362)
This is an automated email from the ASF dual-hosted git repository.
jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 11cd868 Move ST message to state validation from executing phase to scheduling phase. (#1362)
11cd868 is described below
commit 11cd868fc3a98faee4eaba429ce0a47e2fd29ddb
Author: xyuanlu <xy...@gmail.com>
AuthorDate: Wed Oct 7 17:15:46 2020 -0700
Move ST message to state validation from executing phase to scheduling phase. (#1362)
Move ST message to state validation to scheduling phase, because of the following reasons:
It is a no-op operation, thus we should not create a thread to execute it.
The lag between scheduling and executing could add latency for the controller and, eventually, for the application.
---
.../handling/HelixStateTransitionHandler.java | 79 ++++++++++++--------
.../messaging/handling/HelixTaskExecutor.java | 71 ++++++++++++------
.../messaging/handling/TestHelixTaskExecutor.java | 85 ++++++++++++++++++++--
3 files changed, 174 insertions(+), 61 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
index 8b6b01d..bdb0d0b 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
@@ -38,10 +38,6 @@ import org.apache.helix.NotificationContext;
import org.apache.helix.NotificationContext.MapKey;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
-import org.apache.helix.zookeeper.datamodel.ZNRecordBucketizer;
-import org.apache.helix.zookeeper.datamodel.ZNRecordDelta;
-import org.apache.helix.zookeeper.datamodel.ZNRecordDelta.MergeOperation;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.Message;
import org.apache.helix.model.Message.Attributes;
@@ -50,6 +46,10 @@ import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.helix.participant.statemachine.StateModelParser;
import org.apache.helix.participant.statemachine.StateTransitionError;
import org.apache.helix.util.StatusUpdateUtil;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.datamodel.ZNRecordBucketizer;
+import org.apache.helix.zookeeper.datamodel.ZNRecordDelta;
+import org.apache.helix.zookeeper.datamodel.ZNRecordDelta.MergeOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,6 +69,16 @@ public class HelixStateTransitionHandler extends MessageHandler {
}
}
+ public static class StaleMessageValidateResult {
+ public boolean isValid;
+ public Exception exception;
+
+ StaleMessageValidateResult(Exception exp) {
+ exception = exp;
+ isValid = exception == null;
+ }
+ }
+
private static final Logger logger = LoggerFactory.getLogger(HelixStateTransitionHandler.class);
private final StateModel _stateModel;
StatusUpdateUtil _statusUpdateUtil;
@@ -107,39 +117,18 @@ public class HelixStateTransitionHandler extends MessageHandler {
+ _message.getToState() + ", relayedFrom: " + _message.getRelaySrcHost());
HelixDataAccessor accessor = _manager.getHelixDataAccessor();
-
String partitionName = _message.getPartitionName();
- String fromState = _message.getFromState();
- String toState = _message.getToState();
-
- // Verify the fromState and current state of the stateModel
- // getting current state from state model will provide most up-to-date
- // current state. In case current state is null, partition is in initial
- // state and we are setting it in current state
- String state = _stateModel.getCurrentState() != null ? _stateModel.getCurrentState()
- : _currentStateDelta.getState(partitionName);
// Set start time right before invoke client logic
_currentStateDelta.setStartTime(_message.getPartitionName(), System.currentTimeMillis());
- Exception err = null;
- if (toState.equalsIgnoreCase(state)) {
- // To state equals current state, we can just ignore the message
- err = new HelixDuplicatedStateTransitionException(
- String.format("Partition %s current state is same as toState (%s->%s) from message.",
- partitionName, fromState, toState));
- } else if (fromState != null && !fromState.equals("*") && !fromState.equalsIgnoreCase(state)) {
- // If current state is neither toState nor fromState in message, there is a problem
- err = new HelixStateMismatchException(String.format(
- "Current state of stateModel does not match the fromState in Message, CurrentState: %s, Message: %s->%s, Partition: %s, from: %s, to: %s",
- state, fromState, toState, partitionName, _message.getMsgSrc(), _message.getTgtName()));
- }
-
- if (err != null) {
- _statusUpdateUtil.logError(_message, HelixStateTransitionHandler.class, err.getMessage(),
- _manager);
- logger.error(err.getMessage());
- throw err;
+ StaleMessageValidateResult err = staleMessageValidator();
+ if (!err.isValid) {
+ _statusUpdateUtil
+ .logError(_message, HelixStateTransitionHandler.class, err.exception.getMessage(),
+ _manager);
+ logger.error(err.exception.getMessage());
+ throw err.exception;
}
// Reset the REQUESTED_STATE property if it exists.
@@ -460,7 +449,33 @@ public class HelixStateTransitionHandler extends MessageHandler {
StateTransitionError error = new StateTransitionError(type, code, e);
_stateModel.rollbackOnError(_message, _notificationContext, error);
}
+ }
+
+ // Verify the fromState and current state of the stateModel.
+ public StaleMessageValidateResult staleMessageValidator() {
+ String fromState = _message.getFromState();
+ String toState = _message.getToState();
+ String partitionName = _message.getPartitionName();
+ // state in _currentStateDelta uses current state from state model. It has the
+ // most up-to-date. current state. In case currentState in stateModel is null,
+ // partition is in initial state and we using it as current state.
+ // Defined in HelixStateMachineEngine.
+ String state = _currentStateDelta.getState(partitionName);
+
+ Exception err = null;
+ if (toState.equalsIgnoreCase(state)) {
+ // To state equals current state, we can just ignore the message
+ err = new HelixDuplicatedStateTransitionException(String
+ .format("Partition %s current state is same as toState (%s->%s) from message.",
+ partitionName, fromState, toState));
+ } else if (fromState != null && !fromState.equals("*") && !fromState.equalsIgnoreCase(state)) {
+ // If current state is neither toState nor fromState in message, there is a problem
+ err = new HelixStateMismatchException(String.format(
+ "Current state of stateModel does not match the fromState in Message, CurrentState: %s, Message: %s->%s, Partition: %s, from: %s, to: %s",
+ state, fromState, toState, partitionName, _message.getMsgSrc(), _message.getTgtName()));
+ }
+ return new StaleMessageValidateResult(err);
}
@Override
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index f5c2dea..1e9c455 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -903,15 +903,8 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
.equals(MessageType.STATE_TRANSITION_CANCELLATION.name())) {
String messageTarget =
getMessageTarget(message.getResourceName(), message.getPartitionName());
- if (stateTransitionHandlers.containsKey(messageTarget)) {
- // If there are 2 messages in same batch about same partition's state transition,
- // the later one is discarded
- Message duplicatedMessage = stateTransitionHandlers.get(messageTarget)._message;
- throw new HelixException(String.format(
- "Duplicated state transition message: %s. Existing: %s->%s; New (Discarded): %s->%s",
- message.getMsgId(), duplicatedMessage.getFromState(),
- duplicatedMessage.getToState(), message.getFromState(), message.getToState()));
- } else if (message.getMsgType().equals(MessageType.STATE_TRANSITION.name())
+
+ if (message.getMsgType().equals(MessageType.STATE_TRANSITION.name())
&& isStateTransitionInProgress(messageTarget)) {
String taskId = _messageTaskMap.get(messageTarget);
@@ -919,11 +912,36 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
// If there is another state transition for same partition is going on,
// discard the message. Controller will resend if this is a valid message
- throw new HelixException(String.format(
+ String errMsg = String.format(
"Another state transition for %s:%s is in progress with msg: %s, p2p: %s, read: %d, current:%d. Discarding %s->%s message",
- message.getResourceName(), message.getPartitionName(), msg.getMsgId(), String.valueOf(msg.isRelayMessage()),
- msg.getReadTimeStamp(), System.currentTimeMillis(), message.getFromState(),
- message.getToState()));
+ message.getResourceName(), message.getPartitionName(), msg.getMsgId(),
+ String.valueOf(msg.isRelayMessage()), msg.getReadTimeStamp(),
+ System.currentTimeMillis(), message.getFromState(), message.getToState());
+ handleUnprocessableMessage(message, null /* exception */, errMsg, accessor,
+ instanceName, manager);
+ continue;
+ }
+ if (createHandler instanceof HelixStateTransitionHandler) {
+ // We only check to state if there is no ST task scheduled/executing.
+ HelixStateTransitionHandler.StaleMessageValidateResult result =
+ ((HelixStateTransitionHandler) createHandler).staleMessageValidator();
+ if (!result.isValid) {
+ handleUnprocessableMessage(message, null /* exception */,
+ result.exception.getMessage(), accessor, instanceName, manager);
+ continue;
+ }
+ }
+ if (stateTransitionHandlers.containsKey(messageTarget)) {
+ // If there are 2 messages in same batch about same partition's state transition,
+ // the later one is discarded
+ Message duplicatedMessage = stateTransitionHandlers.get(messageTarget)._message;
+ String errMsg = String.format(
+ "Duplicated state transition message: %s. Existing: %s->%s; New (Discarded): %s->%s",
+ message.getMsgId(), duplicatedMessage.getFromState(),
+ duplicatedMessage.getToState(), message.getFromState(), message.getToState());
+ handleUnprocessableMessage(message, null /* exception */, errMsg, accessor,
+ instanceName, manager);
+ continue;
}
stateTransitionHandlers
@@ -937,16 +955,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
nonStateTransitionContexts.add(msgWorkingContext);
}
} catch (Exception e) {
- LOG.error("Failed to create message handler for " + message.getMsgId(), e);
- String error =
- "Failed to create message handler for " + message.getMsgId() + ", exception: " + e;
-
- _statusUpdateUtil.logError(message, HelixStateMachineEngine.class, e, error, manager);
-
- message.setMsgState(MessageState.UNPROCESSABLE);
- removeMessageFromZK(accessor, message, instanceName);
- LOG.error("Message cannot be processed: " + message.getRecord(), e);
- _monitor.reportProcessedMessage(message, ParticipantMessageMonitor.ProcessedMessageState.DISCARDED);
+ handleUnprocessableMessage(message, e, e.getMessage(), accessor, instanceName, manager);
continue;
}
@@ -1107,6 +1116,22 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
_statusUpdateUtil.logInfo(message, HelixStateMachineEngine.class, "New Message", manager);
}
+ private void handleUnprocessableMessage(Message message, Exception exception, String errorMsg,
+ HelixDataAccessor accessor, String instanceName, HelixManager manager) {
+ String error = "Message " + message.getMsgId() + " cannot be processed: " + message.getRecord();
+ if (exception != null) {
+ LOG.error(error, exception);
+ _statusUpdateUtil.logError(message, HelixStateMachineEngine.class, exception, error, manager);
+ } else {
+ LOG.error(error + errorMsg);
+ _statusUpdateUtil.logError(message, HelixStateMachineEngine.class, errorMsg, manager);
+ }
+ message.setMsgState(MessageState.UNPROCESSABLE);
+ removeMessageFromZK(accessor, message, instanceName);
+ _monitor.reportProcessedMessage(message,
+ ParticipantMessageMonitor.ProcessedMessageState.DISCARDED);
+
+ }
public MessageHandler createMessageHandler(Message message, NotificationContext changeContext) {
String msgType = message.getMsgType().toString();
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
index 2fca2b9..706284d 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
@@ -37,6 +37,7 @@ import org.apache.helix.HelixManager;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey;
import org.apache.helix.mock.MockManager;
+import org.apache.helix.model.CurrentState;
import org.apache.helix.model.Message;
import org.apache.helix.model.Message.MessageState;
import org.testng.Assert;
@@ -212,18 +213,24 @@ public class TestHelixTaskExecutor {
_delay = delay;
}
- class TestStateTransitionMessageHandler extends MessageHandler {
- public TestStateTransitionMessageHandler(Message message, NotificationContext context) {
- super(message, context);
+ class TestStateTransitionMessageHandler extends HelixStateTransitionHandler {
+
+ public TestStateTransitionMessageHandler(Message message, NotificationContext context,
+ CurrentState currentStateDelta) {
+ super(null, null, message, context, currentStateDelta);
}
@Override
- public HelixTaskResult handleMessage() throws InterruptedException {
+ public HelixTaskResult handleMessage() {
HelixTaskResult result = new HelixTaskResult();
_processedMsgIds.put(_message.getMsgId(), _message.getMsgId());
if (_delay > 0) {
System.out.println("Sleeping..." + _delay);
- Thread.sleep(_delay);
+ try {
+ Thread.sleep(_delay);
+ } catch (Exception e) {
+ assert (false);
+ }
}
result.setSuccess(true);
return result;
@@ -232,11 +239,28 @@ public class TestHelixTaskExecutor {
@Override
public void onError(Exception e, ErrorCode code, ErrorType type) {
}
+
+ @Override
+ public StaleMessageValidateResult staleMessageValidator() {
+ return super.staleMessageValidator();
+ }
}
@Override
public MessageHandler createHandler(Message message, NotificationContext context) {
- return new TestStateTransitionMessageHandler(message, context);
+ CurrentState currentStateDelta = new CurrentState(message.getResourceName());
+ currentStateDelta.setSessionId(message.getTgtSessionId());
+ currentStateDelta.setStateModelDefRef(message.getStateModelDef());
+ currentStateDelta.setStateModelFactoryName(message.getStateModelFactoryName());
+ currentStateDelta.setBucketSize(message.getBucketSize());
+ if (!message.getResourceName().equals("testStaledMessageResource")) {
+ // set the current state same as from state in the message in test testStaledMessage.
+ currentStateDelta.setState(message.getPartitionName(), "SLAVE");
+ } else {
+ // set the current state same as to state in the message in test testStaledMessage.
+ currentStateDelta.setState(message.getPartitionName(), "MASTER");
+ }
+ return new TestStateTransitionMessageHandler(message, context, currentStateDelta);
}
@Override
@@ -378,6 +402,55 @@ public class TestHelixTaskExecutor {
}
@Test()
+ public void testStaledMessage() throws InterruptedException {
+ System.out.println("START TestHelixTaskExecutor.testStaledMessage()");
+ HelixTaskExecutor executor = new HelixTaskExecutor();
+ HelixManager manager = new MockClusterManager();
+ HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
+ PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder();
+
+ TestStateTransitionHandlerFactory stateTransitionFactory =
+ new TestStateTransitionHandlerFactory(Message.MessageType.STATE_TRANSITION.name(), 1000);
+ executor.registerMessageHandlerFactory(Message.MessageType.STATE_TRANSITION.name(),
+ stateTransitionFactory);
+
+ NotificationContext changeContext = new NotificationContext(manager);
+ List<Message> msgList = new ArrayList<Message>();
+
+ int nMsgs = 1;
+ String instanceName = manager.getInstanceName();
+ for (int i = 0; i < nMsgs; i++) {
+ Message msg =
+ new Message(Message.MessageType.STATE_TRANSITION.name(), UUID.randomUUID().toString());
+ msg.setTgtSessionId(manager.getSessionId());
+ msg.setCreateTimeStamp((long) i);
+ msg.setTgtName("Localhost_1123");
+ msg.setSrcName("127.101.1.23_2234");
+ msg.setPartitionName("Partition");
+ msg.setResourceName("testStaledMessageResource");
+ msg.setStateModelDef("DummyMasterSlave");
+ msg.setFromState("SLAVE");
+ msg.setToState("MASTER");
+ dataAccessor.setProperty(msg.getKey(keyBuilder, instanceName), msg);
+ msgList.add(msg);
+ }
+
+ Assert.assertEquals(dataAccessor.getChildValues(keyBuilder.messages(instanceName), true).size(),
+ nMsgs);
+
+ changeContext.setChangeType(HelixConstants.ChangeType.MESSAGE);
+ executor.onMessage(instanceName, msgList, changeContext);
+
+ Thread.sleep(200);
+
+ // The message should be ignored since toState is the same as current state.
+ Assert.assertEquals(dataAccessor.getChildValues(keyBuilder.messages(instanceName), true).size(),
+ 0);
+
+ System.out.println("END TestHelixTaskExecutor.testStaledMessage()");
+ }
+
+ @Test()
public void testUnknownTypeMsgExecution() throws InterruptedException {
HelixTaskExecutor executor = new HelixTaskExecutor();
HelixManager manager = new MockClusterManager();