You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2020/06/23 00:46:06 UTC

[helix] branch master updated: Remove waiting on message deletion if current state is already updated (#1068)

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new d6d97c8  Remove waiting on message deletion if current state is already updated (#1068)
d6d97c8 is described below

commit d6d97c819970d5f6b76d9e3ade441ef8017fdb2f
Author: Meng Zhang <mn...@linkedin.com>
AuthorDate: Mon Jun 22 17:45:59 2020 -0700

    Remove waiting on message deletion if current state is already updated (#1068)
    
    Remove the wait on message deletion in the message generation phase when the current state is already updated. This increases the rate of P2P messages during mastership handoff.
---
 .../controller/stages/MessageGenerationPhase.java  |  12 +-
 .../messaging/TestP2PNoDuplicatedMessage.java      |  10 +-
 .../TestP2PMessagesAvoidDuplicatedMessage.java     | 141 ++++++++++++---------
 .../p2pMessage/TestP2PStateTransitionMessages.java |  92 +++++---------
 4 files changed, 130 insertions(+), 125 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
index 4223c37..d098b79 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
@@ -220,7 +220,17 @@ public abstract class MessageGenerationPhase extends AbstractBaseStage {
                 generateCancellationMessageForPendingMessage(desiredState, currentState, nextState, pendingMessage,
                     manager, resource, partition, sessionIdMap, instanceName, stateModelDef,
                     cancellationMessage, isCancellationEnabled);
-          } else {
+          }
+          // Generate a new message if there is no pending message, or if the current state already
+          // equals the pending message's toState and no message has been generated for it yet.
+          if (pendingMessage == null || (message == null &&
+              currentState.equalsIgnoreCase(pendingMessage.getToState()))) {
+            if (pendingMessage != null) {
+              logger.info("Ignore the pending message for resource %s partition %s instance "
+                      + "%s since the current state %s equals toState of pending message.",
+                  resource.getResourceName(), partition.getPartitionName(), instanceName,
+                  currentState);
+            }
             // Create new state transition message
             message = createStateTransitionMessage(manager, resource, partition.getPartitionName(),
                 instanceName, currentState, nextState, sessionIdMap.get(instanceName),
diff --git a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PNoDuplicatedMessage.java b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PNoDuplicatedMessage.java
index a910f28..c0cea8f 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PNoDuplicatedMessage.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PNoDuplicatedMessage.java
@@ -62,12 +62,12 @@ public class TestP2PNoDuplicatedMessage extends ZkTestBase {
   final String CLASS_NAME = getShortClassName();
   final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
 
-  static final int PARTICIPANT_NUMBER = 6;
+  static final int PARTICIPANT_NUMBER = 10;
   static final int PARTICIPANT_START_PORT = 12918;
 
   static final int DB_COUNT = 2;
 
-  static final int PARTITION_NUMBER = 50;
+  static final int PARTITION_NUMBER = 100;
   static final int REPLICA_NUMBER = 3;
 
   final String _controllerName = CONTROLLER_PREFIX + "_0";
@@ -171,11 +171,7 @@ public class TestP2PNoDuplicatedMessage extends ZkTestBase {
       verifyP2PEnabled(startTime);
     }
 
-    double ratio = ((double) p2pTrigged) / ((double) total);
-    Assert.assertTrue(ratio > 0.6, String
-       .format("Only %d out of %d percent transitions to Master were triggered by expected host!",
-           p2pTrigged, total));
-
+    Assert.assertEquals(p2pTrigged, total);
     Assert.assertEquals(MockHelixTaskExecutor.duplicatedMessagesInProgress, 0,
         "There are duplicated transition messages sent while participant is handling the state-transition!");
     Assert.assertEquals(MockHelixTaskExecutor.duplicatedMessages, 0,
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java
index 3a1ea0e..c42bebe 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java
@@ -152,9 +152,10 @@ public class TestP2PMessagesAvoidDuplicatedMessage extends BaseStageTest {
     Assert.assertEquals(relayMessage.getToState(), MasterSlaveSMD.States.MASTER.name());
 
     // Scenario 2A:
-    // Old master (initialMaster) completes the M->S transition,
-    // but has not forward p2p message to new master (secondMaster) yet.
-    // Validate: Controller should not send S->M message to new master.
+    // Old master (initialMaster) completes the M->S transition, but the M->S message has not
+    // been deleted, and it has not forwarded the p2p message to new master (secondMaster) yet.
+    // Validate: Controller should not send S->M message to new master, but should send S->O to
+    // old master
     currentStateOutput.setCurrentState(_db, _partition, initialMaster, "SLAVE");
     currentStateOutput.setPendingMessage(_db, _partition, initialMaster, toSlaveMessage);
     currentStateOutput.setPendingRelayMessage(_db, _partition, initialMaster, relayMessage);
@@ -165,13 +166,18 @@ public class TestP2PMessagesAvoidDuplicatedMessage extends BaseStageTest {
 
     messageOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
     messages = messageOutput.getMessages(_db, _partition);
-    Assert.assertEquals(messages.size(), 0);
+    Assert.assertEquals(messages.size(), 1);
+    Message toOfflineMessage = messages.get(0);
+    Assert.assertEquals(toOfflineMessage.getTgtName(), initialMaster);
+    Assert.assertEquals(toOfflineMessage.getFromState(), MasterSlaveSMD.States.SLAVE.name());
+    Assert.assertEquals(toOfflineMessage.getToState(), MasterSlaveSMD.States.OFFLINE.name());
 
 
     // Scenario 2B:
-    // Old master (initialMaster) completes the M->S transition,
+    // Old master (initialMaster) completes the M->S transition, and the message has been
+    // deleted, but the message S->O is still there.
     // There is a pending p2p message to new master (secondMaster).
-    // Validate: Controller should send S->M message to new master at same time.
+    // Validate: Controller should send S->M message to new master at same time
 
     currentStateOutput.setCurrentState(_db, _partition, initialMaster, "SLAVE");
     currentStateOutput.getPendingMessageMap(_db, _partition).clear();
@@ -201,7 +207,8 @@ public class TestP2PMessagesAvoidDuplicatedMessage extends BaseStageTest {
     Assert.assertTrue(hasToOffline);
 
     // Secenario 2C
-    // Old master (initialMaster) completes the M->S transition,
+    // Old master (initialMaster) completes the M->S transition, and the message has been
+    // deleted, but the message S->O is still there.
     // There is a pending p2p message to new master (secondMaster).
     // However, the new master has been changed in bestPossible
     // Validate: Controller should not send S->M message to the third master at same time.
@@ -215,24 +222,23 @@ public class TestP2PMessagesAvoidDuplicatedMessage extends BaseStageTest {
     instanceStateMap.put(thirdMaster, "MASTER");
     _bestpossibleState.setState(_db, _partition, instanceStateMap);
 
-
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
     event.addAttribute(AttributeName.INTERMEDIATE_STATE.name(), _bestpossibleState);
 
+    currentStateOutput.setPendingMessage(_db, _partition, initialMaster, toOfflineMessage);
+    event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+
     _messagePipeline.handle(event);
 
     messageOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
     messages = messageOutput.getMessages(_db, _partition);
-    Assert.assertEquals(messages.size(), 1);
-    Assert.assertTrue(messages.get(0).getToState().equals("OFFLINE"));
-    Assert.assertTrue(messages.get(0).getTgtName().equals(initialMaster));
-
+    Assert.assertEquals(messages.size(), 0);
 
     // Scenario 3:
-    // Old master (initialMaster) completes the M->S transition,
+    // Old master (initialMaster) completes the S->O transition,
     // and has already forwarded p2p message to new master (secondMaster)
-    // The original S->M message sent to old master has been removed.
-    // Validate: Controller should send S->O to old master, but not S->M message to new master.
+    // The original S->O message sent to old master has been removed.
+    // Validate: Controller should not send S->M message to new master.
     instanceStateMap = _bestpossibleState.getInstanceStateMap(_db, _partition);
     instanceStateMap.put(secondMaster, "MASTER");
     instanceStateMap.put(thirdMaster, "SLAVE");
@@ -240,54 +246,69 @@ public class TestP2PMessagesAvoidDuplicatedMessage extends BaseStageTest {
 
     currentStateOutput =
         populateCurrentStateFromBestPossible(_bestpossibleState);
-    currentStateOutput.setCurrentState(_db, _partition, initialMaster, "SLAVE");
+    currentStateOutput.setCurrentState(_db, _partition, initialMaster, "OFFLINE");
     currentStateOutput.setPendingMessage(_db, _partition, secondMaster, relayMessage);
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
 
-    _fullPipeline.handle(event);
+    _messagePipeline.handle(event);
 
     messageOutput =
         event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
     messages = messageOutput.getMessages(_db, _partition);
-    Assert.assertEquals(messages.size(), 1);
-
-    Message toOfflineMessage = messages.get(0);
-    Assert.assertEquals(toOfflineMessage.getTgtName(), initialMaster);
-    Assert.assertEquals(toOfflineMessage.getFromState(), MasterSlaveSMD.States.SLAVE.name());
-    Assert.assertEquals(toOfflineMessage.getToState(), MasterSlaveSMD.States.OFFLINE.name());
-
+    Assert.assertEquals(messages.size(), 0);
 
     // Scenario 4:
-    // The old master (initialMaster) finish state transition, but has not forward p2p message yet.
-    // Then the preference list has changed, so now the new master (thirdMaster) is different from previously calculated new master (secondMaster)
-    // Validate: controller should not send S->M to thirdMaster.
-    currentStateOutput.setCurrentState(_db, _partition, initialMaster, "OFFLINE");
-    event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+    // The old master (initialMaster) finishes the state transition and forwards the p2p message
+    // to secondMaster. The secondMaster has finished its state transition too, but the message
+    // has not been deleted yet.
+    // Then the preference list has changed, so now the new master (thirdMaster) is different
+    // from previously calculated new master (secondMaster)
+    // Validate: controller should not send S->M to thirdMaster, but should send M->S to
+    // secondMaster.
 
     thirdMaster =
         getTopStateInstance(_bestpossibleState.getInstanceStateMap(_db, _partition),
             MasterSlaveSMD.States.SLAVE.name());
-
     instanceStateMap = _bestpossibleState.getInstanceStateMap(_db, _partition);
     instanceStateMap.put(secondMaster, "SLAVE");
     instanceStateMap.put(thirdMaster, "MASTER");
     _bestpossibleState.setState(_db, _partition, instanceStateMap);
-
     event.addAttribute(AttributeName.INTERMEDIATE_STATE.name(), _bestpossibleState);
 
+    currentStateOutput.setCurrentState(_db, _partition, initialMaster, "OFFLINE");
+    currentStateOutput.setCurrentState(_db, _partition, secondMaster, "MASTER");
+    currentStateOutput.setPendingMessage(_db, _partition, secondMaster, relayMessage);
+    event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+
     _messagePipeline.handle(event);
 
     messageOutput =
         event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
     messages = messageOutput.getMessages(_db, _partition);
-    Assert.assertEquals(messages.size(), 0);
+    Assert.assertEquals(messages.size(), 1);
+    toSlaveMessage = messages.get(0);
+    Assert.assertEquals(toSlaveMessage.getTgtName(), secondMaster);
+    Assert.assertEquals(toSlaveMessage.getFromState(), MasterSlaveSMD.States.MASTER.name());
+    Assert.assertEquals(toSlaveMessage.getToState(), MasterSlaveSMD.States.SLAVE.name());
 
+    // verify the p2p message is attached to the M->S message sent to the second master instance
+    Assert.assertEquals(toSlaveMessage.getRelayMessages().entrySet().size(), 1);
+    thirdMaster =
+        getTopStateInstance(_bestpossibleState.getInstanceStateMap(_db, _partition), MasterSlaveSMD.States.MASTER.name());
+
+    relayMessage = toSlaveMessage.getRelayMessage(thirdMaster);
+    Assert.assertNotNull(relayMessage);
+    Assert.assertEquals(relayMessage.getMsgSubType(), Message.MessageType.RELAYED_MESSAGE.name());
+    Assert.assertEquals(relayMessage.getTgtName(), thirdMaster);
+    Assert.assertEquals(relayMessage.getRelaySrcHost(), secondMaster);
+    Assert.assertEquals(relayMessage.getFromState(), MasterSlaveSMD.States.SLAVE.name());
+    Assert.assertEquals(relayMessage.getToState(), MasterSlaveSMD.States.MASTER.name());
 
     // Scenario 5:
-    // The initial master has forwarded the p2p message to secondMaster and deleted original M->S message on initialMaster,
-    // But the S->M state-transition has not completed yet in secondMaster.
-    // Validate: Controller should not send S->M to thirdMaster.
-    currentStateOutput.setPendingMessage(_db, _partition, secondMaster, relayMessage);
+    // The second master has finished transition from M->S, but the message is still there.
+    // Validate: Controller should NOT send S->M to thirdMaster due to the pending M->S message.
+    currentStateOutput.setCurrentState(_db, _partition, secondMaster, "SLAVE");
+    currentStateOutput.setPendingMessage(_db, _partition, secondMaster, toSlaveMessage);
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
 
     event.addAttribute(AttributeName.INTERMEDIATE_STATE.name(), _bestpossibleState);
@@ -299,39 +320,41 @@ public class TestP2PMessagesAvoidDuplicatedMessage extends BaseStageTest {
     messages = messageOutput.getMessages(_db, _partition);
     Assert.assertEquals(messages.size(), 0);
 
-
-    // Scenario 5:
-    // The thirdMaster completed the state transition and deleted the p2p message.
-    // Validate: Controller should M->S message to secondMaster.
-    currentStateOutput =
-        populateCurrentStateFromBestPossible(_bestpossibleState);
-    currentStateOutput.setCurrentState(_db, _partition, secondMaster, "MASTER");
-    currentStateOutput.setCurrentState(_db, _partition, thirdMaster, "SLAVE");
-
+    // Scenario 6a:
+    // The second master has finished the M->S transition, and the message has been deleted.
+    // There is a pending relay message.
+    // Validate: Controller should NOT send S->M to thirdMaster
+    currentStateOutput.setPendingMessage(_db, _partition, secondMaster, toSlaveMessage);currentStateOutput.setPendingRelayMessage(_db, _partition, thirdMaster, relayMessage);
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+    event.addAttribute(AttributeName.INTERMEDIATE_STATE.name(), _bestpossibleState);
 
     _messagePipeline.handle(event);
 
     messageOutput =
         event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
     messages = messageOutput.getMessages(_db, _partition);
-    Assert.assertEquals(messages.size(), 1);
+    Assert.assertEquals(messages.size(), 0);
 
-    toSlaveMessage = messages.get(0);
-    Assert.assertEquals(toSlaveMessage.getTgtName(), secondMaster);
-    Assert.assertEquals(toSlaveMessage.getFromState(), MasterSlaveSMD.States.MASTER.name());
-    Assert.assertEquals(toSlaveMessage.getToState(), MasterSlaveSMD.States.SLAVE.name());
+    /// Scenario 6b:
+    // The second master has finished the M->S transition, and the message has been deleted.
+    // There is no pending relay message
+    // Validate: Controller should send S->M to thirdMaster
+    currentStateOutput.getPendingMessageMap(_db, _partition).clear();
+    currentStateOutput.getPendingRelayMessageMap(_db, _partition).clear();
+    event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
 
-    // verify p2p message are attached to the M->S message sent to the secondMaster
-    Assert.assertEquals(toSlaveMessage.getRelayMessages().entrySet().size(), 1);
+    event.addAttribute(AttributeName.INTERMEDIATE_STATE.name(), _bestpossibleState);
 
-    relayMessage = toSlaveMessage.getRelayMessage(thirdMaster);
-    Assert.assertNotNull(relayMessage);
-    Assert.assertEquals(relayMessage.getMsgSubType(), Message.MessageType.RELAYED_MESSAGE.name());
-    Assert.assertEquals(relayMessage.getTgtName(), thirdMaster);
-    Assert.assertEquals(relayMessage.getRelaySrcHost(), secondMaster);
-    Assert.assertEquals(relayMessage.getFromState(), MasterSlaveSMD.States.SLAVE.name());
-    Assert.assertEquals(relayMessage.getToState(), MasterSlaveSMD.States.MASTER.name());
+    _messagePipeline.handle(event);
+
+    messageOutput =
+        event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
+    messages = messageOutput.getMessages(_db, _partition);
+    Assert.assertEquals(messages.size(), 1);
+    Message toMasterMessage = messages.get(0);
+    Assert.assertEquals(toMasterMessage.getTgtName(), thirdMaster);
+    Assert.assertEquals(toMasterMessage.getFromState(), MasterSlaveSMD.States.SLAVE.name());
+    Assert.assertEquals(toMasterMessage.getToState(), MasterSlaveSMD.States.MASTER.name());
   }
 
 
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java
index 5b9cace..2fda9bb 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java
@@ -146,8 +146,8 @@ public class TestP2PStateTransitionMessages extends BaseStageTest {
     Assert.assertEquals(relayMessage.getFromState(), MasterSlaveSMD.States.SLAVE.name());
     Assert.assertEquals(relayMessage.getToState(), MasterSlaveSMD.States.MASTER.name());
 
-
-    // test the old master finish state transition, but has not forward p2p message yet.
+    // Test: the old master finishes the state transition, but has not forwarded the p2p message
+    // and has not deleted the original M->S message.
     currentStateOutput.setCurrentState(db, p, masterInstance, "SLAVE");
     currentStateOutput.setPendingMessage(db, p, masterInstance, toSlaveMessage);
     currentStateOutput.setPendingRelayMessage(db, p, masterInstance, relayMessage);
@@ -156,36 +156,22 @@ public class TestP2PStateTransitionMessages extends BaseStageTest {
 
     pipeline.handle(event);
 
+    // We send out an S->O message to the old master even though there is still a pending message
+    // and a pending p2p message.
     messageOutput =
         event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
     messages = messageOutput.getMessages(db, p);
-    Assert.assertEquals(messages.size(), 0);
-
-
-    currentStateOutput =
-        populateCurrentStateFromBestPossible(bestPossibleStateOutput);
-    currentStateOutput.setCurrentState(db, p, masterInstance, "SLAVE");
-    currentStateOutput.setPendingMessage(db, p, newMasterInstance, relayMessage);
-    event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
-
-    pipeline.handle(event);
-
-    messageOutput =
-        event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
-    messages = messageOutput.getMessages(db, p);
-    Assert.assertEquals(messages.size(), 1);
-
     Message toOfflineMessage = messages.get(0);
     Assert.assertEquals(toOfflineMessage.getTgtName(), masterInstance);
     Assert.assertEquals(toOfflineMessage.getFromState(), MasterSlaveSMD.States.SLAVE.name());
     Assert.assertEquals(toOfflineMessage.getToState(), MasterSlaveSMD.States.OFFLINE.name());
 
-
-    // Now, the old master finish state transition, but has not forward p2p message yet.
-    // Then the preference list has changed, so now the new master is different from previously calculated new master
-    // but controller should not send S->M to newly calculated master.
-    currentStateOutput.setCurrentState(db, p, masterInstance, "OFFLINE");
-    event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+    // Now, the old master finishes its state transition, and the second master also finishes its
+    // state transition.
+    // Then the preference list changes, so the new master is different from the previously
+    // calculated new master.
+    // Controller should not send S->M to the newly calculated master, but should send M->S to the
+    // second master.
 
     String slaveInstance =
         getTopStateInstance(bestPossibleStateOutput.getInstanceStateMap(db, p),
@@ -198,30 +184,12 @@ public class TestP2PStateTransitionMessages extends BaseStageTest {
 
     event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput);
 
-    pipeline = new Pipeline("test");
-    pipeline.addStage(new IntermediateStateCalcStage());
-    pipeline.addStage(new ResourceMessageGenerationPhase());
-    pipeline.addStage(new MessageSelectionStage());
-    pipeline.addStage(new MessageThrottleStage());
-
-    pipeline.handle(event);
-
-    messageOutput =
-        event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
-    messages = messageOutput.getMessages(db, p);
-    Assert.assertEquals(messages.size(), 0);
-
-
-    // Now, the old master has forwarded the p2p master to previously calculated master,
-    // So the state-transition still happened in previously calculated master.
-    // Controller will not send S->M to new master.
-    currentStateOutput.setPendingMessage(db, p, newMasterInstance, relayMessage);
+    currentStateOutput =
+        populateCurrentStateFromBestPossible(bestPossibleStateOutput);
+    currentStateOutput.setCurrentState(db, p, newMasterInstance, "MASTER");
+    currentStateOutput.setCurrentState(db, p, slaveInstance, "SLAVE");
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
 
-    event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput);
-    event.addAttribute(AttributeName.INTERMEDIATE_STATE.name(), bestPossibleStateOutput);
-
-
     pipeline = new Pipeline("test");
     pipeline.addStage(new IntermediateStateCalcStage());
     pipeline.addStage(new ResourceMessageGenerationPhase());
@@ -233,19 +201,32 @@ public class TestP2PStateTransitionMessages extends BaseStageTest {
     messageOutput =
         event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
     messages = messageOutput.getMessages(db, p);
-    Assert.assertEquals(messages.size(), 0);
+    Assert.assertEquals(messages.size(), 1);
+    toSlaveMessage = messages.get(0);
+    Assert.assertEquals(toSlaveMessage.getTgtName(), newMasterInstance);
+    Assert.assertEquals(toSlaveMessage.getFromState(), MasterSlaveSMD.States.MASTER.name());
+    Assert.assertEquals(toSlaveMessage.getToState(), MasterSlaveSMD.States.SLAVE.name());
 
+    relayMessage = toSlaveMessage.getRelayMessage(slaveInstance);
+    Assert.assertNotNull(relayMessage);
+    Assert.assertEquals(relayMessage.getMsgSubType(), Message.MessageType.RELAYED_MESSAGE.name());
+    Assert.assertEquals(relayMessage.getTgtName(), slaveInstance);
+    Assert.assertEquals(relayMessage.getRelaySrcHost(), newMasterInstance);
+    Assert.assertEquals(relayMessage.getFromState(), MasterSlaveSMD.States.SLAVE.name());
+    Assert.assertEquals(relayMessage.getToState(), MasterSlaveSMD.States.MASTER.name());
 
-    // now, the previous calculated master completed the state transition and deleted the p2p message.
-    // Controller should drop this master first.
+    // Now, the old master and the second master have both finished their state transitions and
+    // their messages are deleted, but the forwarded relay message is still pending.
+    // Controller will not send S->M to the third master.
     currentStateOutput =
         populateCurrentStateFromBestPossible(bestPossibleStateOutput);
-    currentStateOutput.setCurrentState(db, p, newMasterInstance, "MASTER");
-    currentStateOutput.setCurrentState(db, p, slaveInstance, "SLAVE");
-
+    currentStateOutput.setPendingMessage(db, p, slaveInstance, relayMessage);
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
 
+    event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput);
+
     pipeline = new Pipeline("test");
+    pipeline.addStage(new IntermediateStateCalcStage());
     pipeline.addStage(new ResourceMessageGenerationPhase());
     pipeline.addStage(new MessageSelectionStage());
     pipeline.addStage(new MessageThrottleStage());
@@ -255,12 +236,7 @@ public class TestP2PStateTransitionMessages extends BaseStageTest {
     messageOutput =
         event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
     messages = messageOutput.getMessages(db, p);
-    Assert.assertEquals(messages.size(), 1);
-
-    toSlaveMessage = messages.get(0);
-    Assert.assertEquals(toSlaveMessage.getTgtName(), newMasterInstance);
-    Assert.assertEquals(toSlaveMessage.getFromState(), MasterSlaveSMD.States.MASTER.name());
-    Assert.assertEquals(toSlaveMessage.getToState(), MasterSlaveSMD.States.SLAVE.name());
+    Assert.assertEquals(messages.size(), 0);
   }
 
   private void testP2PMessage(ClusterConfig clusterConfig, Boolean p2pMessageEnabled)