You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hu...@apache.org on 2019/05/25 01:20:05 UTC

[helix] 31/44: Add tests for cancellation message with p2p

This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 350c7336f6bce6235603b47837ec0dcd2d88418e
Author: Junkai Xue <jx...@linkedin.com>
AuthorDate: Thu May 9 15:25:40 2019 -0700

    Add tests for cancellation message with p2p
    
    Add a test case to ensure that a cancellation message does not cancel a p2p relay message while that message is in the pending state.
    
    RB=1661028
    G=helix-reviewers
    A=lxia
    
    Signed-off-by: Hunter Lee <hu...@linkedin.com>
---
 .../TestP2PWithStateCancellationMessage.java       | 181 +++++++++++++++++++++
 1 file changed, 181 insertions(+)

diff --git a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PWithStateCancellationMessage.java b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PWithStateCancellationMessage.java
new file mode 100644
index 0000000..aa7f780
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PWithStateCancellationMessage.java
@@ -0,0 +1,181 @@
+package org.apache.helix.messaging.p2pMessage;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.stages.AttributeName;
+import org.apache.helix.controller.stages.BaseStageTest;
+import org.apache.helix.controller.stages.ClusterEvent;
+import org.apache.helix.controller.stages.ClusterEventType;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.controller.stages.IntermediateStateOutput;
+import org.apache.helix.controller.stages.MessageOutput;
+import org.apache.helix.controller.stages.resource.ResourceMessageGenerationPhase;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.MasterSlaveSMD;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.ResourceConfig;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.*;
+
+/** Checks that a state transition cancellation does not cancel a pending p2p relay message. */
+public class TestP2PWithStateCancellationMessage extends BaseStageTest {
+  private final static String CLUSTER_NAME = "MockCluster";
+  private final static String RESOURCE_NAME = "MockResource";
+  /** Runs ResourceMessageGenerationPhase and verifies the messages generated per partition. */
+  @Test
+  public void testP2PWithStateCancellationMessage() {
+    ClusterEvent event = generateClusterEvent();
+    runStage(event, new ResourceMessageGenerationPhase());
+    MessageOutput messageOutput = event.getAttribute(AttributeName.MESSAGES_ALL.name());
+    // No message should be sent for partition 0: its pending p2p message must not be cancelled.
+    Assert.assertEquals(messageOutput.getMessages(RESOURCE_NAME, new Partition("0")).size(), 0);
+
+    // Exactly one cancellation message should be sent out for partition 1.
+    List<Message> messages = messageOutput.getMessages(RESOURCE_NAME, new Partition("1"));
+    Assert.assertEquals(messages.size(), 1);
+    Assert.assertEquals(messages.get(0).getMsgType(),
+        Message.MessageType.STATE_TRANSITION_CANCELLATION.name());
+  }
+  /** Builds the ClusterEvent fed to the stage: mocks, resource, pending messages and states. */
+  private ClusterEvent generateClusterEvent() {
+    Mock mock = new Mock();
+    ClusterEvent event =
+        new ClusterEvent(CLUSTER_NAME, ClusterEventType.IdealStateChange, "randomId");
+    ClusterConfig clusterConfig = new ClusterConfig(CLUSTER_NAME);
+    clusterConfig.stateTransitionCancelEnabled(true);
+
+    // Mocked manager: attach it to the event and stub its accessor, session id and instance name.
+    event.addAttribute(AttributeName.helixmanager.name(), mock.manager);
+    when(mock.manager.getHelixDataAccessor()).thenReturn(mock.accessor);
+    when(mock.manager.getSessionId()).thenReturn(UUID.randomUUID().toString());
+    when(mock.manager.getInstanceName()).thenReturn("CONTROLLER");
+
+    // Mock the resource: partitions "0" and "1", using the "MasterSlave" state model definition.
+    ResourceConfig resourceConfig = new ResourceConfig(RESOURCE_NAME);
+    Resource resource = new Resource(RESOURCE_NAME, clusterConfig, resourceConfig);
+    resource.addPartition("0");
+    resource.addPartition("1");
+    resource.setStateModelDefRef("MasterSlave");
+    event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(),
+        Collections.singletonMap(RESOURCE_NAME, resource));
+
+    // Mock the cache with two live instances, each having its own random session id.
+    LiveInstance l1 = new LiveInstance("localhost_1");
+    l1.setSessionId(UUID.randomUUID().toString());
+    LiveInstance l2 = new LiveInstance("localhost_2");
+    l2.setSessionId(UUID.randomUUID().toString());
+    event.addAttribute(AttributeName.ControllerDataProvider.name(), mock.cache);
+    when(mock.cache.getStateModelDef("MasterSlave")).thenReturn(MasterSlaveSMD.build());
+    when(mock.cache.getClusterConfig()).thenReturn(clusterConfig);
+    when(mock.cache.getLiveInstances()).thenReturn(Arrays.asList(l1, l2).stream().collect(
+        Collectors.toMap(LiveInstance::getId, Function.identity())));
+
+    // Build the three pending messages registered in the current state output below:
+    // 1. the main message staying in ZK, meant to contain the #2 p2p message (not attached here).
+    // 2. the p2p relay message that should be hidden inside the #1 message.
+    // 3. a message that should be cancelled because its target state changed.
+    Message message =
+        new Message(Message.MessageType.STATE_TRANSITION, UUID.randomUUID().toString());
+    message.setSrcName(manager.getInstanceName());
+    message.setTgtName("localhost_1");
+    message.setMsgState(Message.MessageState.NEW);
+    message.setPartitionName("0");
+    message.setResourceName(resource.getResourceName());
+    message.setFromState("MASTER");
+    message.setToState("SLAVE");
+    message.setTgtSessionId(UUID.randomUUID().toString());
+    message.setSrcSessionId(manager.getSessionId());
+    message.setStateModelDef("MasterSlave");
+    message.setTgtSessionId(UUID.randomUUID().toString());
+    // NOTE(review): setTgtSessionId is called twice on each message with different random UUIDs.
+    Message relayMessage =
+        new Message(Message.MessageType.STATE_TRANSITION, UUID.randomUUID().toString());
+    relayMessage.setSrcName("localhost_1");
+    relayMessage.setTgtName("localhost_2");
+    relayMessage.setMsgState(Message.MessageState.NEW);
+    relayMessage.setPartitionName("0");
+    relayMessage.setResourceName(resource.getResourceName());
+    relayMessage.setFromState("SLAVE");
+    relayMessage.setToState("MASTER");
+    relayMessage.setTgtSessionId(UUID.randomUUID().toString());
+    relayMessage.setSrcSessionId(manager.getSessionId());
+    relayMessage.setStateModelDef("MasterSlave");
+    relayMessage.setTgtSessionId(UUID.randomUUID().toString());
+
+    Message messageToBeCancelled =
+        new Message(Message.MessageType.STATE_TRANSITION, UUID.randomUUID().toString());
+    // NOTE(review): bare "manager" is not mock.manager; presumably inherited from BaseStageTest.
+    messageToBeCancelled.setSrcName(manager.getInstanceName());
+    messageToBeCancelled.setTgtName("localhost_2");
+    messageToBeCancelled.setMsgState(Message.MessageState.NEW);
+    messageToBeCancelled.setPartitionName("1");
+    messageToBeCancelled.setResourceName(resource.getResourceName());
+    messageToBeCancelled.setFromState("MASTER");
+    messageToBeCancelled.setToState("SLAVE");
+    messageToBeCancelled.setTgtSessionId(UUID.randomUUID().toString());
+    messageToBeCancelled.setSrcSessionId(manager.getSessionId());
+    messageToBeCancelled.setStateModelDef("MasterSlave");
+    messageToBeCancelled.setTgtSessionId(UUID.randomUUID().toString());
+
+    // Mock the current state and intermediate state outputs.
+    // Keep partition 0 on the same target state so that the p2p message is not cancelled.
+    // Change the target state of partition 1 so that Helix sends a cancellation message.
+
+    CurrentStateOutput currentStateOutput = new CurrentStateOutput();
+    currentStateOutput.setPendingMessage(RESOURCE_NAME, new Partition("0"), "localhost_1", message);
+    currentStateOutput.setPendingMessage(RESOURCE_NAME, new Partition("0"), "localhost_2", relayMessage);
+    currentStateOutput
+        .setPendingMessage(RESOURCE_NAME, new Partition("1"), "localhost_2", messageToBeCancelled);
+    currentStateOutput.setCurrentState(RESOURCE_NAME, new Partition("0"), "localhost_1", "MASTER");
+    currentStateOutput.setCurrentState(RESOURCE_NAME, new Partition("0"), "localhost_2", "SLAVE");
+    currentStateOutput.setCurrentState(RESOURCE_NAME, new Partition("1"), "localhost_2", "MASTER");
+    event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+
+    IntermediateStateOutput intermediateStateOutput = new IntermediateStateOutput();
+    intermediateStateOutput.setState(RESOURCE_NAME, new Partition("0"), "localhost_1", "SLAVE");
+    intermediateStateOutput.setState(RESOURCE_NAME, new Partition("0"), "localhost_2", "MASTER");
+    intermediateStateOutput.setState(RESOURCE_NAME, new Partition("1"), "localhost_2", "MASTER");
+    event.addAttribute(AttributeName.INTERMEDIATE_STATE.name(), intermediateStateOutput);
+
+    return event;
+  }
+  /** Bundles the Mockito mocks (cache, manager, accessor) used to build the cluster event. */
+  private final class Mock {
+    private ResourceControllerDataProvider cache = mock(ResourceControllerDataProvider.class);
+    private HelixManager manager = mock(ZKHelixManager.class);
+    private HelixDataAccessor accessor = mock(ZKHelixDataAccessor.class);
+  }
+}