You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hu...@apache.org on 2019/06/24 23:49:27 UTC

[helix] 05/15: Remove workaround in sending S->M message when there is a same pending relay message.

This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 831593c414e56695725da6ad99a0bd2128157be4
Author: Lei Xia <lx...@linkedin.com>
AuthorDate: Fri May 17 10:36:22 2019 -0700

    Remove workaround in sending S->M message when there is a same pending relay message.
    
    RB=1670732
    BUG=HELIX-1871
    G=helix-reviewers
    A=jjwang,jxue
    
    Signed-off-by: Hunter Lee <hu...@linkedin.com>
---
 .../controller/stages/MessageSelectionStage.java   | 16 ++---
 .../integration/manager/TestZkHelixAdmin.java      | 17 +++--
 .../paticipant/TestStateTransitionTimeout.java     | 20 ++++--
 .../TestStateTransitionTimeoutWithResource.java    | 23 ++++---
 .../TestP2PMessagesAvoidDuplicatedMessage.java     | 72 ++++++++++++++++++++--
 5 files changed, 121 insertions(+), 27 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
index 73c8901..9fd66d2 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
@@ -184,7 +184,7 @@ public class MessageSelectionStage extends AbstractBaseStage {
       }
 
       if (!messagesGroupByStateTransitPriority.containsKey(priority)) {
-        messagesGroupByStateTransitPriority.put(priority, new ArrayList<Message>());
+        messagesGroupByStateTransitPriority.put(priority, new ArrayList<>());
       }
       messagesGroupByStateTransitPriority.get(priority).add(message);
 
@@ -195,25 +195,25 @@ public class MessageSelectionStage extends AbstractBaseStage {
 
     // select messages
     for (List<Message> messageList : messagesGroupByStateTransitPriority.values()) {
+      NextMessage:
       for (Message message : messageList) {
         String toState = message.getToState();
         String fromState = message.getFromState();
 
         if (toState.equals(stateModelDef.getTopState())) {
           // find if there are any pending relay messages match this message.
-          // if yes, rebuild the message to use the same message id from the original relay message.
+          // If the pending relay message targets the same host, it is fine to go ahead and send the message;
+          // if it targets a different host, we should not send the message now (it should be sent after the relay message expires).
           for (Message relayMsg : pendingRelayMessages) {
             if (relayMsg.getToState().equals(toState) && relayMsg.getFromState()
                 .equals(fromState)) {
-              if (relayMsg.getTgtName().equals(message.getTgtName())) {
-                message = new Message(message, relayMsg.getMsgId());
-              } else {
-                // if there are pending relay message that was sent to a different host than the current message
-                // we should not send the toState message now.
+              LOG.info(
+                  "There is pending relay message, pending relay message: {}", relayMsg);
+              if (!relayMsg.getTgtName().equals(message.getTgtName())) {
                 LOG.info(
                     "There is pending relay message to a different host, not send message: {}, pending relay message: {}",
                     message, relayMsg);
-                continue;
+                continue NextMessage;
               }
             }
           }
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkHelixAdmin.java
index 5141a8d..49d01a8 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkHelixAdmin.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkHelixAdmin.java
@@ -22,6 +22,7 @@ package org.apache.helix.integration.manager;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.TestHelper;
@@ -73,6 +74,8 @@ public class TestZkHelixAdmin extends TaskTestBase {
     idealState.setRebalanceMode(IdealState.RebalanceMode.SEMI_AUTO);
     _admin.setResourceIdealState(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, idealState);
 
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
     String workflowName = TestHelper.getTestMethodName();
     Workflow.Builder builder = new Workflow.Builder(workflowName);
     JobConfig.Builder jobBuilder =
@@ -84,8 +87,14 @@ public class TestZkHelixAdmin extends TaskTestBase {
     Thread.sleep(2000L);
     JobContext jobContext =
         _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, "JOB"));
-    Assert.assertEquals(jobContext.getPartitionState(0), null);
-    Assert.assertEquals(jobContext.getPartitionState(1), TaskPartitionState.COMPLETED);
-    Assert.assertEquals(jobContext.getPartitionState(2), null);
+    int n = idealState.getNumPartitions();
+    for ( int i = 0; i < n; i++) {
+      String targetPartition = jobContext.getTargetForPartition(i);
+      if (targetPartition.equals("TestDB_0") || targetPartition.equals("TestDB_2")) {
+        Assert.assertEquals(jobContext.getPartitionState(i), null);
+      } else {
+        Assert.assertEquals(jobContext.getPartitionState(i), TaskPartitionState.COMPLETED);
+      }
+    }
   }
-}
\ No newline at end of file
+}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionTimeout.java b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionTimeout.java
index 74cf9a2..59aa61e 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionTimeout.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionTimeout.java
@@ -28,6 +28,7 @@ import java.util.Set;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.TestHelper;
 import org.apache.helix.integration.common.ZkStandAloneCMTestBase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
@@ -164,15 +165,26 @@ public class TestStateTransitionTimeout extends ZkStandAloneCMTestBase {
     Assert.assertTrue(result);
     HelixDataAccessor accessor = _participants[0].getHelixDataAccessor();
 
+    TestHelper.verify(() -> verify(accessor, idealState, factories), 5000);
+    Assert.assertTrue(verify(accessor, idealState, factories));
+  }
+
+  private boolean verify(HelixDataAccessor accessor, IdealState idealState,
+      Map<String, SleepStateModelFactory> factoryMap) {
     Builder kb = accessor.keyBuilder();
     ExternalView ev = accessor.getProperty(kb.externalView(TEST_DB));
     for (String p : idealState.getPartitionSet()) {
       String idealMaster = idealState.getPreferenceList(p).get(0);
-      Assert.assertTrue(ev.getStateMap(p).get(idealMaster).equals("ERROR"));
+      if (!ev.getStateMap(p).get(idealMaster).equals("ERROR")) {
+        return false;
+      }
 
-      TimeOutStateModel model = factories.get(idealMaster).getStateModel(TEST_DB, p);
-      Assert.assertEquals(model._errorCallcount, 1);
-      Assert.assertEquals(model._error.getCode(), ErrorCode.TIMEOUT);
+      TimeOutStateModel model = factoryMap.get(idealMaster).getStateModel(TEST_DB, p);
+      if (model._errorCallcount != 1 || model._error.getCode() != ErrorCode.TIMEOUT) {
+        return false;
+      }
     }
+
+    return true;
   }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionTimeoutWithResource.java b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionTimeoutWithResource.java
index 7cb3e75..c3bcf47 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionTimeoutWithResource.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionTimeoutWithResource.java
@@ -30,6 +30,7 @@ import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
 import org.apache.helix.NotificationContext;
+import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.api.config.RebalanceConfig;
 import org.apache.helix.api.config.StateTransitionTimeoutConfig;
@@ -168,11 +169,12 @@ public class TestStateTransitionTimeoutWithResource extends ZkStandAloneCMTestBa
             .verifyByPolling(new MasterNbInExtViewVerifier(ZK_ADDR, CLUSTER_NAME));
     Assert.assertTrue(result);
 
-    verify(TEST_DB);
+    TestHelper.verify(() -> verify(TEST_DB), 5000);
+    Assert.assertTrue(verify(TEST_DB));
   }
 
   @Test
-  public void testStateTransitionTimeoutByClusterLevel() throws InterruptedException {
+  public void testStateTransitionTimeoutByClusterLevel() throws Exception {
     _gSetupTool.addResourceToCluster(CLUSTER_NAME, TEST_DB + 1, _PARTITIONS, STATE_MODEL);
     _gSetupTool.getClusterManagementTool().enableResource(CLUSTER_NAME, TEST_DB + 1, false);
     _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, TEST_DB  + 1, 3);
@@ -191,22 +193,29 @@ public class TestStateTransitionTimeoutWithResource extends ZkStandAloneCMTestBa
         ClusterStateVerifier
             .verifyByPolling(new MasterNbInExtViewVerifier(ZK_ADDR, CLUSTER_NAME));
     Assert.assertTrue(result);
-    verify(TEST_DB + 1);
+
+    TestHelper.verify(() -> verify(TEST_DB + 1), 5000);
+    Assert.assertTrue(verify(TEST_DB + 1));
   }
 
-  private void verify(String dbName) {
+  private boolean verify(String dbName) {
     IdealState idealState =
         _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, dbName);
     HelixDataAccessor accessor = _manager.getHelixDataAccessor();
     ExternalView ev = accessor.getProperty(accessor.keyBuilder().externalView(dbName));
     for (String p : idealState.getPartitionSet()) {
       String idealMaster = idealState.getPreferenceList(p).get(0);
-      Assert.assertTrue(ev.getStateMap(p).get(idealMaster).equals("ERROR"));
+      if(!ev.getStateMap(p).get(idealMaster).equals("ERROR")) {
+        return false;
+      }
 
       TimeOutStateModel model = _factories.get(idealMaster).getStateModel(dbName, p);
-      Assert.assertEquals(model._errorCallcount, 1);
-      Assert.assertEquals(model._error.getCode(), ErrorCode.TIMEOUT);
+      if (model._errorCallcount != 1 || model._error.getCode() != ErrorCode.TIMEOUT) {
+        return false;
+      }
     }
+
+    return true;
   }
 
   private void setParticipants(String dbName) throws InterruptedException {
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java
index 3980b36..5d8be22 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java
@@ -150,11 +150,10 @@ public class TestP2PMessagesAvoidDuplicatedMessage extends BaseStageTest {
     Assert.assertEquals(relayMessage.getFromState(), MasterSlaveSMD.States.SLAVE.name());
     Assert.assertEquals(relayMessage.getToState(), MasterSlaveSMD.States.MASTER.name());
 
-    // Scenario 2:
+    // Scenario 2A:
     // Old master (initialMaster) completes the M->S transition,
     // but has not forward p2p message to new master (secondMaster) yet.
     // Validate: Controller should not send S->M message to new master.
-
     currentStateOutput.setCurrentState(_db, _partition, initialMaster, "SLAVE");
     currentStateOutput.setPendingMessage(_db, _partition, initialMaster, toSlaveMessage);
     currentStateOutput.setPendingRelayMessage(_db, _partition, initialMaster, relayMessage);
@@ -168,11 +167,76 @@ public class TestP2PMessagesAvoidDuplicatedMessage extends BaseStageTest {
     Assert.assertEquals(messages.size(), 0);
 
 
+    // Scenario 2B:
+    // Old master (initialMaster) completes the M->S transition, and
+    // there is a pending p2p message to the new master (secondMaster).
+    // Validate: Controller should send the S->M message to the new master at the same time.
+
+    currentStateOutput.setCurrentState(_db, _partition, initialMaster, "SLAVE");
+    currentStateOutput.getPendingMessageMap(_db, _partition).clear();
+    currentStateOutput.setPendingRelayMessage(_db, _partition, initialMaster, relayMessage);
+
+    event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+
+    _messagePipeline.handle(event);
+
+    messageOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
+    messages = messageOutput.getMessages(_db, _partition);
+    Assert.assertEquals(messages.size(), 2);
+
+    boolean hasToOffline = false;
+    boolean hasToMaster = false;
+    for (Message msg : messages) {
+      if (msg.getToState().equals(MasterSlaveSMD.States.MASTER.name()) && msg.getTgtName()
+          .equals(secondMaster)) {
+        hasToMaster = true;
+      }
+      if (msg.getToState().equals(MasterSlaveSMD.States.OFFLINE.name()) && msg.getTgtName()
+          .equals(initialMaster)) {
+        hasToOffline = true;
+      }
+    }
+    Assert.assertTrue(hasToMaster);
+    Assert.assertTrue(hasToOffline);
+
+    // Scenario 2C:
+    // Old master (initialMaster) completes the M->S transition, and
+    // there is a pending p2p message to the new master (secondMaster).
+    // However, the new master has since been changed in bestPossible.
+    // Validate: Controller should not send an S->M message to the third master at the same time.
+
+    String thirdMaster =
+        getTopStateInstance(_bestpossibleState.getInstanceStateMap(_db, _partition),
+            MasterSlaveSMD.States.SLAVE.name());
+
+    Map<String, String> instanceStateMap = _bestpossibleState.getInstanceStateMap(_db, _partition);
+    instanceStateMap.put(secondMaster, "SLAVE");
+    instanceStateMap.put(thirdMaster, "MASTER");
+    _bestpossibleState.setState(_db, _partition, instanceStateMap);
+
+
+    event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+    event.addAttribute(AttributeName.INTERMEDIATE_STATE.name(), _bestpossibleState);
+
+    _messagePipeline.handle(event);
+
+    messageOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
+    messages = messageOutput.getMessages(_db, _partition);
+    Assert.assertEquals(messages.size(), 1);
+    Assert.assertTrue(messages.get(0).getToState().equals("OFFLINE"));
+    Assert.assertTrue(messages.get(0).getTgtName().equals(initialMaster));
+
+
     // Scenario 3:
     // Old master (initialMaster) completes the M->S transition,
     // and has already forwarded p2p message to new master (secondMaster)
     // The original S->M message sent to old master has been removed.
     // Validate: Controller should send S->O to old master, but not S->M message to new master.
+    instanceStateMap = _bestpossibleState.getInstanceStateMap(_db, _partition);
+    instanceStateMap.put(secondMaster, "MASTER");
+    instanceStateMap.put(thirdMaster, "SLAVE");
+    _bestpossibleState.setState(_db, _partition, instanceStateMap);
+
     currentStateOutput =
         populateCurrentStateFromBestPossible(_bestpossibleState);
     currentStateOutput.setCurrentState(_db, _partition, initialMaster, "SLAVE");
@@ -199,11 +263,11 @@ public class TestP2PMessagesAvoidDuplicatedMessage extends BaseStageTest {
     currentStateOutput.setCurrentState(_db, _partition, initialMaster, "OFFLINE");
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
 
-    String thirdMaster =
+    thirdMaster =
         getTopStateInstance(_bestpossibleState.getInstanceStateMap(_db, _partition),
             MasterSlaveSMD.States.SLAVE.name());
 
-    Map<String, String> instanceStateMap = _bestpossibleState.getInstanceStateMap(_db, _partition);
+    instanceStateMap = _bestpossibleState.getInstanceStateMap(_db, _partition);
     instanceStateMap.put(secondMaster, "SLAVE");
     instanceStateMap.put(thirdMaster, "MASTER");
     _bestpossibleState.setState(_db, _partition, instanceStateMap);