You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2020/03/09 19:04:21 UTC

[helix] branch master updated: Generate cancellation message for currentState=null desiredState=DROPPED (#831)

This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new fcf78cd  Generate cancellation message for currentState=null desiredState=DROPPED (#831)
fcf78cd is described below

commit fcf78cdb1b114a5424ecb179536d9c2931e9bba1
Author: Molly Gao <31...@users.noreply.github.com>
AuthorDate: Mon Mar 9 12:04:13 2020 -0700

    Generate cancellation message for currentState=null desiredState=DROPPED (#831)
    
    * Generate cancellation message for currentState=null desiredState=DROPPED
    
    Generate cancellation message for currentState=null desiredState=DROPPED
---
 .../controller/stages/MessageGenerationPhase.java  | 145 +++++++++++++--------
 .../stages/TestCancellationMessageGeneration.java  |  97 ++++++++++++++
 2 files changed, 186 insertions(+), 56 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
index 99bcdb5..4223c37 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
@@ -151,14 +151,32 @@ public abstract class MessageGenerationPhase extends AbstractBaseStage {
 
         String currentState =
             currentStateOutput.getCurrentState(resourceName, partition, instanceName);
+        Message pendingMessage =
+            currentStateOutput.getPendingMessage(resourceName, partition, instanceName);
+        boolean isCancellationEnabled = cache.getClusterConfig().isStateTransitionCancelEnabled();
+        Message cancellationMessage =
+            currentStateOutput.getCancellationMessage(resourceName, partition, instanceName);
+        String nextState = stateModelDef.getNextStateForTransition(currentState, desiredState);
+
+        Message message = null;
+
         if (currentState == null) {
           currentState = stateModelDef.getInitialState();
+          nextState = stateModelDef.getNextStateForTransition(currentState, desiredState);
+
           if (desiredState.equals(HelixDefinedState.DROPPED.name())) {
             LogUtil.logDebug(logger, _eventId,
                 String.format(
                     "No current state for partition %s in resource %s, skip the drop message",
                     partition.getPartitionName(), resourceName));
 
+            message =
+                generateCancellationMessageForPendingMessage(desiredState, currentState, nextState, pendingMessage,
+                    manager, resource, partition, sessionIdMap, instanceName, stateModelDef,
+                    cancellationMessage, isCancellationEnabled);
+            addGeneratedMessageToMap(message, messageMap, eventType, cache, desiredState,
+                resourceName, partition, currentState, nextState);
+
             // TODO: separate logic of resource/task message generation
             if (cache instanceof ResourceControllerDataProvider) {
               ((ResourceControllerDataProvider) cache)
@@ -168,15 +186,6 @@ public abstract class MessageGenerationPhase extends AbstractBaseStage {
           }
         }
 
-        Message pendingMessage =
-            currentStateOutput.getPendingMessage(resourceName, partition, instanceName);
-        boolean isCancellationEnabled = cache.getClusterConfig().isStateTransitionCancelEnabled();
-        Message cancellationMessage =
-            currentStateOutput.getCancellationMessage(resourceName, partition, instanceName);
-        String nextState = stateModelDef.getNextStateForTransition(currentState, desiredState);
-
-        Message message = null;
-
         if (pendingMessage != null && shouldCleanUpPendingMessage(pendingMessage, currentState,
             currentStateOutput.getEndTime(resourceName, partition, instanceName))) {
           LogUtil.logInfo(logger, _eventId, String.format(
@@ -207,29 +216,10 @@ public abstract class MessageGenerationPhase extends AbstractBaseStage {
           }
 
           if (pendingMessage != null) {
-            String pendingState = pendingMessage.getToState();
-            if (nextState.equalsIgnoreCase(pendingState)) {
-              LogUtil.logInfo(logger, _eventId,
-                  "Message already exists for " + instanceName + " to transit " + resource
-                      .getResourceName() + "." + partition.getPartitionName() + " from "
-                      + currentState + " to " + nextState + ", isRelay: " + pendingMessage.isRelayMessage());
-            } else if (currentState.equalsIgnoreCase(pendingState)) {
-              LogUtil.logInfo(logger, _eventId,
-                  "Message hasn't been removed for " + instanceName + " to transit " + resource
-                      .getResourceName() + "." + partition.getPartitionName() + " to "
-                      + pendingState + ", desiredState: " + desiredState + ", isRelay: " + pendingMessage.isRelayMessage());
-            } else {
-              LogUtil.logInfo(logger, _eventId,
-                  "IdealState changed before state transition completes for " + resource
-                      .getResourceName() + "." + partition.getPartitionName() + " on "
-                      + instanceName + ", pendingState: " + pendingState + ", currentState: "
-                      + currentState + ", nextState: " + nextState + ", isRelay: " + pendingMessage.isRelayMessage());
-
-              message = createStateTransitionCancellationMessage(manager, resource,
-                  partition.getPartitionName(), instanceName, sessionIdMap.get(instanceName),
-                  stateModelDef.getId(), pendingMessage.getFromState(), pendingState, nextState,
-                  cancellationMessage, isCancellationEnabled, currentState);
-            }
+            message =
+                generateCancellationMessageForPendingMessage(desiredState, currentState, nextState, pendingMessage,
+                    manager, resource, partition, sessionIdMap, instanceName, stateModelDef,
+                    cancellationMessage, isCancellationEnabled);
           } else {
             // Create new state transition message
             message = createStateTransitionMessage(manager, resource, partition.getPartitionName(),
@@ -244,30 +234,8 @@ public abstract class MessageGenerationPhase extends AbstractBaseStage {
             }
           }
         }
-
-        if (message != null) {
-          IdealState idealState = cache.getIdealState(resourceName);
-          if (idealState != null && idealState.getStateModelDefRef()
-              .equalsIgnoreCase(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
-            if (idealState.getRecord().getMapField(partition.getPartitionName()) != null) {
-              message.getRecord().setMapField(Message.Attributes.INNER_MESSAGE.toString(),
-                  idealState.getRecord().getMapField(partition.getPartitionName()));
-            }
-          }
-
-          int timeout = getTimeOut(cache.getClusterConfig(), cache.getResourceConfig(resourceName),
-              currentState, nextState, idealState, partition);
-          if (timeout > 0) {
-            message.setExecutionTimeout(timeout);
-          }
-
-          message.setAttribute(Message.Attributes.ClusterEventName, eventType.name());
-          // output.addMessage(resourceName, partition, message);
-          if (!messageMap.containsKey(desiredState)) {
-            messageMap.put(desiredState, new ArrayList<Message>());
-          }
-          messageMap.get(desiredState).add(message);
-        }
+        addGeneratedMessageToMap(message, messageMap, eventType, cache, desiredState, resourceName,
+            partition, currentState, nextState);
       }
 
       // add generated messages to output according to state priority
@@ -291,6 +259,71 @@ public abstract class MessageGenerationPhase extends AbstractBaseStage {
     } // end of for-each-partition
   }
 
+  /**
+   * Returns a cancellation message when pendingMessage heads to neither nextState nor currentState;
+   * otherwise (including a null pendingMessage) returns null, logging why a pending message is kept.
+   */
+  private Message generateCancellationMessageForPendingMessage(final String desiredState, final String currentState,
+      final String nextState, final Message pendingMessage, final HelixManager manager,
+      final Resource resource, final Partition partition, final Map<String, String> sessionIdMap,
+      final String instanceName, final StateModelDefinition stateModelDef,
+      final Message cancellationMessage, final boolean isCancellationEnabled) {
+    if (pendingMessage == null) {
+      return null;
+    }
+    String pendingState = pendingMessage.getToState();
+    String partitionPath = resource.getResourceName() + "." + partition.getPartitionName();
+    String relaySuffix = ", isRelay: " + pendingMessage.isRelayMessage();
+    // nextState is not null-checked by every caller; guard so a missing transition cannot NPE here.
+    if (nextState != null && nextState.equalsIgnoreCase(pendingState)) {
+      LogUtil.logInfo(logger, _eventId,
+          "Message already exists for " + instanceName + " to transit " + partitionPath + " from "
+              + currentState + " to " + nextState + relaySuffix);
+    } else if (currentState.equalsIgnoreCase(pendingState)) {
+      LogUtil.logInfo(logger, _eventId,
+          "Message hasn't been removed for " + instanceName + " to transit " + partitionPath + " to "
+              + pendingState + ", desiredState: " + desiredState + relaySuffix);
+    } else {
+      LogUtil.logInfo(logger, _eventId,
+          "IdealState changed before state transition completes for " + partitionPath + " on "
+              + instanceName + ", pendingState: " + pendingState + ", currentState: "
+              + currentState + ", nextState: " + nextState + relaySuffix);
+      return createStateTransitionCancellationMessage(manager, resource, partition.getPartitionName(),
+          instanceName, sessionIdMap.get(instanceName), stateModelDef.getId(), pendingMessage.getFromState(),
+          pendingState, nextState, cancellationMessage, isCancellationEnabled, currentState);
+    }
+    return null;
+  }
+
+  /** Applies inner message, timeout and event name to message, then files it under desiredState. */
+  private void addGeneratedMessageToMap(final Message message,
+      Map<String, List<Message>> messageMap, final ClusterEventType eventType,
+      final BaseControllerDataProvider cache, final String desiredState, final String resourceName,
+      final Partition partition, final String currentState, final String nextState) {
+    if (message == null) {
+      return; // no message to register
+    }
+    final IdealState idealState = cache.getIdealState(resourceName);
+    final boolean isSchedulerTaskQueue = idealState != null && idealState.getStateModelDefRef()
+        .equalsIgnoreCase(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE);
+    if (isSchedulerTaskQueue
+        && idealState.getRecord().getMapField(partition.getPartitionName()) != null) {
+      // Carry the per-partition map field of the ideal state along as the inner message.
+      message.getRecord().setMapField(Message.Attributes.INNER_MESSAGE.toString(),
+          idealState.getRecord().getMapField(partition.getPartitionName()));
+    }
+    final int timeout = getTimeOut(cache.getClusterConfig(),
+        cache.getResourceConfig(resourceName), currentState, nextState, idealState, partition);
+    if (timeout > 0) {
+      message.setExecutionTimeout(timeout);
+    }
+    message.setAttribute(Message.Attributes.ClusterEventName, eventType.name());
+    if (!messageMap.containsKey(desiredState)) {
+      messageMap.put(desiredState, new ArrayList<Message>());
+    }
+    messageMap.get(desiredState).add(message);
+  }
+
   /**
    * Start a job in worker pool that asynchronously clean up pending message. Since it is possible
    * that participant failed to clean up message after processing, it is important for controller to
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCancellationMessageGeneration.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCancellationMessageGeneration.java
new file mode 100644
index 0000000..6d7c4ae
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCancellationMessageGeneration.java
@@ -0,0 +1,97 @@
+package org.apache.helix.controller.stages;
+
+import java.lang.reflect.Array;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.HelixDefinedState;
+import org.apache.helix.HelixManager;
+import org.apache.helix.controller.common.PartitionStateMap;
+import org.apache.helix.controller.common.ResourcesStateMap;
+import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.task.TaskConstants;
+import org.apache.helix.tools.StateModelConfigGenerator;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Checks message generation when currentState=null, desiredState=DROPPED and a state transition message is still pending
+ */
+public class TestCancellationMessageGeneration extends MessageGenerationPhase {
+  private static final String TEST_CLUSTER = "testCluster";
+  private static final String TEST_RESOURCE = "resource0";
+  private static final String TEST_INSTANCE = "instance0";
+  private static final String TEST_PARTITION = "partition0";
+
+  @Test
+  public void TestOFFLINEToDROPPED() throws Exception {
+
+    ClusterEvent event = new ClusterEvent(TEST_CLUSTER, ClusterEventType.Unknown);
+
+
+    // Current state: none (null) for the replica, while an OFFLINE->SLAVE message is still pending
+    CurrentStateOutput currentStateOutput = mock(CurrentStateOutput.class);
+    Partition partition = mock(Partition.class);
+    when(partition.getPartitionName()).thenReturn(TEST_PARTITION);
+    when(currentStateOutput.getCurrentState(TEST_RESOURCE, partition, TEST_INSTANCE)).thenReturn(null);
+    Message message = mock(Message.class);
+    when(message.getFromState()).thenReturn("OFFLINE");
+    when(message.getToState()).thenReturn("SLAVE");
+    when(currentStateOutput.getPendingMessage(TEST_RESOURCE, partition, TEST_INSTANCE)).thenReturn(message);
+    event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+
+    // Attach a mocked helix manager to the event
+    event.addAttribute(AttributeName.helixmanager.name(), mock(HelixManager.class));
+
+    // Controller data provider: MasterSlave state model, one live instance, cancellation enabled
+    BaseControllerDataProvider cache = mock(BaseControllerDataProvider.class);
+    StateModelDefinition stateModelDefinition = new StateModelDefinition(StateModelConfigGenerator.generateConfigForMasterSlave());
+    when(cache.getStateModelDef(TaskConstants.STATE_MODEL_NAME)).thenReturn(stateModelDefinition);
+    Map<String, LiveInstance> liveInstances= mock(Map.class);
+    LiveInstance mockLiveInstance = mock(LiveInstance.class);
+    when(mockLiveInstance.getInstanceName()).thenReturn(TEST_INSTANCE);
+    when(mockLiveInstance.getEphemeralOwner()).thenReturn("TEST");
+    when(liveInstances.values()).thenReturn(Arrays.asList(mockLiveInstance));
+    when(cache.getLiveInstances()).thenReturn(liveInstances);
+    ClusterConfig clusterConfig = mock(ClusterConfig.class);
+    when(cache.getClusterConfig()).thenReturn(clusterConfig);
+    when(clusterConfig.isStateTransitionCancelEnabled()).thenReturn(true);
+    event.addAttribute(AttributeName.ControllerDataProvider.name(), cache);
+
+
+    // Resources to rebalance: a single mocked resource holding the single mocked partition
+    Map<String, Resource> resourceMap = new HashMap<>();
+    Resource resource = mock(Resource.class);
+    when(resource.getResourceName()).thenReturn(TEST_RESOURCE);
+    List<Partition> partitions = Arrays.asList(partition);
+    when(resource.getPartitions()).thenReturn(partitions);
+    when(resource.getStateModelDefRef()).thenReturn(TaskConstants.STATE_MODEL_NAME);
+    resourceMap.put(TEST_RESOURCE, resource);
+    event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), resourceMap);
+
+    // Resource state map: the desired state of the instance for the partition is DROPPED
+    ResourcesStateMap resourcesStateMap = new ResourcesStateMap();
+    PartitionStateMap partitionStateMap = new PartitionStateMap(TEST_RESOURCE);
+    Map<Partition, Map<String, String>> stateMap = partitionStateMap.getStateMap();
+    Map<String, String> instanceStateMap = new HashMap<>();
+    instanceStateMap.put(TEST_INSTANCE, HelixDefinedState.DROPPED.name());
+    stateMap.put(partition, instanceStateMap); // NOTE(review): stateMap is not read afterwards - confirm it is needed
+    resourcesStateMap.setState(TEST_RESOURCE, partition, instanceStateMap);
+
+    processEvent(event, resourcesStateMap);
+    MessageOutput output = event.getAttribute(AttributeName.MESSAGES_ALL.name());
+    Assert.assertEquals(output.getMessages(TEST_RESOURCE, partition).size(), 1);
+  }
+}