You are viewing a plain-text version of this content. The link to the canonical version was a hyperlink and is not preserved in this plain-text rendering.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/06/26 22:35:43 UTC
helix git commit: [HELIX-705]: Participant duplicated state transition handling rework
Repository: helix
Updated Branches:
refs/heads/master 323fbd049 -> 8dc19afb9
[HELIX-705]: Participant duplicated state transition handling rework
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/8dc19afb
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/8dc19afb
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/8dc19afb
Branch: refs/heads/master
Commit: 8dc19afb9b70d262da0eb2081840d65f2a031122
Parents: 323fbd0
Author: Harry Zhang <hr...@linkedin.com>
Authored: Mon Jun 25 15:55:14 2018 -0700
Committer: Harry Zhang <hr...@linkedin.com>
Committed: Tue Jun 26 15:33:47 2018 -0700
----------------------------------------------------------------------
.../handling/HelixStateTransitionHandler.java | 54 +++++++++++++++-----
.../helix/messaging/handling/HelixTask.java | 2 +-
.../messaging/handling/HelixTaskExecutor.java | 36 +++++++++++--
.../handling/TestHelixTaskExecutor.java | 51 +++++++++++++-----
4 files changed, 111 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/8dc19afb/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
index 9412dde..36245de 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
@@ -28,7 +28,6 @@ import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
-
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixDefinedState;
@@ -61,6 +60,15 @@ public class HelixStateTransitionHandler extends MessageHandler {
}
}
+ /**
+ * If current state == toState in message, this is considered as Duplicated state transition
+ */
+ public static class HelixDuplicatedStateTransitionException extends Exception {
+ public HelixDuplicatedStateTransitionException(String info) {
+ super(info);
+ }
+ }
+
private static final Logger logger = LoggerFactory.getLogger(HelixStateTransitionHandler.class);
private final StateModel _stateModel;
StatusUpdateUtil _statusUpdateUtil;
@@ -104,24 +112,39 @@ public class HelixStateTransitionHandler extends MessageHandler {
String partitionName = _message.getPartitionName();
String fromState = _message.getFromState();
+ String toState = _message.getToState();
// Verify the fromState and current state of the stateModel
- String state = _currentStateDelta.getState(partitionName);
+ // getting current state from state model will provide most up-to-date
+ // current state. In case current state is null, partition is in initial
+ // state and we are setting it in current state
+ String state = _stateModel.getCurrentState() != null
+ ? _stateModel.getCurrentState()
+ : _currentStateDelta.getState(partitionName);
// Set start time right before invoke client logic
_currentStateDelta.setStartTime(_message.getPartitionName(), System.currentTimeMillis());
- if (fromState != null && !fromState.equals("*") && !fromState.equalsIgnoreCase(state)) {
- String errorMessage =
- "Current state of stateModel does not match the fromState in Message"
- + ", Current State:" + state + ", message expected:" + fromState + ", partition: "
- + partitionName + ", from: " + _message.getMsgSrc() + ", to: "
- + _message.getTgtName();
+ Exception err = null;
+ if (toState.equalsIgnoreCase(state)) {
+ // To state equals current state, we can just ignore the message
+ err = new HelixDuplicatedStateTransitionException(
+ String.format("Partition %s current state is same as toState (%s->%s) from message.",
+ partitionName, fromState, toState)
+ );
+ } else if (fromState != null && !fromState.equals("*") && !fromState.equalsIgnoreCase(state)) {
+ // If current state is neither toState nor fromState in message, there is a problem
+ err = new HelixStateMismatchException(
+ String.format(
+ "Current state of stateModel does not match the fromState in Message, CurrentState: %s, Message: %s->%s, Partition: %s, from: %s, to: %s",
+ state, fromState, toState, partitionName, _message.getMsgSrc(), _message.getTgtName())
+ );
+ }
- _statusUpdateUtil.logError(_message, HelixStateTransitionHandler.class, errorMessage,
- _manager);
- logger.error(errorMessage);
- throw new HelixStateMismatchException(errorMessage);
+ if (err != null) {
+ _statusUpdateUtil.logError(_message, HelixStateTransitionHandler.class, err.getMessage(), _manager);
+ logger.error(err.getMessage());
+ throw err;
}
// Reset the REQUESTED_STATE property if it exists.
@@ -319,6 +342,11 @@ public class HelixStateTransitionHandler extends MessageHandler {
try {
preHandleMessage();
invoke(manager, context, taskResult, message);
+ } catch (HelixDuplicatedStateTransitionException e) {
+ // Duplicated state transition problem is fine
+ taskResult.setSuccess(true);
+ taskResult.setMessage(e.toString());
+ taskResult.setInfo(e.getMessage());
} catch (HelixStateMismatchException e) {
// Simply log error and return from here if State mismatch.
// The current state of the state model is intact.
@@ -414,7 +442,7 @@ public class HelixStateTransitionHandler extends MessageHandler {
+ _stateModel.getClass();
logger.error(errorMessage);
taskResult.setSuccess(false);
-
+ taskResult.setInfo(errorMessage);
_statusUpdateUtil
.logError(message, HelixStateTransitionHandler.class, errorMessage, manager);
}
http://git-wip-us.apache.org/repos/asf/helix/blob/8dc19afb/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
index 337a933..3cca883 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
@@ -357,4 +357,4 @@ public class HelixTask implements MessageTask {
}
_isStarted = true;
}
-};
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/8dc19afb/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index bb55604..9734cc8 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -406,10 +406,8 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
// Check to see if dedicate thread pool for handling state transition messages is configured or provided.
updateStateTransitionMessageThreadPool(message, manager);
- LOG.info("Scheduling message: " + taskId);
- // System.out.println("sched msg: " + message.getPartitionName() + "-"
- // + message.getTgtName() + "-" + message.getFromState() + "-"
- // + message.getToState());
+ LOG.info("Scheduling message {}: {}:{}, {}->{}", taskId, message.getResourceName(),
+ message.getPartitionName(), message.getFromState(), message.getToState());
_statusUpdateUtil.logInfo(message, HelixTaskExecutor.class,
"Message handling task scheduled", manager);
@@ -867,12 +865,23 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
String messageTarget =
getMessageTarget(message.getResourceName(), message.getPartitionName());
if (stateTransitionHandlers.containsKey(messageTarget)) {
+ // If there are 2 messages in same batch about same partition's state transition,
+ // the later one is discarded
Message duplicatedMessage = stateTransitionHandlers.get(messageTarget)._message;
throw new HelixException(String.format(
- "Duplicated state transition message: %s. Existing: %s -> %s; New (Discarded): %s -> %s",
+ "Duplicated state transition message: %s. Existing: %s->%s; New (Discarded): %s->%s",
message.getMsgId(), duplicatedMessage.getFromState(),
duplicatedMessage.getToState(), message.getFromState(), message.getToState()));
+ } else if (message.getMsgType().equals(MessageType.STATE_TRANSITION.name())
+ && isStateTransitionInProgress(messageTarget)) {
+ // If there is another state transition for same partition is going on,
+ // discard the message. Controller will resend if this is a valid message
+ throw new HelixException(String
+ .format("Another state transition for %s:%s is in progress. Discarding %s->%s message",
+ message.getResourceName(), message.getPartitionName(), message.getFromState(),
+ message.getToState()));
}
+
stateTransitionHandlers
.put(getMessageTarget(message.getResourceName(), message.getPartitionName()),
createHandler);
@@ -959,6 +968,23 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
}
}
+ /**
+ * Check if a state transition of the given message target is in progress. This function
+ * assumes the given message target corresponds to a state transition task
+ *
+ * @param messageTarget message target generated by getMessageTarget()
+ * @return true if there is a task going on with same message target else false
+ */
+ private boolean isStateTransitionInProgress(String messageTarget) {
+ synchronized (_lock) {
+ if (_messageTaskMap.containsKey(messageTarget)) {
+ String taskId = _messageTaskMap.get(messageTarget);
+ return !_taskMap.get(taskId).getFuture().isDone();
+ }
+ return false;
+ }
+ }
+
// Try to cancel this state transition that has not been started yet.
// Three Types of Cancellation: 1. Message arrived with previous state transition
// 2. Message handled but task not started
http://git-wip-us.apache.org/repos/asf/helix/blob/8dc19afb/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
index ea2bb20..9db842d 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
+
import com.google.common.collect.ImmutableList;
import org.apache.helix.HelixConstants;
@@ -94,7 +95,7 @@ public class TestHelixTaskExecutor {
@Override
public List<String> getMessageTypes() {
- return Arrays.asList(new String[]{"TestingMessageHandler", Message.MessageType.STATE_TRANSITION.name()});
+ return Collections.singletonList("TestingMessageHandler");
}
@Override
@@ -200,9 +201,16 @@ public class TestHelixTaskExecutor {
class TestStateTransitionHandlerFactory implements MultiTypeMessageHandlerFactory {
ConcurrentHashMap<String, String> _processedMsgIds = new ConcurrentHashMap<String, String>();
private final String _msgType;
+ private final long _delay;
public TestStateTransitionHandlerFactory(String msgType) {
+ this(msgType, -1);
+ }
+
+ public TestStateTransitionHandlerFactory(String msgType, long delay) {
_msgType = msgType;
+ _delay = delay;
}
+
class TestStateTransitionMessageHandler extends MessageHandler {
public TestStateTransitionMessageHandler(Message message, NotificationContext context) {
super(message, context);
@@ -212,6 +220,10 @@ public class TestHelixTaskExecutor {
public HelixTaskResult handleMessage() throws InterruptedException {
HelixTaskResult result = new HelixTaskResult();
_processedMsgIds.put(_message.getMsgId(), _message.getMsgId());
+ if (_delay > 0) {
+ System.out.println("Sleeping..." + _delay);
+ Thread.sleep(_delay);
+ }
result.setSuccess(true);
return result;
}
@@ -308,42 +320,55 @@ public class TestHelixTaskExecutor {
HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder();
- TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
- for (String type : factory.getMessageTypes()) {
- executor.registerMessageHandlerFactory(type, factory);
- }
+ TestStateTransitionHandlerFactory stateTransitionFactory =
+ new TestStateTransitionHandlerFactory(Message.MessageType.STATE_TRANSITION.name(), 1000);
+ executor.registerMessageHandlerFactory(Message.MessageType.STATE_TRANSITION.name(),
+ stateTransitionFactory);
NotificationContext changeContext = new NotificationContext(manager);
List<Message> msgList = new ArrayList<Message>();
- int nMsgs = 10;
- String instanceName = "someInstance";
+ int nMsgs = 3;
+ String instanceName = manager.getInstanceName();
for (int i = 0; i < nMsgs; i++) {
Message msg =
new Message(Message.MessageType.STATE_TRANSITION.name(), UUID.randomUUID().toString());
msg.setTgtSessionId(manager.getSessionId());
+ msg.setCreateTimeStamp((long) i);
msg.setTgtName("Localhost_1123");
msg.setSrcName("127.101.1.23_2234");
msg.setPartitionName("Partition");
msg.setResourceName("Resource");
+ msg.setStateModelDef("DummyMasterSlave");
+ msg.setFromState("SLAVE");
+ msg.setToState("MASTER");
dataAccessor.setProperty(msg.getKey(keyBuilder, instanceName), msg);
msgList.add(msg);
}
- System.out.println(dataAccessor.getChildNames(keyBuilder.messages(instanceName)));
AssertJUnit
.assertEquals(dataAccessor.getChildValues(keyBuilder.messages(instanceName)).size(), nMsgs);
changeContext.setChangeType(HelixConstants.ChangeType.MESSAGE);
executor.onMessage(instanceName, msgList, changeContext);
- Thread.sleep(1000);
+ Thread.sleep(200);
- // Will not be able to process state transition messages, but we shall verify that
- // only 1 message is left over
- AssertJUnit
- .assertEquals(dataAccessor.getChildValues(keyBuilder.messages(instanceName)).size(), 1);
+ // only 1 message is left over - state transition takes 1sec
+ Assert.assertEquals(dataAccessor.getChildValues(keyBuilder.messages(instanceName)).size(), 1);
+ // While a state transition message is going on, another state transition message for same
+ // resource / partition comes in, it should be discarded by message handler
+
+ // Mock accessor is modifying message state in memory so we set it back to NEW
+ msgList.get(2).setMsgState(MessageState.NEW);
+ dataAccessor.setProperty(msgList.get(2).getKey(keyBuilder, instanceName), msgList.get(2));
+ executor.onMessage(instanceName, Arrays.asList(msgList.get(2)), changeContext);
+ Thread.sleep(200);
+ Assert.assertEquals(dataAccessor.getChildValues(keyBuilder.messages(instanceName)).size(), 1);
+
+ Thread.sleep(1000);
+ Assert.assertEquals(dataAccessor.getChildValues(keyBuilder.messages(instanceName)).size(), 0);
System.out.println("END TestHelixTaskExecutor.testDuplicatedMessage()");
}