You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2020/06/24 18:24:48 UTC
[helix] branch master updated: Revert "Remove waiting on message
deletion if current state is already updated (#1067)"
This is an automated email from the ASF dual-hosted git repository.
jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new d670af5 Revert "Remove waiting on message deletion if current state is already updated (#1067)"
d670af5 is described below
commit d670af56d1bbae5b89a3d5835c0fa216a4764a8f
Author: Junkai Xue <jx...@linkedin.com>
AuthorDate: Wed Jun 24 11:24:33 2020 -0700
Revert "Remove waiting on message deletion if current state is already updated (#1067)"
This reverts commit d6d97c819970d5f6b76d9e3ade441ef8017fdb2f.
---
.../controller/stages/MessageGenerationPhase.java | 12 +-
.../messaging/TestP2PNoDuplicatedMessage.java | 10 +-
.../TestP2PMessagesAvoidDuplicatedMessage.java | 141 +++++++++------------
.../p2pMessage/TestP2PStateTransitionMessages.java | 92 +++++++++-----
4 files changed, 125 insertions(+), 130 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
index d098b79..4223c37 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
@@ -220,17 +220,7 @@ public abstract class MessageGenerationPhase extends AbstractBaseStage {
generateCancellationMessageForPendingMessage(desiredState, currentState, nextState, pendingMessage,
manager, resource, partition, sessionIdMap, instanceName, stateModelDef,
cancellationMessage, isCancellationEnabled);
- }
- // We will generate new message if pending message is null or current state equals
- // pending message's toState (no cancellation message should be generated in this case)
- if (pendingMessage == null || (message == null &&
- currentState.equalsIgnoreCase(pendingMessage.getToState()))) {
- if (pendingMessage != null) {
- logger.info("Ignore the pending message for resource %s partition %s instance "
- + "%s since the current state %s equals toState of pending message.",
- resource.getResourceName(), partition.getPartitionName(), instanceName,
- currentState);
- }
+ } else {
// Create new state transition message
message = createStateTransitionMessage(manager, resource, partition.getPartitionName(),
instanceName, currentState, nextState, sessionIdMap.get(instanceName),
diff --git a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PNoDuplicatedMessage.java b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PNoDuplicatedMessage.java
index c0cea8f..a910f28 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PNoDuplicatedMessage.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PNoDuplicatedMessage.java
@@ -62,12 +62,12 @@ public class TestP2PNoDuplicatedMessage extends ZkTestBase {
final String CLASS_NAME = getShortClassName();
final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
- static final int PARTICIPANT_NUMBER = 10;
+ static final int PARTICIPANT_NUMBER = 6;
static final int PARTICIPANT_START_PORT = 12918;
static final int DB_COUNT = 2;
- static final int PARTITION_NUMBER = 100;
+ static final int PARTITION_NUMBER = 50;
static final int REPLICA_NUMBER = 3;
final String _controllerName = CONTROLLER_PREFIX + "_0";
@@ -171,7 +171,11 @@ public class TestP2PNoDuplicatedMessage extends ZkTestBase {
verifyP2PEnabled(startTime);
}
- Assert.assertEquals(p2pTrigged, total);
+ double ratio = ((double) p2pTrigged) / ((double) total);
+ Assert.assertTrue(ratio > 0.6, String
+ .format("Only %d out of %d percent transitions to Master were triggered by expected host!",
+ p2pTrigged, total));
+
Assert.assertEquals(MockHelixTaskExecutor.duplicatedMessagesInProgress, 0,
"There are duplicated transition messages sent while participant is handling the state-transition!");
Assert.assertEquals(MockHelixTaskExecutor.duplicatedMessages, 0,
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java
index c42bebe..3a1ea0e 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java
@@ -152,10 +152,9 @@ public class TestP2PMessagesAvoidDuplicatedMessage extends BaseStageTest {
Assert.assertEquals(relayMessage.getToState(), MasterSlaveSMD.States.MASTER.name());
// Scenario 2A:
- // Old master (initialMaster) completes the M->S transition, but the M->S has not been deleted,
- // and it has not forward p2p message to new master (secondMaster) yet.
- // Validate: Controller should not send S->M message to new master, but should send S->O to
- // old master
+ // Old master (initialMaster) completes the M->S transition,
+ // but has not forward p2p message to new master (secondMaster) yet.
+ // Validate: Controller should not send S->M message to new master.
currentStateOutput.setCurrentState(_db, _partition, initialMaster, "SLAVE");
currentStateOutput.setPendingMessage(_db, _partition, initialMaster, toSlaveMessage);
currentStateOutput.setPendingRelayMessage(_db, _partition, initialMaster, relayMessage);
@@ -166,18 +165,13 @@ public class TestP2PMessagesAvoidDuplicatedMessage extends BaseStageTest {
messageOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
messages = messageOutput.getMessages(_db, _partition);
- Assert.assertEquals(messages.size(), 1);
- Message toOfflineMessage = messages.get(0);
- Assert.assertEquals(toOfflineMessage.getTgtName(), initialMaster);
- Assert.assertEquals(toOfflineMessage.getFromState(), MasterSlaveSMD.States.SLAVE.name());
- Assert.assertEquals(toOfflineMessage.getToState(), MasterSlaveSMD.States.OFFLINE.name());
+ Assert.assertEquals(messages.size(), 0);
// Scenario 2B:
- // Old master (initialMaster) completes the M->S transition, and the message has been
- // deleted, but the message S->O is still there.
+ // Old master (initialMaster) completes the M->S transition,
// There is a pending p2p message to new master (secondMaster).
- // Validate: Controller should send S->M message to new master at same time
+ // Validate: Controller should send S->M message to new master at same time.
currentStateOutput.setCurrentState(_db, _partition, initialMaster, "SLAVE");
currentStateOutput.getPendingMessageMap(_db, _partition).clear();
@@ -207,8 +201,7 @@ public class TestP2PMessagesAvoidDuplicatedMessage extends BaseStageTest {
Assert.assertTrue(hasToOffline);
// Scenario 2C
- // Old master (initialMaster) completes the M->S transition, and the message has been
- // deleted, but the message S->O is still there.
+ // Old master (initialMaster) completes the M->S transition,
// There is a pending p2p message to new master (secondMaster).
// However, the new master has been changed in bestPossible
// Validate: Controller should not send S->M message to the third master at same time.
@@ -222,23 +215,24 @@ public class TestP2PMessagesAvoidDuplicatedMessage extends BaseStageTest {
instanceStateMap.put(thirdMaster, "MASTER");
_bestpossibleState.setState(_db, _partition, instanceStateMap);
- event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
- event.addAttribute(AttributeName.INTERMEDIATE_STATE.name(), _bestpossibleState);
- currentStateOutput.setPendingMessage(_db, _partition, initialMaster, toOfflineMessage);
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+ event.addAttribute(AttributeName.INTERMEDIATE_STATE.name(), _bestpossibleState);
_messagePipeline.handle(event);
messageOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
messages = messageOutput.getMessages(_db, _partition);
- Assert.assertEquals(messages.size(), 0);
+ Assert.assertEquals(messages.size(), 1);
+ Assert.assertTrue(messages.get(0).getToState().equals("OFFLINE"));
+ Assert.assertTrue(messages.get(0).getTgtName().equals(initialMaster));
+
// Scenario 3:
- // Old master (initialMaster) completes the S->O transition,
+ // Old master (initialMaster) completes the M->S transition,
// and has already forwarded p2p message to new master (secondMaster)
- // The original S->O message sent to old master has been removed.
- // Validate: Controller should not send S->M message to new master.
+ // The original M->S message sent to old master has been removed.
+ // Validate: Controller should send S->O to old master, but not S->M message to new master.
instanceStateMap = _bestpossibleState.getInstanceStateMap(_db, _partition);
instanceStateMap.put(secondMaster, "MASTER");
instanceStateMap.put(thirdMaster, "SLAVE");
@@ -246,69 +240,54 @@ public class TestP2PMessagesAvoidDuplicatedMessage extends BaseStageTest {
currentStateOutput =
populateCurrentStateFromBestPossible(_bestpossibleState);
- currentStateOutput.setCurrentState(_db, _partition, initialMaster, "OFFLINE");
+ currentStateOutput.setCurrentState(_db, _partition, initialMaster, "SLAVE");
currentStateOutput.setPendingMessage(_db, _partition, secondMaster, relayMessage);
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
- _messagePipeline.handle(event);
+ _fullPipeline.handle(event);
messageOutput =
event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
messages = messageOutput.getMessages(_db, _partition);
- Assert.assertEquals(messages.size(), 0);
+ Assert.assertEquals(messages.size(), 1);
+
+ Message toOfflineMessage = messages.get(0);
+ Assert.assertEquals(toOfflineMessage.getTgtName(), initialMaster);
+ Assert.assertEquals(toOfflineMessage.getFromState(), MasterSlaveSMD.States.SLAVE.name());
+ Assert.assertEquals(toOfflineMessage.getToState(), MasterSlaveSMD.States.OFFLINE.name());
+
// Scenario 4:
- // The old master (initialMaster) finish state transition, and forward p2p message to
- // secondMaster. The secondMaster has finished state transition too, but the message has not
- // been deleted yet.
- // Then the preference list has changed, so now the new master (thirdMaster) is different
- // from previously calculated new master (secondMaster)
- // Validate: controller should not send S->M to thirdMaster, but should send M->S to
- // secondMaster.
+ // The old master (initialMaster) has finished its state transition, but has not forwarded the p2p message yet.
+ // Then the preference list changed, so the new master (thirdMaster) is now different from the previously calculated new master (secondMaster).
+ // Validate: controller should not send S->M to thirdMaster.
+ currentStateOutput.setCurrentState(_db, _partition, initialMaster, "OFFLINE");
+ event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
thirdMaster =
getTopStateInstance(_bestpossibleState.getInstanceStateMap(_db, _partition),
MasterSlaveSMD.States.SLAVE.name());
+
instanceStateMap = _bestpossibleState.getInstanceStateMap(_db, _partition);
instanceStateMap.put(secondMaster, "SLAVE");
instanceStateMap.put(thirdMaster, "MASTER");
_bestpossibleState.setState(_db, _partition, instanceStateMap);
- event.addAttribute(AttributeName.INTERMEDIATE_STATE.name(), _bestpossibleState);
- currentStateOutput.setCurrentState(_db, _partition, initialMaster, "OFFLINE");
- currentStateOutput.setCurrentState(_db, _partition, secondMaster, "MASTER");
- currentStateOutput.setPendingMessage(_db, _partition, secondMaster, relayMessage);
- event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+ event.addAttribute(AttributeName.INTERMEDIATE_STATE.name(), _bestpossibleState);
_messagePipeline.handle(event);
messageOutput =
event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
messages = messageOutput.getMessages(_db, _partition);
- Assert.assertEquals(messages.size(), 1);
- toSlaveMessage = messages.get(0);
- Assert.assertEquals(toSlaveMessage.getTgtName(), secondMaster);
- Assert.assertEquals(toSlaveMessage.getFromState(), MasterSlaveSMD.States.MASTER.name());
- Assert.assertEquals(toSlaveMessage.getToState(), MasterSlaveSMD.States.SLAVE.name());
-
- // verify p2p message are attached to the M->S message sent to the second master instance
- Assert.assertEquals(toSlaveMessage.getRelayMessages().entrySet().size(), 1);
- thirdMaster =
- getTopStateInstance(_bestpossibleState.getInstanceStateMap(_db, _partition), MasterSlaveSMD.States.MASTER.name());
+ Assert.assertEquals(messages.size(), 0);
- relayMessage = toSlaveMessage.getRelayMessage(thirdMaster);
- Assert.assertNotNull(relayMessage);
- Assert.assertEquals(relayMessage.getMsgSubType(), Message.MessageType.RELAYED_MESSAGE.name());
- Assert.assertEquals(relayMessage.getTgtName(), thirdMaster);
- Assert.assertEquals(relayMessage.getRelaySrcHost(), secondMaster);
- Assert.assertEquals(relayMessage.getFromState(), MasterSlaveSMD.States.SLAVE.name());
- Assert.assertEquals(relayMessage.getToState(), MasterSlaveSMD.States.MASTER.name());
// Scenario 5:
- // The second master has finished transition from M->S, but the message is still there.
- // Validate: Controller should NOT send S->M to thirdMaster due to the pending M-S message
- currentStateOutput.setCurrentState(_db, _partition, secondMaster, "SLAVE");
- currentStateOutput.setPendingMessage(_db, _partition, secondMaster, toSlaveMessage);
+ // The initial master has forwarded the p2p message to secondMaster and deleted original M->S message on initialMaster,
+ // But the S->M state-transition has not completed yet in secondMaster.
+ // Validate: Controller should not send S->M to thirdMaster.
+ currentStateOutput.setPendingMessage(_db, _partition, secondMaster, relayMessage);
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
event.addAttribute(AttributeName.INTERMEDIATE_STATE.name(), _bestpossibleState);
@@ -320,41 +299,39 @@ public class TestP2PMessagesAvoidDuplicatedMessage extends BaseStageTest {
messages = messageOutput.getMessages(_db, _partition);
Assert.assertEquals(messages.size(), 0);
- /// Scenario 6a:
- // The second master has finished transition from M->S, and the message has been delete.
- // There is an pending relay message
- // Validate: Controller should NOT send S->M to thirdMaster
- currentStateOutput.setPendingMessage(_db, _partition, secondMaster, toSlaveMessage);currentStateOutput.setPendingRelayMessage(_db, _partition, thirdMaster, relayMessage);
- event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
- event.addAttribute(AttributeName.INTERMEDIATE_STATE.name(), _bestpossibleState);
- _messagePipeline.handle(event);
-
- messageOutput =
- event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
- messages = messageOutput.getMessages(_db, _partition);
- Assert.assertEquals(messages.size(), 0);
+ // Scenario 6:
+ // The secondMaster completed the S->M state transition and deleted the p2p message.
+ // Validate: Controller should send M->S message to secondMaster.
+ currentStateOutput =
+ populateCurrentStateFromBestPossible(_bestpossibleState);
+ currentStateOutput.setCurrentState(_db, _partition, secondMaster, "MASTER");
+ currentStateOutput.setCurrentState(_db, _partition, thirdMaster, "SLAVE");
- /// Scenario 6b:
- // The second master has finished transition from M->S, and the message has been delete.
- // There is no pending relay message
- // Validate: Controller should send S->M to thirdMaster
- currentStateOutput.getPendingMessageMap(_db, _partition).clear();
- currentStateOutput.getPendingRelayMessageMap(_db, _partition).clear();
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
- event.addAttribute(AttributeName.INTERMEDIATE_STATE.name(), _bestpossibleState);
-
_messagePipeline.handle(event);
messageOutput =
event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
messages = messageOutput.getMessages(_db, _partition);
Assert.assertEquals(messages.size(), 1);
- Message toMasterMessage = messages.get(0);
- Assert.assertEquals(toMasterMessage.getTgtName(), thirdMaster);
- Assert.assertEquals(toMasterMessage.getFromState(), MasterSlaveSMD.States.SLAVE.name());
- Assert.assertEquals(toMasterMessage.getToState(), MasterSlaveSMD.States.MASTER.name());
+
+ toSlaveMessage = messages.get(0);
+ Assert.assertEquals(toSlaveMessage.getTgtName(), secondMaster);
+ Assert.assertEquals(toSlaveMessage.getFromState(), MasterSlaveSMD.States.MASTER.name());
+ Assert.assertEquals(toSlaveMessage.getToState(), MasterSlaveSMD.States.SLAVE.name());
+
+ // verify p2p message are attached to the M->S message sent to the secondMaster
+ Assert.assertEquals(toSlaveMessage.getRelayMessages().entrySet().size(), 1);
+
+ relayMessage = toSlaveMessage.getRelayMessage(thirdMaster);
+ Assert.assertNotNull(relayMessage);
+ Assert.assertEquals(relayMessage.getMsgSubType(), Message.MessageType.RELAYED_MESSAGE.name());
+ Assert.assertEquals(relayMessage.getTgtName(), thirdMaster);
+ Assert.assertEquals(relayMessage.getRelaySrcHost(), secondMaster);
+ Assert.assertEquals(relayMessage.getFromState(), MasterSlaveSMD.States.SLAVE.name());
+ Assert.assertEquals(relayMessage.getToState(), MasterSlaveSMD.States.MASTER.name());
}
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java
index 2fda9bb..5b9cace 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java
@@ -146,8 +146,8 @@ public class TestP2PStateTransitionMessages extends BaseStageTest {
Assert.assertEquals(relayMessage.getFromState(), MasterSlaveSMD.States.SLAVE.name());
Assert.assertEquals(relayMessage.getToState(), MasterSlaveSMD.States.MASTER.name());
- // test the old master finishes state transition, but has not forwarded p2p message, and has
- // not deleted original M-S message.
+
+ // Test: the old master has finished its state transition, but has not forwarded the p2p message yet.
currentStateOutput.setCurrentState(db, p, masterInstance, "SLAVE");
currentStateOutput.setPendingMessage(db, p, masterInstance, toSlaveMessage);
currentStateOutput.setPendingRelayMessage(db, p, masterInstance, relayMessage);
@@ -156,22 +156,36 @@ public class TestP2PStateTransitionMessages extends BaseStageTest {
pipeline.handle(event);
- // We send out S->O message to old master although there is still pending message and pending
- // p2p message
messageOutput =
event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
messages = messageOutput.getMessages(db, p);
+ Assert.assertEquals(messages.size(), 0);
+
+
+ currentStateOutput =
+ populateCurrentStateFromBestPossible(bestPossibleStateOutput);
+ currentStateOutput.setCurrentState(db, p, masterInstance, "SLAVE");
+ currentStateOutput.setPendingMessage(db, p, newMasterInstance, relayMessage);
+ event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+
+ pipeline.handle(event);
+
+ messageOutput =
+ event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
+ messages = messageOutput.getMessages(db, p);
+ Assert.assertEquals(messages.size(), 1);
+
Message toOfflineMessage = messages.get(0);
Assert.assertEquals(toOfflineMessage.getTgtName(), masterInstance);
Assert.assertEquals(toOfflineMessage.getFromState(), MasterSlaveSMD.States.SLAVE.name());
Assert.assertEquals(toOfflineMessage.getToState(), MasterSlaveSMD.States.OFFLINE.name());
- // Now, the old master finishes state transition, and the second master also finishes state
- // transition.
- // Then the preference list has changed, so now the new master is different from previously
- // calculated new master
- // Controller should not send S->M to newly calculated master, but should send M-S to
- // second master.
+
+ // Now, the old master has finished its state transition, but has not forwarded the p2p message yet.
+ // Then the preference list changed, so the new master is now different from the previously calculated new master,
+ // but the controller should not send S->M to the newly calculated master.
+ currentStateOutput.setCurrentState(db, p, masterInstance, "OFFLINE");
+ event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
String slaveInstance =
getTopStateInstance(bestPossibleStateOutput.getInstanceStateMap(db, p),
@@ -184,12 +198,6 @@ public class TestP2PStateTransitionMessages extends BaseStageTest {
event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput);
- currentStateOutput =
- populateCurrentStateFromBestPossible(bestPossibleStateOutput);
- currentStateOutput.setCurrentState(db, p, newMasterInstance, "MASTER");
- currentStateOutput.setCurrentState(db, p, slaveInstance, "SLAVE");
- event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
-
pipeline = new Pipeline("test");
pipeline.addStage(new IntermediateStateCalcStage());
pipeline.addStage(new ResourceMessageGenerationPhase());
@@ -201,29 +209,18 @@ public class TestP2PStateTransitionMessages extends BaseStageTest {
messageOutput =
event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
messages = messageOutput.getMessages(db, p);
- Assert.assertEquals(messages.size(), 1);
- toSlaveMessage = messages.get(0);
- Assert.assertEquals(toSlaveMessage.getTgtName(), newMasterInstance);
- Assert.assertEquals(toSlaveMessage.getFromState(), MasterSlaveSMD.States.MASTER.name());
- Assert.assertEquals(toSlaveMessage.getToState(), MasterSlaveSMD.States.SLAVE.name());
+ Assert.assertEquals(messages.size(), 0);
- relayMessage = toSlaveMessage.getRelayMessage(slaveInstance);
- Assert.assertNotNull(relayMessage);
- Assert.assertEquals(relayMessage.getMsgSubType(), Message.MessageType.RELAYED_MESSAGE.name());
- Assert.assertEquals(relayMessage.getTgtName(), slaveInstance);
- Assert.assertEquals(relayMessage.getRelaySrcHost(), newMasterInstance);
- Assert.assertEquals(relayMessage.getFromState(), MasterSlaveSMD.States.SLAVE.name());
- Assert.assertEquals(relayMessage.getToState(), MasterSlaveSMD.States.MASTER.name());
- // Now, the old master and second master both finishes state transition, and messages are
- // deleted, the forwarded relay message is pending.
- // Controller will not send S->M to third master.
- currentStateOutput =
- populateCurrentStateFromBestPossible(bestPossibleStateOutput);
- currentStateOutput.setPendingMessage(db, p, slaveInstance, relayMessage);
+ // Now, the old master has forwarded the p2p message to the previously calculated master,
+ // so the state transition still happens on the previously calculated master.
+ // Controller will not send S->M to the new master.
+ currentStateOutput.setPendingMessage(db, p, newMasterInstance, relayMessage);
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput);
+ event.addAttribute(AttributeName.INTERMEDIATE_STATE.name(), bestPossibleStateOutput);
+
pipeline = new Pipeline("test");
pipeline.addStage(new IntermediateStateCalcStage());
@@ -237,6 +234,33 @@ public class TestP2PStateTransitionMessages extends BaseStageTest {
event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
messages = messageOutput.getMessages(db, p);
Assert.assertEquals(messages.size(), 0);
+
+
+ // Now, the previously calculated master has completed the state transition and deleted the p2p message.
+ // Controller should first demote this master (send it an M->S message).
+ currentStateOutput =
+ populateCurrentStateFromBestPossible(bestPossibleStateOutput);
+ currentStateOutput.setCurrentState(db, p, newMasterInstance, "MASTER");
+ currentStateOutput.setCurrentState(db, p, slaveInstance, "SLAVE");
+
+ event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+
+ pipeline = new Pipeline("test");
+ pipeline.addStage(new ResourceMessageGenerationPhase());
+ pipeline.addStage(new MessageSelectionStage());
+ pipeline.addStage(new MessageThrottleStage());
+
+ pipeline.handle(event);
+
+ messageOutput =
+ event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
+ messages = messageOutput.getMessages(db, p);
+ Assert.assertEquals(messages.size(), 1);
+
+ toSlaveMessage = messages.get(0);
+ Assert.assertEquals(toSlaveMessage.getTgtName(), newMasterInstance);
+ Assert.assertEquals(toSlaveMessage.getFromState(), MasterSlaveSMD.States.MASTER.name());
+ Assert.assertEquals(toSlaveMessage.getToState(), MasterSlaveSMD.States.SLAVE.name());
}
private void testP2PMessage(ClusterConfig clusterConfig, Boolean p2pMessageEnabled)