You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@helix.apache.org by GitBox <gi...@apache.org> on 2020/09/14 20:56:53 UTC

[GitHub] [helix] zhangmeng916 commented on a change in pull request #1362: Move ST message to state validation from executing phase to scheduling phase.

zhangmeng916 commented on a change in pull request #1362:
URL: https://github.com/apache/helix/pull/1362#discussion_r488206119



##########
File path: helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
##########
@@ -463,6 +442,35 @@ public void onError(Exception e, ErrorCode code, ErrorType type) {
 
   }
 
+  // Verify the fromState and current state of the stateModel.
+  public Exception isMessageStaled(boolean inSchedulerCheck) {
+    String fromState = _message.getFromState();
+    String toState = _message.getToState();
+    String partitionName = _message.getPartitionName();
+
+    // state in _currentStateDelta uses current state from state model. It has the
+    // most up-to-date. current state. In case currentState in stateModel is null,
+    // partition is in initial state and we using it as current state.
+    // Defined in HelixStateMachineEngine.
+    String state = _currentStateDelta.getState(partitionName);
+
+    Exception err = null;
+    if (toState.equalsIgnoreCase(state)) {
+      // To state equals current state, we can just ignore the message
+      err = new HelixDuplicatedStateTransitionException(String
+          .format("Partition %s current state is same as toState (%s->%s) from message.",
+              partitionName, fromState, toState));
+    } else if (!inSchedulerCheck && fromState != null && !fromState.equals("*") && !fromState

Review comment:
       Besides the case where the current state equals the toState, could this case also be moved from the execution phase to the scheduling phase? That would reduce thread usage even further. 

##########
File path: helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
##########
@@ -463,6 +442,35 @@ public void onError(Exception e, ErrorCode code, ErrorType type) {
 
   }
 
+  // Verify the fromState and current state of the stateModel.
+  public Exception isMessageStaled(boolean inSchedulerCheck) {

Review comment:
       Minor: rename `isMessageStaled` to `isMessageStale`. "Staled" suggests that we deliberately made the message stale, which is not the case; the message simply is stale. 

##########
File path: helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
##########
@@ -377,6 +414,55 @@ public void testDuplicatedMessage() throws InterruptedException {
     System.out.println("END TestHelixTaskExecutor.testDuplicatedMessage()");
   }
 
+  @Test()
+  public void testStaledMessage() throws InterruptedException {
+    System.out.println("START TestHelixTaskExecutor.testStaledMessage()");
+    HelixTaskExecutor executor = new HelixTaskExecutor();
+    HelixManager manager = new MockClusterManager();
+    HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
+    PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder();
+
+    TestStateTransitionHandlerFactory stateTransitionFactory =
+        new TestStateTransitionHandlerFactory(Message.MessageType.STATE_TRANSITION.name(), 1000);
+    executor.registerMessageHandlerFactory(Message.MessageType.STATE_TRANSITION.name(),
+        stateTransitionFactory);
+
+    NotificationContext changeContext = new NotificationContext(manager);
+    List<Message> msgList = new ArrayList<Message>();
+
+    int nMsgs = 1;
+    String instanceName = manager.getInstanceName();
+    for (int i = 0; i < nMsgs; i++) {
+      Message msg =
+          new Message(Message.MessageType.STATE_TRANSITION.name(), UUID.randomUUID().toString());
+      msg.setTgtSessionId(manager.getSessionId());
+      msg.setCreateTimeStamp((long) i);
+      msg.setTgtName("Localhost_1123");
+      msg.setSrcName("127.101.1.23_2234");
+      msg.setPartitionName("Partition");
+      msg.setResourceName("testStaledMessageResource");
+      msg.setStateModelDef("DummyMasterSlave");
+      msg.setFromState("SLAVE");
+      msg.setToState("MASTER");
+      dataAccessor.setProperty(msg.getKey(keyBuilder, instanceName), msg);
+      msgList.add(msg);
+    }
+
+    Assert.assertEquals(dataAccessor.getChildValues(keyBuilder.messages(instanceName), true).size(),
+            nMsgs);
+
+    changeContext.setChangeType(HelixConstants.ChangeType.MESSAGE);
+    executor.onMessage(instanceName, msgList, changeContext);
+
+    Thread.sleep(200);
+
+    // The message should be ignored since toState is the same as current state.
+    Assert.assertEquals(dataAccessor.getChildValues(keyBuilder.messages(instanceName), true).size(),

Review comment:
       How soon does the message get deleted from the instance?

##########
File path: helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
##########
@@ -919,9 +912,25 @@ public void onMessage(String instanceName, List<Message> messages,
             // discard the message. Controller will resend if this is a valid message
             throw new HelixException(String.format(
                 "Another state transition for %s:%s is in progress with msg: %s, p2p: %s, read: %d, current:%d. Discarding %s->%s message",
-                message.getResourceName(), message.getPartitionName(), msg.getMsgId(), String.valueOf(msg.isRelayMessage()),
-                msg.getReadTimeStamp(), System.currentTimeMillis(), message.getFromState(),
-                message.getToState()));
+                message.getResourceName(), message.getPartitionName(), msg.getMsgId(),
+                String.valueOf(msg.isRelayMessage()), msg.getReadTimeStamp(),
+                System.currentTimeMillis(), message.getFromState(), message.getToState()));
+          }
+          if (createHandler instanceof HelixStateTransitionHandler) {
+            // We only check to state if there is no ST task scheduled/executing.
+            Exception err = ((HelixStateTransitionHandler) createHandler).isMessageStaled(true /*inSchedulerCheck*/);

Review comment:
       The return type is confusing: the name `isMessageStaled` implies a boolean return value. You could rename the function to something like `validateStaleMessage`, or change the return type.

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
##########
@@ -980,9 +980,10 @@ public TaskState pollForJobState(String workflowName, String jobName, long timeo
         && System.currentTimeMillis() < st + timeout);
 
     if (ctx == null || !allowedStates.contains(ctx.getJobState(jobName))) {
-      throw new HelixException(
-          String.format("Workflow \"%s\" context is null or job \"%s\" is not in states: %s",
-              workflowName, jobName, allowedStates));
+      String cur = ctx == null ? "null" : ctx.getJobState(jobName).toString();
+      throw new HelixException(String.format(
+          "Workflow \"%s\" context is null or job \"%s\" is not in states: %s, cur state is: %s",
+          workflowName, jobName, allowedStates, cur));

Review comment:
       Is this just a logging improvement? The logic itself is not affected by this PR's change, right?

##########
File path: helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
##########
@@ -377,6 +414,55 @@ public void testDuplicatedMessage() throws InterruptedException {
     System.out.println("END TestHelixTaskExecutor.testDuplicatedMessage()");
   }
 
+  @Test()
+  public void testStaledMessage() throws InterruptedException {
+    System.out.println("START TestHelixTaskExecutor.testStaledMessage()");
+    HelixTaskExecutor executor = new HelixTaskExecutor();
+    HelixManager manager = new MockClusterManager();
+    HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
+    PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder();
+
+    TestStateTransitionHandlerFactory stateTransitionFactory =
+        new TestStateTransitionHandlerFactory(Message.MessageType.STATE_TRANSITION.name(), 1000);
+    executor.registerMessageHandlerFactory(Message.MessageType.STATE_TRANSITION.name(),
+        stateTransitionFactory);
+
+    NotificationContext changeContext = new NotificationContext(manager);
+    List<Message> msgList = new ArrayList<Message>();
+
+    int nMsgs = 1;
+    String instanceName = manager.getInstanceName();
+    for (int i = 0; i < nMsgs; i++) {
+      Message msg =
+          new Message(Message.MessageType.STATE_TRANSITION.name(), UUID.randomUUID().toString());
+      msg.setTgtSessionId(manager.getSessionId());
+      msg.setCreateTimeStamp((long) i);
+      msg.setTgtName("Localhost_1123");
+      msg.setSrcName("127.101.1.23_2234");
+      msg.setPartitionName("Partition");
+      msg.setResourceName("testStaledMessageResource");
+      msg.setStateModelDef("DummyMasterSlave");
+      msg.setFromState("SLAVE");
+      msg.setToState("MASTER");
+      dataAccessor.setProperty(msg.getKey(keyBuilder, instanceName), msg);
+      msgList.add(msg);
+    }
+
+    Assert.assertEquals(dataAccessor.getChildValues(keyBuilder.messages(instanceName), true).size(),
+            nMsgs);
+
+    changeContext.setChangeType(HelixConstants.ChangeType.MESSAGE);
+    executor.onMessage(instanceName, msgList, changeContext);
+
+    Thread.sleep(200);
+
+    // The message should be ignored since toState is the same as current state.
+    Assert.assertEquals(dataAccessor.getChildValues(keyBuilder.messages(instanceName), true).size(),
+        0);
+
+    System.out.println("END TestHelixTaskExecutor.testDuplicatedMessage()");

Review comment:
       The test name is wrong here: this log line in `testStaledMessage` prints `testDuplicatedMessage`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org