You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2020/10/08 00:15:56 UTC

[helix] branch master updated: Move ST message to state validation from executing phase to scheduling phase. (#1362)

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 11cd868  Move ST message to state validation from executing phase to scheduling phase. (#1362)
11cd868 is described below

commit 11cd868fc3a98faee4eaba429ce0a47e2fd29ddb
Author: xyuanlu <xy...@gmail.com>
AuthorDate: Wed Oct 7 17:15:46 2020 -0700

    Move ST message to state validation from executing phase to scheduling phase. (#1362)
    
    Move the ST message to-state validation to the scheduling phase, for the following reasons:
    1. Discarding a stale message is a no-op, so we should not create a thread to execute it.
    2. The lag between scheduling and execution can add latency to the controller and, eventually, to the application.
---
 .../handling/HelixStateTransitionHandler.java      | 79 ++++++++++++--------
 .../messaging/handling/HelixTaskExecutor.java      | 71 ++++++++++++------
 .../messaging/handling/TestHelixTaskExecutor.java  | 85 ++++++++++++++++++++--
 3 files changed, 174 insertions(+), 61 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
index 8b6b01d..bdb0d0b 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
@@ -38,10 +38,6 @@ import org.apache.helix.NotificationContext;
 import org.apache.helix.NotificationContext.MapKey;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
-import org.apache.helix.zookeeper.datamodel.ZNRecordBucketizer;
-import org.apache.helix.zookeeper.datamodel.ZNRecordDelta;
-import org.apache.helix.zookeeper.datamodel.ZNRecordDelta.MergeOperation;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.Attributes;
@@ -50,6 +46,10 @@ import org.apache.helix.participant.statemachine.StateModelFactory;
 import org.apache.helix.participant.statemachine.StateModelParser;
 import org.apache.helix.participant.statemachine.StateTransitionError;
 import org.apache.helix.util.StatusUpdateUtil;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.datamodel.ZNRecordBucketizer;
+import org.apache.helix.zookeeper.datamodel.ZNRecordDelta;
+import org.apache.helix.zookeeper.datamodel.ZNRecordDelta.MergeOperation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -69,6 +69,16 @@ public class HelixStateTransitionHandler extends MessageHandler {
     }
   }
 
+  /** Outcome of a stale-message check; {@code isValid} iff {@code exception} is null. */
+  public static class StaleMessageValidateResult {
+    public final boolean isValid;
+    public final Exception exception; // why the message was rejected; null when it is valid
+    StaleMessageValidateResult(Exception exp) {
+      exception = exp;
+      isValid = exception == null;
+    }
+  }
+
   private static final Logger logger = LoggerFactory.getLogger(HelixStateTransitionHandler.class);
   private final StateModel _stateModel;
   StatusUpdateUtil _statusUpdateUtil;
@@ -107,39 +117,18 @@ public class HelixStateTransitionHandler extends MessageHandler {
         + _message.getToState() + ", relayedFrom: " + _message.getRelaySrcHost());
 
     HelixDataAccessor accessor = _manager.getHelixDataAccessor();
-
     String partitionName = _message.getPartitionName();
-    String fromState = _message.getFromState();
-    String toState = _message.getToState();
-
-    // Verify the fromState and current state of the stateModel
-    // getting current state from state model will provide most up-to-date
-    // current state. In case current state is null, partition is in initial
-    // state and we are setting it in current state
-    String state = _stateModel.getCurrentState() != null ? _stateModel.getCurrentState()
-        : _currentStateDelta.getState(partitionName);
 
     // Set start time right before invoke client logic
     _currentStateDelta.setStartTime(_message.getPartitionName(), System.currentTimeMillis());
 
-    Exception err = null;
-    if (toState.equalsIgnoreCase(state)) {
-      // To state equals current state, we can just ignore the message
-      err = new HelixDuplicatedStateTransitionException(
-          String.format("Partition %s current state is same as toState (%s->%s) from message.",
-              partitionName, fromState, toState));
-    } else if (fromState != null && !fromState.equals("*") && !fromState.equalsIgnoreCase(state)) {
-      // If current state is neither toState nor fromState in message, there is a problem
-      err = new HelixStateMismatchException(String.format(
-          "Current state of stateModel does not match the fromState in Message, CurrentState: %s, Message: %s->%s, Partition: %s, from: %s, to: %s",
-          state, fromState, toState, partitionName, _message.getMsgSrc(), _message.getTgtName()));
-    }
-
-    if (err != null) {
-      _statusUpdateUtil.logError(_message, HelixStateTransitionHandler.class, err.getMessage(),
-          _manager);
-      logger.error(err.getMessage());
-      throw err;
+    StaleMessageValidateResult err = staleMessageValidator();
+    if (!err.isValid) {
+      _statusUpdateUtil
+          .logError(_message, HelixStateTransitionHandler.class, err.exception.getMessage(),
+              _manager);
+      logger.error(err.exception.getMessage());
+      throw err.exception;
     }
 
     // Reset the REQUESTED_STATE property if it exists.
@@ -460,7 +449,33 @@ public class HelixStateTransitionHandler extends MessageHandler {
       StateTransitionError error = new StateTransitionError(type, code, e);
       _stateModel.rollbackOnError(_message, _notificationContext, error);
     }
+  }
+
+  // Stale check: invalid if toState equals the current state, or fromState (not null/"*") differs.
+  public StaleMessageValidateResult staleMessageValidator() {
+    String fromState = _message.getFromState();
+    String toState = _message.getToState();
+    String partitionName = _message.getPartitionName();

+    // Current state as recorded in _currentStateDelta (the state model's state, or the initial
+    // state when the model has none; see HelixStateMachineEngine). NOTE(review): unlike the old
+    // inline check, _stateModel.getCurrentState() is not consulted here, so this relies on the
+    // delta matching the state model both when scheduling and when executing.
+    String state = _currentStateDelta.getState(partitionName);
+
+    Exception err = null;
+    if (toState.equalsIgnoreCase(state)) {
+      // Already in toState: the message is a duplicate and can be ignored.
+      err = new HelixDuplicatedStateTransitionException(String
+          .format("Partition %s current state is same as toState (%s->%s) from message.",
+              partitionName, fromState, toState));
+    } else if (fromState != null && !fromState.equals("*") && !fromState.equalsIgnoreCase(state)) {
+      // Current state matches neither toState nor the message's fromState: reject as mismatched.
+      err = new HelixStateMismatchException(String.format(
+          "Current state of stateModel does not match the fromState in Message, CurrentState: %s, Message: %s->%s, Partition: %s, from: %s, to: %s",
+          state, fromState, toState, partitionName, _message.getMsgSrc(), _message.getTgtName()));
+    }
+    return new StaleMessageValidateResult(err);
   }
 
   @Override
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index f5c2dea..1e9c455 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -903,15 +903,8 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
             .equals(MessageType.STATE_TRANSITION_CANCELLATION.name())) {
           String messageTarget =
               getMessageTarget(message.getResourceName(), message.getPartitionName());
-          if (stateTransitionHandlers.containsKey(messageTarget)) {
-            // If there are 2 messages in same batch about same partition's state transition,
-            // the later one is discarded
-            Message duplicatedMessage = stateTransitionHandlers.get(messageTarget)._message;
-            throw new HelixException(String.format(
-                "Duplicated state transition message: %s. Existing: %s->%s; New (Discarded): %s->%s",
-                message.getMsgId(), duplicatedMessage.getFromState(),
-                duplicatedMessage.getToState(), message.getFromState(), message.getToState()));
-          } else if (message.getMsgType().equals(MessageType.STATE_TRANSITION.name())
+
+          if (message.getMsgType().equals(MessageType.STATE_TRANSITION.name())
               && isStateTransitionInProgress(messageTarget)) {
 
             String taskId = _messageTaskMap.get(messageTarget);
@@ -919,11 +912,36 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
 
             // If there is another state transition for same partition is going on,
             // discard the message. Controller will resend if this is a valid message
-            throw new HelixException(String.format(
+            String errMsg = String.format(
                 "Another state transition for %s:%s is in progress with msg: %s, p2p: %s, read: %d, current:%d. Discarding %s->%s message",
-                message.getResourceName(), message.getPartitionName(), msg.getMsgId(), String.valueOf(msg.isRelayMessage()),
-                msg.getReadTimeStamp(), System.currentTimeMillis(), message.getFromState(),
-                message.getToState()));
+                message.getResourceName(), message.getPartitionName(), msg.getMsgId(),
+                String.valueOf(msg.isRelayMessage()), msg.getReadTimeStamp(),
+                System.currentTimeMillis(), message.getFromState(), message.getToState());
+            handleUnprocessableMessage(message, null /* exception */, errMsg, accessor,
+                instanceName, manager);
+            continue;
+          }
+          if (createHandler instanceof HelixStateTransitionHandler) {
+            // We only check to state if there is no ST task scheduled/executing.
+            HelixStateTransitionHandler.StaleMessageValidateResult result =
+                ((HelixStateTransitionHandler) createHandler).staleMessageValidator();
+            if (!result.isValid) {
+              handleUnprocessableMessage(message, null /* exception */,
+                  result.exception.getMessage(), accessor, instanceName, manager);
+              continue;
+            }
+          }
+          if (stateTransitionHandlers.containsKey(messageTarget)) {
+            // If there are 2 messages in same batch about same partition's state transition,
+            // the later one is discarded
+            Message duplicatedMessage = stateTransitionHandlers.get(messageTarget)._message;
+            String errMsg = String.format(
+                "Duplicated state transition message: %s. Existing: %s->%s; New (Discarded): %s->%s",
+                message.getMsgId(), duplicatedMessage.getFromState(),
+                duplicatedMessage.getToState(), message.getFromState(), message.getToState());
+            handleUnprocessableMessage(message, null /* exception */, errMsg, accessor,
+                instanceName, manager);
+            continue;
           }
 
           stateTransitionHandlers
@@ -937,16 +955,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
           nonStateTransitionContexts.add(msgWorkingContext);
         }
       } catch (Exception e) {
-        LOG.error("Failed to create message handler for " + message.getMsgId(), e);
-        String error =
-            "Failed to create message handler for " + message.getMsgId() + ", exception: " + e;
-
-        _statusUpdateUtil.logError(message, HelixStateMachineEngine.class, e, error, manager);
-
-        message.setMsgState(MessageState.UNPROCESSABLE);
-        removeMessageFromZK(accessor, message, instanceName);
-        LOG.error("Message cannot be processed: " + message.getRecord(), e);
-        _monitor.reportProcessedMessage(message, ParticipantMessageMonitor.ProcessedMessageState.DISCARDED);
+        handleUnprocessableMessage(message, e, e.getMessage(), accessor, instanceName, manager);
         continue;
       }
 
@@ -1107,6 +1116,22 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
     _statusUpdateUtil.logInfo(message, HelixStateMachineEngine.class, "New Message", manager);
   }
 
+  /** Logs the reason, marks the message UNPROCESSABLE, deletes it from ZK and reports it DISCARDED. */
+  private void handleUnprocessableMessage(Message message, Exception exception, String errorMsg,
+      HelixDataAccessor accessor, String instanceName, HelixManager manager) {
+    String error = "Message " + message.getMsgId() + " cannot be processed: " + message.getRecord();
+    if (exception != null) {
+      LOG.error(error, exception);
+      _statusUpdateUtil.logError(message, HelixStateMachineEngine.class, exception, error, manager);
+    } else { // no exception: errorMsg is the only description of the failure
+      LOG.error(error + ", reason: " + errorMsg);
+      _statusUpdateUtil.logError(message, HelixStateMachineEngine.class, errorMsg, manager);
+    }
+    message.setMsgState(MessageState.UNPROCESSABLE);
+    removeMessageFromZK(accessor, message, instanceName);
+    _monitor.reportProcessedMessage(message, ParticipantMessageMonitor.ProcessedMessageState.DISCARDED);
+  }
+
   public MessageHandler createMessageHandler(Message message, NotificationContext changeContext) {
     String msgType = message.getMsgType().toString();
 
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
index 2fca2b9..706284d 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
@@ -37,6 +37,7 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.mock.MockManager;
+import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.MessageState;
 import org.testng.Assert;
@@ -212,18 +213,24 @@ public class TestHelixTaskExecutor {
       _delay = delay;
     }
 
-    class TestStateTransitionMessageHandler extends MessageHandler {
-      public TestStateTransitionMessageHandler(Message message, NotificationContext context) {
-        super(message, context);
+    class TestStateTransitionMessageHandler extends HelixStateTransitionHandler {
+      // Only message, context and currentStateDelta are supplied; the first two parent args are null.
+      public TestStateTransitionMessageHandler(Message message, NotificationContext context,
+          CurrentState currentStateDelta) {
+        super(null, null, message, context, currentStateDelta);
       }
 
       @Override
-      public HelixTaskResult handleMessage() throws InterruptedException {
+      public HelixTaskResult handleMessage() {
         HelixTaskResult result = new HelixTaskResult();
         _processedMsgIds.put(_message.getMsgId(), _message.getMsgId());
         if (_delay > 0) {
           System.out.println("Sleeping..." + _delay);
-          Thread.sleep(_delay);
+          try {
+            Thread.sleep(_delay);
+          } catch (Exception e) {
+            assert (false);
+          }
         }
         result.setSuccess(true);
         return result;
@@ -232,11 +239,28 @@ public class TestHelixTaskExecutor {
       @Override
       public void onError(Exception e, ErrorCode code, ErrorType type) {
       }
+      // Plain delegation: the parent's staleMessageValidator() is already public.
+      @Override
+      public StaleMessageValidateResult staleMessageValidator() {
+        return super.staleMessageValidator();
+      }
     }
 
     @Override
     public MessageHandler createHandler(Message message, NotificationContext context) {
-      return new TestStateTransitionMessageHandler(message, context);
+      CurrentState currentStateDelta = new CurrentState(message.getResourceName());
+      currentStateDelta.setSessionId(message.getTgtSessionId());
+      currentStateDelta.setStateModelDefRef(message.getStateModelDef());
+      currentStateDelta.setStateModelFactoryName(message.getStateModelFactoryName());
+      currentStateDelta.setBucketSize(message.getBucketSize());
+      if (!message.getResourceName().equals("testStaledMessageResource")) {
+        // Any other resource: current state SLAVE, so SLAVE->MASTER messages pass validation.
+        currentStateDelta.setState(message.getPartitionName(), "SLAVE");
+      } else {
+        // testStaledMessage: current state already MASTER (its message's toState) => message is stale.
+        currentStateDelta.setState(message.getPartitionName(), "MASTER");
+      }
+      return new TestStateTransitionMessageHandler(message, context, currentStateDelta);
     }
 
     @Override
@@ -378,6 +402,61 @@ public class TestHelixTaskExecutor {
   }

   @Test()
+  public void testStaledMessage() throws InterruptedException {
+    // SLAVE->MASTER message for "testStaledMessageResource", whose current state the test handler
+    // factory sets to MASTER: it must be discarded at scheduling time and never executed.
+    System.out.println("START TestHelixTaskExecutor.testStaledMessage()");
+    HelixTaskExecutor executor = new HelixTaskExecutor();
+    HelixManager manager = new MockClusterManager();
+    HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
+    PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder();
+
+    TestStateTransitionHandlerFactory stateTransitionFactory =
+        new TestStateTransitionHandlerFactory(Message.MessageType.STATE_TRANSITION.name(), 1000);
+    executor.registerMessageHandlerFactory(Message.MessageType.STATE_TRANSITION.name(),
+        stateTransitionFactory);
+
+    NotificationContext changeContext = new NotificationContext(manager);
+    List<Message> msgList = new ArrayList<Message>();
+
+    int nMsgs = 1;
+    String instanceName = manager.getInstanceName();
+    for (int i = 0; i < nMsgs; i++) {
+      Message msg =
+          new Message(Message.MessageType.STATE_TRANSITION.name(), UUID.randomUUID().toString());
+      msg.setTgtSessionId(manager.getSessionId());
+      msg.setCreateTimeStamp((long) i);
+      msg.setTgtName("Localhost_1123");
+      msg.setSrcName("127.101.1.23_2234");
+      msg.setPartitionName("Partition");
+      msg.setResourceName("testStaledMessageResource");
+      msg.setStateModelDef("DummyMasterSlave");
+      msg.setFromState("SLAVE");
+      msg.setToState("MASTER");
+      dataAccessor.setProperty(msg.getKey(keyBuilder, instanceName), msg);
+      msgList.add(msg);
+    }
+
+    Assert.assertEquals(dataAccessor.getChildValues(keyBuilder.messages(instanceName), true).size(),
+        nMsgs);
+
+    changeContext.setChangeType(HelixConstants.ChangeType.MESSAGE);
+    executor.onMessage(instanceName, msgList, changeContext);
+
+    Thread.sleep(200);
+
+    // The message is removed because its toState equals the partition's current state...
+    Assert.assertEquals(dataAccessor.getChildValues(keyBuilder.messages(instanceName), true).size(),
+        0);
+    // ...and no handler ran for it (a running handler records its msgId in _processedMsgIds).
+    for (Message msg : msgList) {
+      Assert.assertFalse(stateTransitionFactory._processedMsgIds.containsKey(msg.getMsgId()));
+    }
+
+    System.out.println("END TestHelixTaskExecutor.testStaledMessage()");
+  }
+
+  @Test()
   public void testUnknownTypeMsgExecution() throws InterruptedException {
     HelixTaskExecutor executor = new HelixTaskExecutor();
     HelixManager manager = new MockClusterManager();