You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2020/03/09 19:04:21 UTC
[helix] branch master updated: Generate cancellation message for
currentState=null desiredState=DROPPED (#831)
This is an automated email from the ASF dual-hosted git repository.
jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new fcf78cd Generate cancellation message for currentState=null desiredState=DROPPED (#831)
fcf78cd is described below
commit fcf78cdb1b114a5424ecb179536d9c2931e9bba1
Author: Molly Gao <31...@users.noreply.github.com>
AuthorDate: Mon Mar 9 12:04:13 2020 -0700
Generate cancellation message for currentState=null desiredState=DROPPED (#831)
* Generate cancellation message for currentState=null desiredState=DROPPED
Generate cancellation message for currentState=null desiredState=DROPPED
---
.../controller/stages/MessageGenerationPhase.java | 145 +++++++++++++--------
.../stages/TestCancellationMessageGeneration.java | 97 ++++++++++++++
2 files changed, 186 insertions(+), 56 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
index 99bcdb5..4223c37 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
@@ -151,14 +151,32 @@ public abstract class MessageGenerationPhase extends AbstractBaseStage {
String currentState =
currentStateOutput.getCurrentState(resourceName, partition, instanceName);
+ Message pendingMessage =
+ currentStateOutput.getPendingMessage(resourceName, partition, instanceName);
+ boolean isCancellationEnabled = cache.getClusterConfig().isStateTransitionCancelEnabled();
+ Message cancellationMessage =
+ currentStateOutput.getCancellationMessage(resourceName, partition, instanceName);
+ String nextState = stateModelDef.getNextStateForTransition(currentState, desiredState);
+
+ Message message = null;
+
if (currentState == null) {
currentState = stateModelDef.getInitialState();
+ nextState = stateModelDef.getNextStateForTransition(currentState, desiredState);
+
if (desiredState.equals(HelixDefinedState.DROPPED.name())) {
LogUtil.logDebug(logger, _eventId,
String.format(
"No current state for partition %s in resource %s, skip the drop message",
partition.getPartitionName(), resourceName));
+ message =
+ generateCancellationMessageForPendingMessage(desiredState, currentState, nextState, pendingMessage,
+ manager, resource, partition, sessionIdMap, instanceName, stateModelDef,
+ cancellationMessage, isCancellationEnabled);
+ addGeneratedMessageToMap(message, messageMap, eventType, cache, desiredState,
+ resourceName, partition, currentState, nextState);
+
// TODO: separate logic of resource/task message generation
if (cache instanceof ResourceControllerDataProvider) {
((ResourceControllerDataProvider) cache)
@@ -168,15 +186,6 @@ public abstract class MessageGenerationPhase extends AbstractBaseStage {
}
}
- Message pendingMessage =
- currentStateOutput.getPendingMessage(resourceName, partition, instanceName);
- boolean isCancellationEnabled = cache.getClusterConfig().isStateTransitionCancelEnabled();
- Message cancellationMessage =
- currentStateOutput.getCancellationMessage(resourceName, partition, instanceName);
- String nextState = stateModelDef.getNextStateForTransition(currentState, desiredState);
-
- Message message = null;
-
if (pendingMessage != null && shouldCleanUpPendingMessage(pendingMessage, currentState,
currentStateOutput.getEndTime(resourceName, partition, instanceName))) {
LogUtil.logInfo(logger, _eventId, String.format(
@@ -207,29 +216,10 @@ public abstract class MessageGenerationPhase extends AbstractBaseStage {
}
if (pendingMessage != null) {
- String pendingState = pendingMessage.getToState();
- if (nextState.equalsIgnoreCase(pendingState)) {
- LogUtil.logInfo(logger, _eventId,
- "Message already exists for " + instanceName + " to transit " + resource
- .getResourceName() + "." + partition.getPartitionName() + " from "
- + currentState + " to " + nextState + ", isRelay: " + pendingMessage.isRelayMessage());
- } else if (currentState.equalsIgnoreCase(pendingState)) {
- LogUtil.logInfo(logger, _eventId,
- "Message hasn't been removed for " + instanceName + " to transit " + resource
- .getResourceName() + "." + partition.getPartitionName() + " to "
- + pendingState + ", desiredState: " + desiredState + ", isRelay: " + pendingMessage.isRelayMessage());
- } else {
- LogUtil.logInfo(logger, _eventId,
- "IdealState changed before state transition completes for " + resource
- .getResourceName() + "." + partition.getPartitionName() + " on "
- + instanceName + ", pendingState: " + pendingState + ", currentState: "
- + currentState + ", nextState: " + nextState + ", isRelay: " + pendingMessage.isRelayMessage());
-
- message = createStateTransitionCancellationMessage(manager, resource,
- partition.getPartitionName(), instanceName, sessionIdMap.get(instanceName),
- stateModelDef.getId(), pendingMessage.getFromState(), pendingState, nextState,
- cancellationMessage, isCancellationEnabled, currentState);
- }
+ message =
+ generateCancellationMessageForPendingMessage(desiredState, currentState, nextState, pendingMessage,
+ manager, resource, partition, sessionIdMap, instanceName, stateModelDef,
+ cancellationMessage, isCancellationEnabled);
} else {
// Create new state transition message
message = createStateTransitionMessage(manager, resource, partition.getPartitionName(),
@@ -244,30 +234,8 @@ public abstract class MessageGenerationPhase extends AbstractBaseStage {
}
}
}
-
- if (message != null) {
- IdealState idealState = cache.getIdealState(resourceName);
- if (idealState != null && idealState.getStateModelDefRef()
- .equalsIgnoreCase(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
- if (idealState.getRecord().getMapField(partition.getPartitionName()) != null) {
- message.getRecord().setMapField(Message.Attributes.INNER_MESSAGE.toString(),
- idealState.getRecord().getMapField(partition.getPartitionName()));
- }
- }
-
- int timeout = getTimeOut(cache.getClusterConfig(), cache.getResourceConfig(resourceName),
- currentState, nextState, idealState, partition);
- if (timeout > 0) {
- message.setExecutionTimeout(timeout);
- }
-
- message.setAttribute(Message.Attributes.ClusterEventName, eventType.name());
- // output.addMessage(resourceName, partition, message);
- if (!messageMap.containsKey(desiredState)) {
- messageMap.put(desiredState, new ArrayList<Message>());
- }
- messageMap.get(desiredState).add(message);
- }
+ addGeneratedMessageToMap(message, messageMap, eventType, cache, desiredState, resourceName,
+ partition, currentState, nextState);
}
// add generated messages to output according to state priority
@@ -291,6 +259,71 @@ public abstract class MessageGenerationPhase extends AbstractBaseStage {
} // end of for-each-partition
}
+ private Message generateCancellationMessageForPendingMessage(final String desiredState, final String currentState,
+ final String nextState, final Message pendingMessage, final HelixManager manager,
+ final Resource resource, final Partition partition, final Map<String, String> sessionIdMap,
+ final String instanceName, final StateModelDefinition stateModelDef,
+ final Message cancellationMessage, final boolean isCancellationEnabled) {
+ // Returns the result of createStateTransitionCancellationMessage, or null when no cancellation is attempted.
+ Message message = null;
+ // NOTE(review): nextState is dereferenced below without a null check - confirm every caller passes a non-null value.
+ if (pendingMessage != null) { // with no pending message there is nothing to cancel and null is returned
+ String pendingState = pendingMessage.getToState();
+ if (nextState.equalsIgnoreCase(pendingState)) { // pending message already targets nextState: only log
+ LogUtil.logInfo(logger, _eventId,
+ "Message already exists for " + instanceName + " to transit " + resource
+ .getResourceName() + "." + partition.getPartitionName() + " from "
+ + currentState + " to " + nextState + ", isRelay: " + pendingMessage.isRelayMessage());
+ } else if (currentState.equalsIgnoreCase(pendingState)) { // pending target equals currentState: only log
+ LogUtil.logInfo(logger, _eventId,
+ "Message hasn't been removed for " + instanceName + " to transit " + resource
+ .getResourceName() + "." + partition.getPartitionName() + " to "
+ + pendingState + ", desiredState: " + desiredState + ", isRelay: " + pendingMessage.isRelayMessage());
+ } else { // pending target matches neither nextState nor currentState
+ LogUtil.logInfo(logger, _eventId,
+ "IdealState changed before state transition completes for " + resource
+ .getResourceName() + "." + partition.getPartitionName() + " on "
+ + instanceName + ", pendingState: " + pendingState + ", currentState: "
+ + currentState + ", nextState: " + nextState + ", isRelay: " + pendingMessage.isRelayMessage());
+ // Delegate to createStateTransitionCancellationMessage; whatever it yields is returned unchanged.
+ message = createStateTransitionCancellationMessage(manager, resource,
+ partition.getPartitionName(), instanceName, sessionIdMap.get(instanceName),
+ stateModelDef.getId(), pendingMessage.getFromState(), pendingState, nextState,
+ cancellationMessage, isCancellationEnabled, currentState);
+ }
+ }
+ return message;
+ }
+
+ private void addGeneratedMessageToMap(final Message message, // enriches message, then appends it to messageMap
+ Map<String, List<Message>> messageMap, final ClusterEventType eventType,
+ final BaseControllerDataProvider cache, final String desiredState, final String resourceName,
+ final Partition partition, final String currentState, final String nextState) {
+ if (message != null) { // a null message is ignored: messageMap is left untouched
+ IdealState idealState = cache.getIdealState(resourceName);
+ if (idealState != null && idealState.getStateModelDefRef()
+ .equalsIgnoreCase(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
+ if (idealState.getRecord().getMapField(partition.getPartitionName()) != null) {
+ message.getRecord().setMapField(Message.Attributes.INNER_MESSAGE.toString(), // copy the partition's map field
+ idealState.getRecord().getMapField(partition.getPartitionName()));
+ }
+ }
+ // An execution timeout is set only when getTimeOut returns a positive value.
+ int timeout = getTimeOut(cache.getClusterConfig(), cache.getResourceConfig(resourceName),
+ currentState, nextState, idealState, partition);
+ if (timeout > 0) {
+ message.setExecutionTimeout(timeout);
+ }
+ // Record the triggering event type on the message, then file it under desiredState.
+ message.setAttribute(Message.Attributes.ClusterEventName, eventType.name());
+ // output.addMessage(resourceName, partition, message);
+ if (!messageMap.containsKey(desiredState)) { // first message for this desiredState: create its list
+ messageMap.put(desiredState, new ArrayList<Message>());
+ }
+ messageMap.get(desiredState).add(message);
+ }
+ }
+
/**
* Start a job in worker pool that asynchronously clean up pending message. Since it is possible
* that participant failed to clean up message after processing, it is important for controller to
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCancellationMessageGeneration.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCancellationMessageGeneration.java
new file mode 100644
index 0000000..6d7c4ae
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCancellationMessageGeneration.java
@@ -0,0 +1,97 @@
+package org.apache.helix.controller.stages;
+
+import java.lang.reflect.Array;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.HelixDefinedState;
+import org.apache.helix.HelixManager;
+import org.apache.helix.controller.common.PartitionStateMap;
+import org.apache.helix.controller.common.ResourcesStateMap;
+import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.task.TaskConstants;
+import org.apache.helix.tools.StateModelConfigGenerator;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Checks that exactly one message is generated when currentState=null, desiredState=DROPPED and an OFFLINE->SLAVE message is pending.
+ */
+public class TestCancellationMessageGeneration extends MessageGenerationPhase {
+ private static final String TEST_CLUSTER = "testCluster";
+ private static final String TEST_RESOURCE = "resource0";
+ private static final String TEST_INSTANCE = "instance0";
+ private static final String TEST_PARTITION = "partition0";
+ // NOTE(review): the test method name below starts with an upper-case letter, unlike the usual lowerCamelCase.
+ @Test
+ public void TestOFFLINEToDROPPED() throws Exception {
+
+ ClusterEvent event = new ClusterEvent(TEST_CLUSTER, ClusterEventType.Unknown);
+
+
+ // Set current state to event
+ CurrentStateOutput currentStateOutput = mock(CurrentStateOutput.class);
+ Partition partition = mock(Partition.class);
+ when(partition.getPartitionName()).thenReturn(TEST_PARTITION);
+ when(currentStateOutput.getCurrentState(TEST_RESOURCE, partition, TEST_INSTANCE)).thenReturn(null); // no current state
+ Message message = mock(Message.class); // the pending OFFLINE->SLAVE transition message
+ when(message.getFromState()).thenReturn("OFFLINE");
+ when(message.getToState()).thenReturn("SLAVE");
+ when(currentStateOutput.getPendingMessage(TEST_RESOURCE, partition, TEST_INSTANCE)).thenReturn(message);
+ event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+
+ // Set helix manager to event
+ event.addAttribute(AttributeName.helixmanager.name(), mock(HelixManager.class));
+
+ // Set controller data provider to event
+ BaseControllerDataProvider cache = mock(BaseControllerDataProvider.class);
+ StateModelDefinition stateModelDefinition = new StateModelDefinition(StateModelConfigGenerator.generateConfigForMasterSlave());
+ when(cache.getStateModelDef(TaskConstants.STATE_MODEL_NAME)).thenReturn(stateModelDefinition); // MasterSlave def served under the task state model name
+ Map<String, LiveInstance> liveInstances= mock(Map.class); // NOTE(review): raw Map mock, unchecked conversion
+ LiveInstance mockLiveInstance = mock(LiveInstance.class);
+ when(mockLiveInstance.getInstanceName()).thenReturn(TEST_INSTANCE);
+ when(mockLiveInstance.getEphemeralOwner()).thenReturn("TEST");
+ when(liveInstances.values()).thenReturn(Arrays.asList(mockLiveInstance));
+ when(cache.getLiveInstances()).thenReturn(liveInstances);
+ ClusterConfig clusterConfig = mock(ClusterConfig.class);
+ when(cache.getClusterConfig()).thenReturn(clusterConfig);
+ when(clusterConfig.isStateTransitionCancelEnabled()).thenReturn(true); // cancellation is enabled for this scenario
+ event.addAttribute(AttributeName.ControllerDataProvider.name(), cache);
+
+
+ // Set resources to rebalance to event
+ Map<String, Resource> resourceMap = new HashMap<>();
+ Resource resource = mock(Resource.class);
+ when(resource.getResourceName()).thenReturn(TEST_RESOURCE);
+ List<Partition> partitions = Arrays.asList(partition);
+ when(resource.getPartitions()).thenReturn(partitions);
+ when(resource.getStateModelDefRef()).thenReturn(TaskConstants.STATE_MODEL_NAME);
+ resourceMap.put(TEST_RESOURCE, resource);
+ event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), resourceMap);
+
+ // set up resource state map
+ ResourcesStateMap resourcesStateMap = new ResourcesStateMap();
+ PartitionStateMap partitionStateMap = new PartitionStateMap(TEST_RESOURCE); // NOTE(review): not referenced after stateMap is filled - possibly redundant
+ Map<Partition, Map<String, String>> stateMap = partitionStateMap.getStateMap();
+ Map<String, String> instanceStateMap = new HashMap<>();
+ instanceStateMap.put(TEST_INSTANCE, HelixDefinedState.DROPPED.name()); // desired state for the instance is DROPPED
+ stateMap.put(partition, instanceStateMap);
+ resourcesStateMap.setState(TEST_RESOURCE, partition, instanceStateMap);
+ // Run the phase (processEvent is presumably inherited from MessageGenerationPhase) and expect one message for the partition.
+ processEvent(event, resourcesStateMap);
+ MessageOutput output = event.getAttribute(AttributeName.MESSAGES_ALL.name());
+ Assert.assertEquals(output.getMessages(TEST_RESOURCE, partition).size(), 1);
+ }
+}