You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/06/26 22:35:43 UTC

helix git commit: [HELIX-705]: Participant duplicated state transition handling rework

Repository: helix
Updated Branches:
  refs/heads/master 323fbd049 -> 8dc19afb9


[HELIX-705]: Participant duplicated state transition handling rework


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/8dc19afb
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/8dc19afb
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/8dc19afb

Branch: refs/heads/master
Commit: 8dc19afb9b70d262da0eb2081840d65f2a031122
Parents: 323fbd0
Author: Harry Zhang <hr...@linkedin.com>
Authored: Mon Jun 25 15:55:14 2018 -0700
Committer: Harry Zhang <hr...@linkedin.com>
Committed: Tue Jun 26 15:33:47 2018 -0700

----------------------------------------------------------------------
 .../handling/HelixStateTransitionHandler.java   | 54 +++++++++++++++-----
 .../helix/messaging/handling/HelixTask.java     |  2 +-
 .../messaging/handling/HelixTaskExecutor.java   | 36 +++++++++++--
 .../handling/TestHelixTaskExecutor.java         | 51 +++++++++++++-----
 4 files changed, 111 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/8dc19afb/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
index 9412dde..36245de 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
@@ -28,7 +28,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
-
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixDefinedState;
@@ -61,6 +60,15 @@ public class HelixStateTransitionHandler extends MessageHandler {
     }
   }
 
+  /**
+   * Thrown when the partition's current state already equals the message's toState (a duplicated state transition).
+   */
+  public static class HelixDuplicatedStateTransitionException extends Exception {
+    public HelixDuplicatedStateTransitionException(String info) {
+      super(info);
+    }
+  }
+
   private static final Logger logger = LoggerFactory.getLogger(HelixStateTransitionHandler.class);
   private final StateModel _stateModel;
   StatusUpdateUtil _statusUpdateUtil;
@@ -104,24 +112,39 @@ public class HelixStateTransitionHandler extends MessageHandler {
 
     String partitionName = _message.getPartitionName();
     String fromState = _message.getFromState();
+    String toState = _message.getToState();
 
     // Verify the fromState and current state of the stateModel
-    String state = _currentStateDelta.getState(partitionName);
+    // Prefer the state model's current state, which is the most up-to-date.
+    // If it is null (e.g. the partition is still in its initial state), fall
+    // back to the state recorded in the current state delta.
+    String state = _stateModel.getCurrentState() != null
+        ? _stateModel.getCurrentState()
+        : _currentStateDelta.getState(partitionName);
 
     // Set start time right before invoke client logic
     _currentStateDelta.setStartTime(_message.getPartitionName(), System.currentTimeMillis());
 
-    if (fromState != null && !fromState.equals("*") && !fromState.equalsIgnoreCase(state)) {
-      String errorMessage =
-          "Current state of stateModel does not match the fromState in Message"
-              + ", Current State:" + state + ", message expected:" + fromState + ", partition: "
-              + partitionName + ", from: " + _message.getMsgSrc() + ", to: "
-              + _message.getTgtName();
+    Exception err = null;
+    if (toState.equalsIgnoreCase(state)) {
+      // toState already equals the current state, so the message can safely be ignored
+      err = new HelixDuplicatedStateTransitionException(
+          String.format("Partition %s current state is same as toState (%s->%s) from message.",
+              partitionName, fromState, toState)
+      );
+    } else if (fromState != null && !fromState.equals("*") && !fromState.equalsIgnoreCase(state)) {
+      // If the current state is neither the message's toState nor its fromState, there is a real mismatch
+      err = new HelixStateMismatchException(
+          String.format(
+              "Current state of stateModel does not match the fromState in Message, CurrentState: %s, Message: %s->%s, Partition: %s, from: %s, to: %s",
+              state, fromState, toState, partitionName, _message.getMsgSrc(), _message.getTgtName())
+      );
+    }
 
-      _statusUpdateUtil.logError(_message, HelixStateTransitionHandler.class, errorMessage,
-          _manager);
-      logger.error(errorMessage);
-      throw new HelixStateMismatchException(errorMessage);
+    if (err != null) {
+      _statusUpdateUtil.logError(_message, HelixStateTransitionHandler.class, err.getMessage(), _manager);
+      logger.error(err.getMessage());
+      throw err;
     }
 
     // Reset the REQUESTED_STATE property if it exists.
@@ -319,6 +342,11 @@ public class HelixStateTransitionHandler extends MessageHandler {
       try {
         preHandleMessage();
         invoke(manager, context, taskResult, message);
+      } catch (HelixDuplicatedStateTransitionException e) {
+        // A duplicated state transition is harmless: report it as a success
+        taskResult.setSuccess(true);
+        taskResult.setMessage(e.toString());
+        taskResult.setInfo(e.getMessage());
       } catch (HelixStateMismatchException e) {
         // Simply log error and return from here if State mismatch.
         // The current state of the state model is intact.
@@ -414,7 +442,7 @@ public class HelixStateTransitionHandler extends MessageHandler {
               + _stateModel.getClass();
       logger.error(errorMessage);
       taskResult.setSuccess(false);
-
+      taskResult.setInfo(errorMessage);
       _statusUpdateUtil
           .logError(message, HelixStateTransitionHandler.class, errorMessage, manager);
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/8dc19afb/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
index 337a933..3cca883 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
@@ -357,4 +357,4 @@ public class HelixTask implements MessageTask {
     }
     _isStarted = true;
   }
-};
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/8dc19afb/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index bb55604..9734cc8 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -406,10 +406,8 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
       // Check to see if dedicate thread pool for handling state transition messages is configured or provided.
       updateStateTransitionMessageThreadPool(message, manager);
 
-      LOG.info("Scheduling message: " + taskId);
-      // System.out.println("sched msg: " + message.getPartitionName() + "-"
-      // + message.getTgtName() + "-" + message.getFromState() + "-"
-      // + message.getToState());
+      LOG.info("Scheduling message {}: {}:{}, {}->{}", taskId, message.getResourceName(),
+          message.getPartitionName(), message.getFromState(), message.getToState());
 
       _statusUpdateUtil.logInfo(message, HelixTaskExecutor.class,
           "Message handling task scheduled", manager);
@@ -867,12 +865,23 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
           String messageTarget =
               getMessageTarget(message.getResourceName(), message.getPartitionName());
           if (stateTransitionHandlers.containsKey(messageTarget)) {
+            // If two messages in the same batch target the same partition's state transition,
+            // the later one is discarded
             Message duplicatedMessage = stateTransitionHandlers.get(messageTarget)._message;
             throw new HelixException(String.format(
-                "Duplicated state transition message: %s. Existing: %s -> %s; New (Discarded): %s -> %s",
+                "Duplicated state transition message: %s. Existing: %s->%s; New (Discarded): %s->%s",
                 message.getMsgId(), duplicatedMessage.getFromState(),
                 duplicatedMessage.getToState(), message.getFromState(), message.getToState()));
+          } else if (message.getMsgType().equals(MessageType.STATE_TRANSITION.name())
+              && isStateTransitionInProgress(messageTarget)) {
+            // If another state transition for the same partition is already in progress,
+            // discard this message. The controller will resend it if it is still valid.
+            throw new HelixException(String
+                .format("Another state transition for %s:%s is in progress. Discarding %s->%s message",
+                    message.getResourceName(), message.getPartitionName(), message.getFromState(),
+                    message.getToState()));
           }
+
           stateTransitionHandlers
               .put(getMessageTarget(message.getResourceName(), message.getPartitionName()),
                   createHandler);
@@ -959,6 +968,23 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
     }
   }
 
+  /**
+   * Checks whether a state transition for the given message target is in progress. Assumes the
+   * target corresponds to a state transition task, and that every task id in _messageTaskMap
+   * also has an entry in _taskMap (otherwise the _taskMap lookup below throws NullPointerException).
+   * @param messageTarget message target generated by getMessageTarget()
+   * @return true if a task for the same message target has not finished yet, false otherwise
+   */
+  private boolean isStateTransitionInProgress(String messageTarget) {
+    synchronized (_lock) {
+      if (_messageTaskMap.containsKey(messageTarget)) {
+        String taskId = _messageTaskMap.get(messageTarget);
+        return !_taskMap.get(taskId).getFuture().isDone();
+      }
+      return false;
+    }
+  }
+
   // Try to cancel this state transition that has not been started yet.
   // Three Types of Cancellation: 1. Message arrived with previous state transition
   //                              2. Message handled but task not started

http://git-wip-us.apache.org/repos/asf/helix/blob/8dc19afb/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
index ea2bb20..9db842d 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
+
 import com.google.common.collect.ImmutableList;
 
 import org.apache.helix.HelixConstants;
@@ -94,7 +95,7 @@ public class TestHelixTaskExecutor {
 
     @Override
     public List<String> getMessageTypes() {
-      return Arrays.asList(new String[]{"TestingMessageHandler", Message.MessageType.STATE_TRANSITION.name()});
+      return Collections.singletonList("TestingMessageHandler");
     }
 
     @Override
@@ -200,9 +201,16 @@ public class TestHelixTaskExecutor {
   class TestStateTransitionHandlerFactory implements MultiTypeMessageHandlerFactory {
     ConcurrentHashMap<String, String> _processedMsgIds = new ConcurrentHashMap<String, String>();
     private final String _msgType;
+    private final long _delay;
     public TestStateTransitionHandlerFactory(String msgType) {
+      this(msgType, -1);
+    }
+
+    public TestStateTransitionHandlerFactory(String msgType, long delay) {
       _msgType = msgType;
+      _delay = delay;
     }
+
     class TestStateTransitionMessageHandler extends MessageHandler {
       public TestStateTransitionMessageHandler(Message message, NotificationContext context) {
         super(message, context);
@@ -212,6 +220,10 @@ public class TestHelixTaskExecutor {
       public HelixTaskResult handleMessage() throws InterruptedException {
         HelixTaskResult result = new HelixTaskResult();
         _processedMsgIds.put(_message.getMsgId(), _message.getMsgId());
+        if (_delay > 0) {
+          System.out.println("Sleeping..." + _delay);
+          Thread.sleep(_delay);
+        }
         result.setSuccess(true);
         return result;
       }
@@ -308,42 +320,55 @@ public class TestHelixTaskExecutor {
     HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
     PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder();
 
-    TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
-    for (String type : factory.getMessageTypes()) {
-      executor.registerMessageHandlerFactory(type, factory);
-    }
+    TestStateTransitionHandlerFactory stateTransitionFactory =
+        new TestStateTransitionHandlerFactory(Message.MessageType.STATE_TRANSITION.name(), 1000);
+    executor.registerMessageHandlerFactory(Message.MessageType.STATE_TRANSITION.name(),
+        stateTransitionFactory);
 
     NotificationContext changeContext = new NotificationContext(manager);
     List<Message> msgList = new ArrayList<Message>();
 
-    int nMsgs = 10;
-    String instanceName = "someInstance";
+    int nMsgs = 3;
+    String instanceName = manager.getInstanceName();
     for (int i = 0; i < nMsgs; i++) {
       Message msg =
           new Message(Message.MessageType.STATE_TRANSITION.name(), UUID.randomUUID().toString());
       msg.setTgtSessionId(manager.getSessionId());
+      msg.setCreateTimeStamp((long) i);
       msg.setTgtName("Localhost_1123");
       msg.setSrcName("127.101.1.23_2234");
       msg.setPartitionName("Partition");
       msg.setResourceName("Resource");
+      msg.setStateModelDef("DummyMasterSlave");
+      msg.setFromState("SLAVE");
+      msg.setToState("MASTER");
       dataAccessor.setProperty(msg.getKey(keyBuilder, instanceName), msg);
       msgList.add(msg);
     }
 
-    System.out.println(dataAccessor.getChildNames(keyBuilder.messages(instanceName)));
     AssertJUnit
         .assertEquals(dataAccessor.getChildValues(keyBuilder.messages(instanceName)).size(), nMsgs);
 
     changeContext.setChangeType(HelixConstants.ChangeType.MESSAGE);
     executor.onMessage(instanceName, msgList, changeContext);
 
-    Thread.sleep(1000);
+    Thread.sleep(200);
 
-    // Will not be able to process state transition messages, but we shall verify that
-    // only 1 message is left over
-    AssertJUnit
-        .assertEquals(dataAccessor.getChildValues(keyBuilder.messages(instanceName)).size(), 1);
+    // Only 1 message is left over: the first state transition is still running (it sleeps 1 second)
+    Assert.assertEquals(dataAccessor.getChildValues(keyBuilder.messages(instanceName)).size(), 1);
 
+    // If another state transition message for the same resource/partition arrives while one is
+    // still in progress, the message handler should discard it
+
+    // The mock accessor modifies message state in memory, so reset it back to NEW
+    msgList.get(2).setMsgState(MessageState.NEW);
+    dataAccessor.setProperty(msgList.get(2).getKey(keyBuilder, instanceName), msgList.get(2));
+    executor.onMessage(instanceName, Arrays.asList(msgList.get(2)), changeContext);
+    Thread.sleep(200);
+    Assert.assertEquals(dataAccessor.getChildValues(keyBuilder.messages(instanceName)).size(), 1);
+
+    Thread.sleep(1000);
+    Assert.assertEquals(dataAccessor.getChildValues(keyBuilder.messages(instanceName)).size(), 0);
     System.out.println("END TestHelixTaskExecutor.testDuplicatedMessage()");
   }