You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2017/10/03 21:44:04 UTC

[4/5] helix git commit: [HELIX-669] State Transition Cancellation Server change

[HELIX-669] State Transition Cancellation Server change

State transition takes a vital part of Helix managing clusters. There are different reasons can cause state transition is not necessary, for example, the node of partition running state transition is down. Thus state transition cancellation would be a useful feature to have in Helix. It not only helps cancel the state transition to avoid invalid state but also benefits for reducing redundant state transitions.

In this rb :
1. Sending state transition cancellation message when pending state is not match the target state.
2. Complete integration test the state transition cancellation.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/804ff7c9
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/804ff7c9
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/804ff7c9

Branch: refs/heads/master
Commit: 804ff7c93d4fcd87560bc9a6eb7f03d216cf1625
Parents: b9de836
Author: Junkai Xue <jx...@linkedin.com>
Authored: Tue Oct 3 12:22:23 2017 -0700
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Tue Oct 3 12:24:48 2017 -0700

----------------------------------------------------------------------
 .../stages/CurrentStateComputationStage.java    |  19 +-
 .../controller/stages/CurrentStateOutput.java   |  48 ++-
 .../stages/MessageGenerationPhase.java          |  53 +++-
 .../stages/MessageSelectionStage.java           |   4 +
 .../controller/stages/TaskAssignmentStage.java  |   2 +-
 .../handling/HelixStateTransitionHandler.java   |   6 +-
 .../messaging/handling/HelixTaskExecutor.java   |  21 +-
 .../org/apache/helix/model/ClusterConfig.java   |  19 ++
 .../participant/statemachine/StateModel.java    |  12 +
 .../stages/TestRebalancePipeline.java           |   4 +
 .../TestStateTransitionCancellation.java        | 293 +++++++++++++++++++
 .../handling/TestHelixTaskExecutor.java         |   4 +
 .../helix/task/TaskSynchronizedTestBase.java    |   5 +-
 13 files changed, 466 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/804ff7c9/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index d548d3c..d5629bf 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -27,9 +27,9 @@ import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.MessageType;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
-import org.apache.helix.model.Message.MessageType;
 
 /**
  * For each LiveInstances select currentState and message whose sessionId matches
@@ -54,7 +54,9 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
       String instanceName = instance.getInstanceName();
       Map<String, Message> instanceMessages = cache.getMessages(instanceName);
       for (Message message : instanceMessages.values()) {
-        if (!MessageType.STATE_TRANSITION.name().equalsIgnoreCase(message.getMsgType())) {
+        if (!MessageType.STATE_TRANSITION.name().equalsIgnoreCase(message.getMsgType())
+            && !MessageType.STATE_TRANSITION_CANCELLATION.name()
+            .equalsIgnoreCase(message.getMsgType())) {
           continue;
         }
         if (!instance.getSessionId().equals(message.getTgtSessionId())) {
@@ -70,7 +72,7 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
           String partitionName = message.getPartitionName();
           Partition partition = resource.getPartition(partitionName);
           if (partition != null) {
-            currentStateOutput.setPendingState(resourceName, partition, instanceName, message);
+            setMessageState(currentStateOutput, resourceName, partition, instanceName, message);
           } else {
             // log
           }
@@ -80,7 +82,7 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
             for (String partitionName : partitionNames) {
               Partition partition = resource.getPartition(partitionName);
               if (partition != null) {
-                currentStateOutput.setPendingState(resourceName, partition, instanceName, message);
+                setMessageState(currentStateOutput, resourceName, partition, instanceName, message);
               } else {
                 // log
               }
@@ -128,4 +130,13 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
     }
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
   }
+
+  private void setMessageState(CurrentStateOutput currentStateOutput, String resourceName,
+      Partition partition, String instanceName, Message message) {
+    if (MessageType.STATE_TRANSITION.name().equalsIgnoreCase(message.getMsgType())) {
+      currentStateOutput.setPendingState(resourceName, partition, instanceName, message);
+    } else {
+      currentStateOutput.setCancellationState(resourceName, partition, instanceName, message);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/804ff7c9/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
index 646c98f..b21fb93 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
@@ -39,6 +39,7 @@ import com.google.common.collect.Sets;
 public class CurrentStateOutput {
   private final Map<String, Map<Partition, Map<String, String>>> _currentStateMap;
   private final Map<String, Map<Partition, Map<String, Message>>> _pendingStateMap;
+  private final Map<String, Map<Partition, Map<String, Message>>> _cancellationStateMap;
   // Contains per-resource maps of partition -> (instance, requested_state). This corresponds to the
   // REQUESTED_STATE
   // field in the CURRENTSTATES node.
@@ -55,6 +56,7 @@ public class CurrentStateOutput {
   public CurrentStateOutput() {
     _currentStateMap = new HashMap<String, Map<Partition, Map<String, String>>>();
     _pendingStateMap = new HashMap<String, Map<Partition, Map<String, Message>>>();
+    _cancellationStateMap = new HashMap<String, Map<Partition, Map<String, Message>>>();
     _resourceStateModelMap = new HashMap<String, String>();
     _curStateMetaMap = new HashMap<String, CurrentState>();
     _requestedStateMap = new HashMap<String, Map<Partition, Map<String, String>>>();
@@ -122,13 +124,30 @@ public class CurrentStateOutput {
 
   public void setPendingState(String resourceName, Partition partition, String instanceName,
       Message message) {
-    if (!_pendingStateMap.containsKey(resourceName)) {
-      _pendingStateMap.put(resourceName, new HashMap<Partition, Map<String, Message>>());
+    setStateMessage(resourceName, partition, instanceName, message, _pendingStateMap);
+  }
+
+  /**
+   * Update the cancellation messages per resource per partition
+   * @param resourceName
+   * @param partition
+   * @param instanceName
+   * @param message
+   */
+  public void setCancellationState(String resourceName, Partition partition, String instanceName,
+      Message message) {
+    setStateMessage(resourceName, partition, instanceName, message, _cancellationStateMap);
+  }
+
+  private void setStateMessage(String resourceName, Partition partition, String instanceName,
+      Message message, Map<String, Map<Partition, Map<String, Message>>> stateMessageMap) {
+    if (!stateMessageMap.containsKey(resourceName)) {
+      stateMessageMap.put(resourceName, new HashMap<Partition, Map<String, Message>>());
     }
-    if (!_pendingStateMap.get(resourceName).containsKey(partition)) {
-      _pendingStateMap.get(resourceName).put(partition, new HashMap<String, Message>());
+    if (!stateMessageMap.get(resourceName).containsKey(partition)) {
+      stateMessageMap.get(resourceName).put(partition, new HashMap<String, Message>());
     }
-    _pendingStateMap.get(resourceName).get(partition).put(instanceName, message);
+    stateMessageMap.get(resourceName).get(partition).put(instanceName, message);
   }
 
   /**
@@ -179,7 +198,24 @@ public class CurrentStateOutput {
    * @return pending message
    */
   public Message getPendingState(String resourceName, Partition partition, String instanceName) {
-    Map<Partition, Map<String, Message>> map = _pendingStateMap.get(resourceName);
+    return getStateMessage(resourceName, partition, instanceName, _pendingStateMap);
+  }
+
+  /**
+   * Fetch cancellation message per resource, partition, instance
+   * @param resourceName
+   * @param partition
+   * @param instanceName
+   * @return
+   */
+  public Message getCancellationState(String resourceName, Partition partition,
+      String instanceName) {
+    return getStateMessage(resourceName, partition, instanceName, _cancellationStateMap);
+  }
+
+  private Message getStateMessage(String resourceName, Partition partition, String instanceName,
+      Map<String, Map<Partition, Map<String, Message>>> stateMessageMap) {
+    Map<Partition, Map<String, Message>> map = stateMessageMap.get(resourceName);
     if (map != null) {
       Map<String, Message> instanceStateMap = map.get(partition);
       if (instanceStateMap != null) {

http://git-wip-us.apache.org/repos/asf/helix/blob/804ff7c9/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
index f5f912e..4689962 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
@@ -109,6 +109,8 @@ public class MessageGenerationPhase extends AbstractBaseStage {
             continue;
           }
 
+          Message message = null;
+
           if (pendingMessage != null) {
             String pendingState = pendingMessage.getToState();
             if (nextState.equalsIgnoreCase(pendingState)) {
@@ -121,20 +123,32 @@ public class MessageGenerationPhase extends AbstractBaseStage {
                       + pendingState + ", desiredState: " + desiredState);
             } else {
               logger.info("IdealState changed before state transition completes for " +
-                  resource.getResourceName() + "." + partition.getPartitionName() + " on "
+                      resource.getResourceName() + "." + partition.getPartitionName() + " on "
                       + instanceName + ", pendingState: " + pendingState + ", currentState: "
                       + currentState + ", nextState: " + nextState);
+              Message cancellationMessage =
+                  currentStateOutput.getCancellationState(resourceName, partition, instanceName);
+              if (cache.getClusterConfig().isStateTransitionCancelEnabled()
+                  && cancellationMessage == null) {
+                logger.info("Send cancellation message of the state transition for " + resource
+                    .getResourceName() + "." + partition.getPartitionName() + " on " + instanceName
+                    + ", currentState: " + currentState + ", nextState: " + nextState);
+                message = createStateTransitionCancellationMessage(manager, resource,
+                    partition.getPartitionName(), instanceName, sessionIdMap.get(instanceName),
+                    stateModelDef.getId(), pendingMessage.getFromState(),
+                    pendingMessage.getToState());
+              }
             }
           } else {
+            // Create new state transition message
+            message = createStateTransitionMessage(manager, resource, partition.getPartitionName(), instanceName,
+                currentState, nextState, sessionIdMap.get(instanceName), stateModelDef.getId());
+          }
 
-            Message message =
-                createMessage(manager, resource, partition.getPartitionName(), instanceName,
-                    currentState, nextState, sessionIdMap.get(instanceName), stateModelDef.getId());
-
+          if (message != null) {
             IdealState idealState = cache.getIdealState(resourceName);
-            if (idealState != null
-                && idealState.getStateModelDefRef().equalsIgnoreCase(
-                    DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
+            if (idealState != null && idealState.getStateModelDefRef()
+                .equalsIgnoreCase(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
               if (idealState.getRecord().getMapField(partition.getPartitionName()) != null) {
                 message.getRecord().setMapField(Message.Attributes.INNER_MESSAGE.toString(),
                     idealState.getRecord().getMapField(partition.getPartitionName()));
@@ -171,7 +185,7 @@ public class MessageGenerationPhase extends AbstractBaseStage {
     event.addAttribute(AttributeName.MESSAGES_ALL.name(), output);
   }
 
-  private Message createMessage(HelixManager manager, Resource resource, String partitionName,
+  private Message createStateTransitionMessage(HelixManager manager, Resource resource, String partitionName,
       String instanceName, String currentState, String nextState, String sessionId,
       String stateModelDefName) {
     String uuid = UUID.randomUUID().toString();
@@ -199,6 +213,27 @@ public class MessageGenerationPhase extends AbstractBaseStage {
     return message;
   }
 
+  private Message createStateTransitionCancellationMessage(HelixManager manager, Resource resource,
+      String partitionName, String instanceName, String sessionId, String stateModelDefName,
+      String fromState, String nextState) {
+    String uuid = UUID.randomUUID().toString();
+    Message message = new Message(MessageType.STATE_TRANSITION_CANCELLATION, uuid);
+    message.setSrcName(manager.getInstanceName());
+    message.setTgtName(instanceName);
+    message.setMsgState(MessageState.NEW);
+    message.setPartitionName(partitionName);
+    message.setResourceName(resource.getResourceName());
+    message.setFromState(fromState);
+    message.setToState(nextState);
+    message.setTgtSessionId(sessionId);
+    message.setSrcSessionId(manager.getSessionId());
+    message.setStateModelDef(stateModelDefName);
+    message.setStateModelFactoryName(resource.getStateModelFactoryname());
+    message.setBucketSize(resource.getBucketSize());
+
+    return message;
+  }
+
   private int getTimeOut(ResourceConfig resourceConfig, String currentState, String nextState,
       IdealState idealState, Partition partition) {
     // Set timeout of needed

http://git-wip-us.apache.org/repos/asf/helix/blob/804ff7c9/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
index 8e50d83..11544f5 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
@@ -172,6 +172,10 @@ public class MessageSelectionStage extends AbstractBaseStage {
     Map<Integer, List<Message>> messagesGroupByStateTransitPriority =
         new TreeMap<Integer, List<Message>>();
     for (Message message : messages) {
+      if (message.getMsgType().equals(Message.MessageType.STATE_TRANSITION_CANCELLATION.name())) {
+        selectedMessages.add(message);
+        continue;
+      }
       String fromState = message.getFromState();
       String toState = message.getToState();
       String transition = fromState + "-" + toState;

http://git-wip-us.apache.org/repos/asf/helix/blob/804ff7c9/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
index 8aed23e..076fe90 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
@@ -144,7 +144,7 @@ public class TaskAssignmentStage extends AbstractBaseStage {
           "Sending Message " + message.getMsgId() + " to " + message.getTgtName() + " transit "
               + message.getResourceName() + "." + message.getPartitionName() + "|" + message
               .getPartitionNames() + " from:" + message.getFromState() + " to:" + message
-              .getToState());
+              .getToState() + ", Message type ");
 
       keys.add(keyBuilder.message(message.getTgtName(), message.getId()));
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/804ff7c9/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
index 30d321f..fa9def6 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
@@ -303,6 +303,10 @@ public class HelixStateTransitionHandler extends MessageHandler {
         if (e.getCause() != null && e.getCause() instanceof InterruptedException) {
           e = (InterruptedException) e.getCause();
         }
+
+        if (e.getCause() != null && e.getCause() instanceof HelixRollbackException) {
+          throw new HelixRollbackException(e.getCause());
+        }
         _statusUpdateUtil.logError(message, HelixStateTransitionHandler.class, e, errorMessage,
             accessor);
         taskResult.setSuccess(false);
@@ -321,7 +325,7 @@ public class HelixStateTransitionHandler extends MessageHandler {
 
   private void invoke(HelixDataAccessor accessor, NotificationContext context,
       HelixTaskResult taskResult, Message message) throws IllegalAccessException,
-      InvocationTargetException, InterruptedException {
+      InvocationTargetException, InterruptedException, HelixRollbackException {
     _statusUpdateUtil.logInfo(message, HelixStateTransitionHandler.class,
         "Message handling invoking", accessor);
 

http://git-wip-us.apache.org/repos/asf/helix/blob/804ff7c9/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index 936dc56..f27ddb0 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -729,6 +729,12 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
             getMessageTarget(message.getResourceName(), message.getPartitionName());
         // State transition message and cancel message are in same batch
         if (stateTransitionHandlers.containsKey(messageTarget)) {
+          if (!isCancelingSameStateTransition(
+              stateTransitionHandlers.get(messageTarget).getMessage(), message)) {
+            removeMessageFromZk(accessor, message, instanceName);
+            continue;
+          }
+
           markReadMessage(message, changeContext, accessor);
           readMsgs.add(message);
           _monitor.reportProcessedMessage(message,
@@ -747,12 +753,19 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
             String taskId = _messageTaskMap.get(messageTarget);
             HelixTask task = (HelixTask) _taskMap.get(taskId).getTask();
             Future<HelixTaskResult> future = _taskMap.get(taskId).getFuture();
+
+            if (!isCancelingSameStateTransition(task.getMessage(), message)) {
+              removeMessageFromZk(accessor, message, instanceName);
+              continue;
+            }
+
             if (task.cancel()) {
+              Message stateTransitionMessage = task.getMessage();
               future.cancel(false);
               _monitor.reportProcessedMessage(message,
                   ParticipantMessageMonitor.ProcessedMessageState.COMPLETED);
               removeMessageFromZk(accessor, message, instanceName);
-              removeMessageFromZk(accessor, stateTransitionHandlers.get(messageTarget).getMessage(),
+              removeMessageFromZk(accessor, stateTransitionMessage,
                   instanceName);
               continue;
             }
@@ -883,6 +896,12 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
     }
   }
 
+  private boolean isCancelingSameStateTransition(Message stateTranstionMessage,
+      Message cancellationMessage) {
+    return stateTranstionMessage.getFromState().equalsIgnoreCase(cancellationMessage.getFromState())
+        && stateTranstionMessage.getToState().equalsIgnoreCase(cancellationMessage.getToState());
+  }
+
   private String getMessageTarget(String resourceName, String partitionName) {
     return String.format("%s_%s", resourceName, partitionName);
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/804ff7c9/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
index 79bb6fa..4b217cd 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
@@ -43,6 +43,7 @@ public class ClusterConfig extends HelixProperty {
     DELAY_REBALANCE_DISABLED,  // enabled the delayed rebalaning in case node goes offline.
     DELAY_REBALANCE_TIME,     // delayed time in ms that the delay time Helix should hold until rebalancing.
     STATE_TRANSITION_THROTTLE_CONFIGS,
+    STATE_TRANSITION_CANCELLATION_ENABLED,
     BATCH_STATE_TRANSITION_MAX_THREADS,
     MAX_CONCURRENT_TASK_PER_INSTANCE
   }
@@ -155,6 +156,24 @@ public class ClusterConfig extends HelixProperty {
     return _record.getIntField(ClusterConfigProperty.MAX_CONCURRENT_TASK_PER_INSTANCE.name(),
         DEFAULT_MAX_CONCURRENT_TASK_PER_INSTANCE);
   }
+  /**
+   * Enable/Disable state transition cancellation for the cluster
+   * @param enable
+   */
+  public void stateTransitionCancelEnabled(Boolean enable) {
+    if (enable == null) {
+      _record.getSimpleFields()
+          .remove(ClusterConfigProperty.STATE_TRANSITION_CANCELLATION_ENABLED.name());
+    } else {
+      _record.setBooleanField(ClusterConfigProperty.STATE_TRANSITION_CANCELLATION_ENABLED.name(),
+          enable);
+    }
+  }
+
+  public boolean isStateTransitionCancelEnabled() {
+    return _record
+        .getBooleanField(ClusterConfigProperty.STATE_TRANSITION_CANCELLATION_ENABLED.name(), false);
+  }
 
   @Override
   public boolean equals(Object obj) {

http://git-wip-us.apache.org/repos/asf/helix/blob/804ff7c9/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModel.java b/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModel.java
index 6f29723..8e97252 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModel.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModel.java
@@ -83,6 +83,18 @@ public abstract class StateModel {
 
   /**
    * Default implementation for cancelling state transition
+   *
+   * IMPORTANT:
+   *
+   * 1. Be careful with the implementation of this method. There is no
+   * grantee that this method is called before user state transition method invoked.
+   * Please make sure the implemention contains logic for checking state transition already started.
+   * Similar to this situation, when this cancel method has been called. Helix does not grantee the
+   * state transition is still running. The state transition could be completed.
+   *
+   * 2. This cancel method should not throw HelixRollbackException. It is better to trigger the real
+   * state transition to throw HelixRollbackException if user would like to cancel the current
+   * running state transition.
    */
   public void cancel() {
     _cancelled = true;

http://git-wip-us.apache.org/repos/asf/helix/blob/804ff7c9/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
index a6863ca..a50b08e 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
@@ -34,6 +34,7 @@ import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Message;
@@ -209,6 +210,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     event.addAttribute("helixmanager", manager);
 
     ClusterDataCache cache = new ClusterDataCache();
+    cache._clusterConfig = new ClusterConfig(clusterName);
     event.addAttribute("ClusterDataCache", cache);
 
     final String resourceName = "testResource_pending";
@@ -266,6 +268,8 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     cache.setIdealStates(idealStates);
 
     runPipeline(event, dataRefresh);
+    cache = event.getAttribute("ClusterDataCache");
+    cache._clusterConfig = new ClusterConfig(clusterName);
     runPipeline(event, rebalancePipeline);
     msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
     messages = msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));

http://git-wip-us.apache.org/repos/asf/helix/blob/804ff7c9/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionCancellation.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionCancellation.java b/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionCancellation.java
new file mode 100644
index 0000000..f001438
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionCancellation.java
@@ -0,0 +1,293 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixRollbackException;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.task.MockTask;
+import org.apache.helix.integration.task.TaskTestBase;
+import org.apache.helix.integration.task.WorkflowGenerator;
+import org.apache.helix.mock.participant.MockDelayMSStateModel;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskFactory;
+import org.apache.helix.task.TaskStateModelFactory;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestStateTransitionCancellation extends TaskTestBase {
+  // TODO: Replace the thread sleep with synchronized condition check
+  private ConfigAccessor _configAccessor;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    _numDbs = 1;
+    _numParitions = 20;
+    _numNodes = 2;
+    _numReplicas = 2;
+    String namespace = "/" + CLUSTER_NAME;
+    if (_gZkClient.exists(namespace)) {
+      _gZkClient.deleteRecursive(namespace);
+    }
+
+    _setupTool = new ClusterSetup(ZK_ADDR);
+    _setupTool.addCluster(CLUSTER_NAME, true);
+    setupParticipants();
+    setupDBs();
+
+    registerParticipants(_participants, _numNodes, _startPort, 0, -3000000L);
+
+    // start controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+
+    createManagers();
+    _configAccessor = new ConfigAccessor(_gZkClient);
+  }
+
+  @Test
+  public void testCancellationWhenDisableResource() throws InterruptedException {
+    // Enable cancellation
+    ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME);
+    clusterConfig.stateTransitionCancelEnabled(true);
+    _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+
+    // Wait for assignment done
+    Thread.sleep(2000);
+
+    // Disable the resource
+    _setupTool.getClusterManagementTool()
+        .enableResource(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, false);
+
+    // Wait for pipeline reaching final stage
+    Thread.sleep(2000L);
+    ExternalView externalView = _setupTool.getClusterManagementTool()
+        .getResourceExternalView(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB);
+    for (String partition : externalView.getPartitionSet()) {
+      for (String currentState : externalView.getStateMap(partition).values()) {
+        Assert.assertEquals(currentState, "OFFLINE");
+      }
+    }
+  }
+
+  @Test
+  public void testDisableCancellationWhenDisableResource() throws InterruptedException {
+    // Disable cancellation
+    ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME);
+    clusterConfig.stateTransitionCancelEnabled(false);
+    _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+
+    // Reenable resource
+    stateCleanUp();
+    _setupTool.getClusterManagementTool()
+        .enableResource(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, true);
+
+    // Wait for assignment done
+    Thread.sleep(2000);
+
+    // Disable the resource
+    _setupTool.getClusterManagementTool()
+        .enableResource(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, false);
+
+    // Wait for pipeline reaching final stage
+    Thread.sleep(2000L);
+    ExternalView externalView = _setupTool.getClusterManagementTool()
+        .getResourceExternalView(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB);
+    for (String partition : externalView.getPartitionSet()) {
+      Assert.assertTrue(externalView.getStateMap(partition).values().contains("SLAVE"));
+    }
+  }
+
+  @Test
+  public void testRebalancingCauseCancellation() throws InterruptedException {
+    // Enable cancellation
+    ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME);
+    clusterConfig.stateTransitionCancelEnabled(true);
+    clusterConfig.setPersistBestPossibleAssignment(true);
+    _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+
+    // Reenable resource
+    stateCleanUp();
+    _setupTool.getClusterManagementTool()
+        .enableResource(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, true);
+
+    // Wait for assignment done
+    Thread.sleep(2000);
+    int numNodesToStart = 10;
+    for (int i = 0; i < numNodesToStart; i++) {
+      String storageNodeName = PARTICIPANT_PREFIX + "_" + (_startPort + _numNodes + i);
+      _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+    }
+    MockParticipantManager[] newParticipants = new MockParticipantManager[numNodesToStart];
+    registerParticipants(newParticipants, numNodesToStart, _startPort + _numNodes, 1000, -3000000L);
+
+    // Wait for pipeline reaching final stage
+    Thread.sleep(2000L);
+    int numOfMasters = 0;
+    ExternalView externalView = _setupTool.getClusterManagementTool()
+        .getResourceExternalView(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB);
+    for (String partition : externalView.getPartitionSet()) {
+      if (externalView.getStateMap(partition).values().contains("MASTER")) {
+        numOfMasters++;
+      }
+    }
+
+    for (MockParticipantManager participant : newParticipants) {
+      participant.syncStop();
+    }
+
+    // Only partial of state transition has been cancelled
+    Assert.assertTrue((numOfMasters > 0 && numOfMasters < _numParitions));
+  }
+
+  private void stateCleanUp() {
+    InternalMockDelayMSStateModel._cancelledFirstTime = true;
+    InternalMockDelayMSStateModel._cancelledStatic = false;
+  }
+
+  @StateModelInfo(initialState = "OFFLINE", states = { "MASTER", "SLAVE", "ERROR"
+  })
+  public static class InternalMockDelayMSStateModel extends StateModel {
+    private static Logger LOG = Logger.getLogger(MockDelayMSStateModel.class);
+    private long _delay;
+    public static boolean _cancelledStatic;
+    public static boolean _cancelledFirstTime;
+
+    public InternalMockDelayMSStateModel(long delay) {
+      _delay = delay;
+      _cancelledStatic = false;
+      _cancelledFirstTime = true;
+    }
+
+    @Transition(to = "SLAVE", from = "OFFLINE") public void onBecomeSlaveFromOffline(
+        Message message, NotificationContext context) {
+      if (_delay > 0) {
+        try {
+          Thread.sleep(_delay);
+        } catch (InterruptedException e) {
+          LOG.error("Failed to sleep for " + _delay);
+        }
+      }
+      LOG.info("Become SLAVE from OFFLINE");
+    }
+
+    @Transition(to = "MASTER", from = "SLAVE") public void onBecomeMasterFromSlave(Message message,
+        NotificationContext context) throws InterruptedException, HelixRollbackException {
+      if (_cancelledFirstTime && _delay < 0) {
+        while (!_cancelledStatic) {
+          Thread.sleep(Math.abs(1000L));
+        }
+        _cancelledFirstTime = false;
+        throw new HelixRollbackException("EX");
+      }
+      LOG.error("Become MASTER from SLAVE");
+    }
+
+    @Transition(to = "SLAVE", from = "MASTER") public void onBecomeSlaveFromMaster(Message message,
+        NotificationContext context) {
+      LOG.info("Become Slave from Master");
+    }
+
+    @Transition(to = "OFFLINE", from = "SLAVE") public void onBecomeOfflineFromSlave(
+        Message message, NotificationContext context) {
+      LOG.info("Become OFFLINE from SLAVE");
+    }
+
+    @Transition(to = "DROPPED", from = "OFFLINE") public void onBecomeDroppedFromOffline(
+        Message message, NotificationContext context) {
+      LOG.info("Become DROPPED FROM OFFLINE");
+    }
+
+    @Override
+    public void cancel() {
+      _cancelledStatic = true;
+    }
+
+    @Override
+    public boolean isCancelled() {
+      return _cancelledStatic;
+    }
+  }
+
+  public class InMockDelayMSStateModelFactory
+      extends StateModelFactory<InternalMockDelayMSStateModel> {
+    private long _delay;
+
+    @Override
+    public InternalMockDelayMSStateModel createNewStateModel(String resourceName,
+        String partitionKey) {
+      InternalMockDelayMSStateModel model = new InternalMockDelayMSStateModel(_delay);
+      return model;
+    }
+
+    public InMockDelayMSStateModelFactory setDelay(long delay) {
+      _delay = delay;
+      return this;
+    }
+  }
+
+  private void registerParticipants(MockParticipantManager[] participants, int numNodes,
+      int startPort, long sleepTime, long delay) throws InterruptedException {
+    Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
+    taskFactoryReg.put(MockTask.TASK_COMMAND, new TaskFactory() {
+      @Override
+      public Task createNewTask(TaskCallbackContext context) {
+        return new MockTask(context);
+      }
+    });
+
+    for (int i = 0; i < numNodes; i++) {
+      String instanceName = PARTICIPANT_PREFIX + "_" + (startPort + i);
+      participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+
+      // add a state model with non-OFFLINE initial state
+      StateMachineEngine stateMach = participants[i].getStateMachineEngine();
+      stateMach.registerStateModelFactory("Task",
+          new TaskStateModelFactory(participants[i], taskFactoryReg));
+      InMockDelayMSStateModelFactory delayFactory =
+          new InMockDelayMSStateModelFactory().setDelay(delay);
+      stateMach.registerStateModelFactory(MASTER_SLAVE_STATE_MODEL, delayFactory);
+
+      participants[i].syncStart();
+
+      if (sleepTime > 0) {
+        Thread.sleep(sleepTime);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/804ff7c9/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
index 5304a45..976ecfe 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
@@ -647,6 +647,8 @@ public class TestHelixTaskExecutor {
     msg1.setResourceName("R1");
     msg1.setTgtName("Localhost_1123");
     msg1.setSrcName("127.101.1.23_2234");
+    msg1.setFromState("SLAVE");
+    msg1.setToState("MASTER");
     msgList.add(msg1);
 
     Message msg2 = new Message(Message.MessageType.STATE_TRANSITION_CANCELLATION, UUID.randomUUID().toString());
@@ -655,6 +657,8 @@ public class TestHelixTaskExecutor {
     msg2.setResourceName("R1");
     msg2.setTgtName("Localhost_1123");
     msg2.setSrcName("127.101.1.23_2234");
+    msg2.setFromState("SLAVE");
+    msg2.setToState("MASTER");
     msgList.add(msg2);
 
     executor.onMessage("someInstance", msgList, changeContext);

http://git-wip-us.apache.org/repos/asf/helix/blob/804ff7c9/helix-core/src/test/java/org/apache/helix/task/TaskSynchronizedTestBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/task/TaskSynchronizedTestBase.java b/helix-core/src/test/java/org/apache/helix/task/TaskSynchronizedTestBase.java
index 1004c1f..4a8c01a 100644
--- a/helix-core/src/test/java/org/apache/helix/task/TaskSynchronizedTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/task/TaskSynchronizedTestBase.java
@@ -102,8 +102,9 @@ public class TaskSynchronizedTestBase extends ZkIntegrationTestBase {
         idealState.setInstanceGroupTag("TESTTAG0");
         _setupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, idealState);
       } else {
-        _setupTool.addResourceToCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB,
-            _numParitions, MASTER_SLAVE_STATE_MODEL, IdealState.RebalanceMode.FULL_AUTO.name());
+        _setupTool
+            .addResourceToCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, _numParitions,
+                MASTER_SLAVE_STATE_MODEL, IdealState.RebalanceMode.FULL_AUTO.name());
       }
       _setupTool.rebalanceStorageCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, _numReplicas);
     }