You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@helix.apache.org by hu...@apache.org on 2019/06/24 23:49:27 UTC
[helix] 05/15: Remove workaround for sending S->M message when there is a matching pending relay message.
This is an automated email from the ASF dual-hosted git repository.
hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
commit 831593c414e56695725da6ad99a0bd2128157be4
Author: Lei Xia <lx...@linkedin.com>
AuthorDate: Fri May 17 10:36:22 2019 -0700
Remove workaround for sending S->M message when there is a matching pending relay message.
RB=1670732
BUG=HELIX-1871
G=helix-reviewers
A=jjwang,jxue
Signed-off-by: Hunter Lee <hu...@linkedin.com>
---
.../controller/stages/MessageSelectionStage.java | 16 ++---
.../integration/manager/TestZkHelixAdmin.java | 17 +++--
.../paticipant/TestStateTransitionTimeout.java | 20 ++++--
.../TestStateTransitionTimeoutWithResource.java | 23 ++++---
.../TestP2PMessagesAvoidDuplicatedMessage.java | 72 ++++++++++++++++++++--
5 files changed, 121 insertions(+), 27 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
index 73c8901..9fd66d2 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
@@ -184,7 +184,7 @@ public class MessageSelectionStage extends AbstractBaseStage {
}
if (!messagesGroupByStateTransitPriority.containsKey(priority)) {
- messagesGroupByStateTransitPriority.put(priority, new ArrayList<Message>());
+ messagesGroupByStateTransitPriority.put(priority, new ArrayList<>());
}
messagesGroupByStateTransitPriority.get(priority).add(message);
@@ -195,25 +195,25 @@ public class MessageSelectionStage extends AbstractBaseStage {
// select messages
for (List<Message> messageList : messagesGroupByStateTransitPriority.values()) {
+ NextMessage:
for (Message message : messageList) {
String toState = message.getToState();
String fromState = message.getFromState();
if (toState.equals(stateModelDef.getTopState())) {
// find if there are any pending relay messages match this message.
- // if yes, rebuild the message to use the same message id from the original relay message.
+ // if the pending relay message targets the same host, we are fine to continue send the message,
+ // if it targets to different host, we should not send the message now (should send after the relay message gets expired).
for (Message relayMsg : pendingRelayMessages) {
if (relayMsg.getToState().equals(toState) && relayMsg.getFromState()
.equals(fromState)) {
- if (relayMsg.getTgtName().equals(message.getTgtName())) {
- message = new Message(message, relayMsg.getMsgId());
- } else {
- // if there are pending relay message that was sent to a different host than the current message
- // we should not send the toState message now.
+ LOG.info(
+ "There is pending relay message, pending relay message: {}", relayMsg);
+ if (!relayMsg.getTgtName().equals(message.getTgtName())) {
LOG.info(
"There is pending relay message to a different host, not send message: {}, pending relay message: {}",
message, relayMsg);
- continue;
+ continue NextMessage;
}
}
}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkHelixAdmin.java
index 5141a8d..49d01a8 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkHelixAdmin.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkHelixAdmin.java
@@ -22,6 +22,7 @@ package org.apache.helix.integration.manager;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
import org.apache.helix.TestHelper;
@@ -73,6 +74,8 @@ public class TestZkHelixAdmin extends TaskTestBase {
idealState.setRebalanceMode(IdealState.RebalanceMode.SEMI_AUTO);
_admin.setResourceIdealState(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, idealState);
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
String workflowName = TestHelper.getTestMethodName();
Workflow.Builder builder = new Workflow.Builder(workflowName);
JobConfig.Builder jobBuilder =
@@ -84,8 +87,14 @@ public class TestZkHelixAdmin extends TaskTestBase {
Thread.sleep(2000L);
JobContext jobContext =
_driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, "JOB"));
- Assert.assertEquals(jobContext.getPartitionState(0), null);
- Assert.assertEquals(jobContext.getPartitionState(1), TaskPartitionState.COMPLETED);
- Assert.assertEquals(jobContext.getPartitionState(2), null);
+ int n = idealState.getNumPartitions();
+ for ( int i = 0; i < n; i++) {
+ String targetPartition = jobContext.getTargetForPartition(i);
+ if (targetPartition.equals("TestDB_0") || targetPartition.equals("TestDB_2")) {
+ Assert.assertEquals(jobContext.getPartitionState(i), null);
+ } else {
+ Assert.assertEquals(jobContext.getPartitionState(i), TaskPartitionState.COMPLETED);
+ }
+ }
}
-}
\ No newline at end of file
+}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionTimeout.java b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionTimeout.java
index 74cf9a2..59aa61e 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionTimeout.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionTimeout.java
@@ -28,6 +28,7 @@ import java.util.Set;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.TestHelper;
import org.apache.helix.integration.common.ZkStandAloneCMTestBase;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
@@ -164,15 +165,26 @@ public class TestStateTransitionTimeout extends ZkStandAloneCMTestBase {
Assert.assertTrue(result);
HelixDataAccessor accessor = _participants[0].getHelixDataAccessor();
+ TestHelper.verify(() -> verify(accessor, idealState, factories), 5000);
+ Assert.assertTrue(verify(accessor, idealState, factories));
+ }
+
+ private boolean verify(HelixDataAccessor accessor, IdealState idealState,
+ Map<String, SleepStateModelFactory> factoryMap) {
Builder kb = accessor.keyBuilder();
ExternalView ev = accessor.getProperty(kb.externalView(TEST_DB));
for (String p : idealState.getPartitionSet()) {
String idealMaster = idealState.getPreferenceList(p).get(0);
- Assert.assertTrue(ev.getStateMap(p).get(idealMaster).equals("ERROR"));
+ if (!ev.getStateMap(p).get(idealMaster).equals("ERROR")) {
+ return false;
+ }
- TimeOutStateModel model = factories.get(idealMaster).getStateModel(TEST_DB, p);
- Assert.assertEquals(model._errorCallcount, 1);
- Assert.assertEquals(model._error.getCode(), ErrorCode.TIMEOUT);
+ TimeOutStateModel model = factoryMap.get(idealMaster).getStateModel(TEST_DB, p);
+ if (model._errorCallcount != 1 || model._error.getCode() != ErrorCode.TIMEOUT) {
+ return false;
+ }
}
+
+ return true;
}
}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionTimeoutWithResource.java b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionTimeoutWithResource.java
index 7cb3e75..c3bcf47 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionTimeoutWithResource.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionTimeoutWithResource.java
@@ -30,6 +30,7 @@ import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
+import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
import org.apache.helix.api.config.RebalanceConfig;
import org.apache.helix.api.config.StateTransitionTimeoutConfig;
@@ -168,11 +169,12 @@ public class TestStateTransitionTimeoutWithResource extends ZkStandAloneCMTestBa
.verifyByPolling(new MasterNbInExtViewVerifier(ZK_ADDR, CLUSTER_NAME));
Assert.assertTrue(result);
- verify(TEST_DB);
+ TestHelper.verify(() -> verify(TEST_DB), 5000);
+ Assert.assertTrue(verify(TEST_DB));
}
@Test
- public void testStateTransitionTimeoutByClusterLevel() throws InterruptedException {
+ public void testStateTransitionTimeoutByClusterLevel() throws Exception {
_gSetupTool.addResourceToCluster(CLUSTER_NAME, TEST_DB + 1, _PARTITIONS, STATE_MODEL);
_gSetupTool.getClusterManagementTool().enableResource(CLUSTER_NAME, TEST_DB + 1, false);
_gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, TEST_DB + 1, 3);
@@ -191,22 +193,29 @@ public class TestStateTransitionTimeoutWithResource extends ZkStandAloneCMTestBa
ClusterStateVerifier
.verifyByPolling(new MasterNbInExtViewVerifier(ZK_ADDR, CLUSTER_NAME));
Assert.assertTrue(result);
- verify(TEST_DB + 1);
+
+ TestHelper.verify(() -> verify(TEST_DB + 1), 5000);
+ Assert.assertTrue(verify(TEST_DB + 1));
}
- private void verify(String dbName) {
+ private boolean verify(String dbName) {
IdealState idealState =
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, dbName);
HelixDataAccessor accessor = _manager.getHelixDataAccessor();
ExternalView ev = accessor.getProperty(accessor.keyBuilder().externalView(dbName));
for (String p : idealState.getPartitionSet()) {
String idealMaster = idealState.getPreferenceList(p).get(0);
- Assert.assertTrue(ev.getStateMap(p).get(idealMaster).equals("ERROR"));
+ if(!ev.getStateMap(p).get(idealMaster).equals("ERROR")) {
+ return false;
+ }
TimeOutStateModel model = _factories.get(idealMaster).getStateModel(dbName, p);
- Assert.assertEquals(model._errorCallcount, 1);
- Assert.assertEquals(model._error.getCode(), ErrorCode.TIMEOUT);
+ if (model._errorCallcount != 1 || model._error.getCode() != ErrorCode.TIMEOUT) {
+ return false;
+ }
}
+
+ return true;
}
private void setParticipants(String dbName) throws InterruptedException {
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java
index 3980b36..5d8be22 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java
@@ -150,11 +150,10 @@ public class TestP2PMessagesAvoidDuplicatedMessage extends BaseStageTest {
Assert.assertEquals(relayMessage.getFromState(), MasterSlaveSMD.States.SLAVE.name());
Assert.assertEquals(relayMessage.getToState(), MasterSlaveSMD.States.MASTER.name());
- // Scenario 2:
+ // Scenario 2A:
// Old master (initialMaster) completes the M->S transition,
// but has not forward p2p message to new master (secondMaster) yet.
// Validate: Controller should not send S->M message to new master.
-
currentStateOutput.setCurrentState(_db, _partition, initialMaster, "SLAVE");
currentStateOutput.setPendingMessage(_db, _partition, initialMaster, toSlaveMessage);
currentStateOutput.setPendingRelayMessage(_db, _partition, initialMaster, relayMessage);
@@ -168,11 +167,76 @@ public class TestP2PMessagesAvoidDuplicatedMessage extends BaseStageTest {
Assert.assertEquals(messages.size(), 0);
+ // Scenario 2B:
+ // Old master (initialMaster) completes the M->S transition,
+ // There is a pending p2p message to new master (secondMaster).
+ // Validate: Controller should send S->M message to new master at same time.
+
+ currentStateOutput.setCurrentState(_db, _partition, initialMaster, "SLAVE");
+ currentStateOutput.getPendingMessageMap(_db, _partition).clear();
+ currentStateOutput.setPendingRelayMessage(_db, _partition, initialMaster, relayMessage);
+
+ event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+
+ _messagePipeline.handle(event);
+
+ messageOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
+ messages = messageOutput.getMessages(_db, _partition);
+ Assert.assertEquals(messages.size(), 2);
+
+ boolean hasToOffline = false;
+ boolean hasToMaster = false;
+ for (Message msg : messages) {
+ if (msg.getToState().equals(MasterSlaveSMD.States.MASTER.name()) && msg.getTgtName()
+ .equals(secondMaster)) {
+ hasToMaster = true;
+ }
+ if (msg.getToState().equals(MasterSlaveSMD.States.OFFLINE.name()) && msg.getTgtName()
+ .equals(initialMaster)) {
+ hasToOffline = true;
+ }
+ }
+ Assert.assertTrue(hasToMaster);
+ Assert.assertTrue(hasToOffline);
+
+ // Secenario 2C
+ // Old master (initialMaster) completes the M->S transition,
+ // There is a pending p2p message to new master (secondMaster).
+ // However, the new master has been changed in bestPossible
+ // Validate: Controller should not send S->M message to the third master at same time.
+
+ String thirdMaster =
+ getTopStateInstance(_bestpossibleState.getInstanceStateMap(_db, _partition),
+ MasterSlaveSMD.States.SLAVE.name());
+
+ Map<String, String> instanceStateMap = _bestpossibleState.getInstanceStateMap(_db, _partition);
+ instanceStateMap.put(secondMaster, "SLAVE");
+ instanceStateMap.put(thirdMaster, "MASTER");
+ _bestpossibleState.setState(_db, _partition, instanceStateMap);
+
+
+ event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+ event.addAttribute(AttributeName.INTERMEDIATE_STATE.name(), _bestpossibleState);
+
+ _messagePipeline.handle(event);
+
+ messageOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
+ messages = messageOutput.getMessages(_db, _partition);
+ Assert.assertEquals(messages.size(), 1);
+ Assert.assertTrue(messages.get(0).getToState().equals("OFFLINE"));
+ Assert.assertTrue(messages.get(0).getTgtName().equals(initialMaster));
+
+
// Scenario 3:
// Old master (initialMaster) completes the M->S transition,
// and has already forwarded p2p message to new master (secondMaster)
// The original S->M message sent to old master has been removed.
// Validate: Controller should send S->O to old master, but not S->M message to new master.
+ instanceStateMap = _bestpossibleState.getInstanceStateMap(_db, _partition);
+ instanceStateMap.put(secondMaster, "MASTER");
+ instanceStateMap.put(thirdMaster, "SLAVE");
+ _bestpossibleState.setState(_db, _partition, instanceStateMap);
+
currentStateOutput =
populateCurrentStateFromBestPossible(_bestpossibleState);
currentStateOutput.setCurrentState(_db, _partition, initialMaster, "SLAVE");
@@ -199,11 +263,11 @@ public class TestP2PMessagesAvoidDuplicatedMessage extends BaseStageTest {
currentStateOutput.setCurrentState(_db, _partition, initialMaster, "OFFLINE");
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
- String thirdMaster =
+ thirdMaster =
getTopStateInstance(_bestpossibleState.getInstanceStateMap(_db, _partition),
MasterSlaveSMD.States.SLAVE.name());
- Map<String, String> instanceStateMap = _bestpossibleState.getInstanceStateMap(_db, _partition);
+ instanceStateMap = _bestpossibleState.getInstanceStateMap(_db, _partition);
instanceStateMap.put(secondMaster, "SLAVE");
instanceStateMap.put(thirdMaster, "MASTER");
_bestpossibleState.setState(_db, _partition, instanceStateMap);