You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by lx...@apache.org on 2016/12/19 17:54:19 UTC

[9/9] helix git commit: [HELIX-650] Add StateTransitionConfig and Expose API add state transition timeout

[HELIX-650] Add StateTransitionConfig and Expose API add state transition timeout

1. Add StateTransitionConfig to hold state transition properties, such as timeout.
2. Add a new API for setting the state transition timeout.
3. Add logic in message generation to set the timeout in the message in a backward-compatible way.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/c8c67740
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/c8c67740
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/c8c67740

Branch: refs/heads/helix-0.6.x
Commit: c8c677405e6d7d9e9d67594fbe1b0efda455ac2f
Parents: 4c3fc7f
Author: Junkai Xue <jx...@linkedin.com>
Authored: Fri Dec 16 15:39:57 2016 -0800
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Fri Dec 16 15:50:18 2016 -0800

----------------------------------------------------------------------
 .../config/StateTransitionTimeoutConfig.java    | 130 +++++++++++++
 .../stages/MessageGenerationPhase.java          |  77 +++++---
 .../java/org/apache/helix/model/Message.java    |   2 +-
 .../org/apache/helix/model/ResourceConfig.java  |  18 +-
 .../monitoring/mbeans/ClusterStatusMonitor.java |   5 +-
 .../java/org/apache/helix/task/TaskDriver.java  |   6 +-
 .../java/org/apache/helix/task/TaskUtil.java    |   2 +-
 .../TestStateTransitionTimeoutWithResource.java | 194 +++++++++++++++++++
 .../model/TestStateTransitionProperty.java      |  40 ++++
 9 files changed, 438 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/c8c67740/helix-core/src/main/java/org/apache/helix/api/config/StateTransitionTimeoutConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/config/StateTransitionTimeoutConfig.java b/helix-core/src/main/java/org/apache/helix/api/config/StateTransitionTimeoutConfig.java
new file mode 100644
index 0000000..d39f466
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/config/StateTransitionTimeoutConfig.java
@@ -0,0 +1,130 @@
+package org.apache.helix.api.config;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.ZNRecord;
+
+public class StateTransitionTimeoutConfig {
+  public enum StateTransitionTimeoutProperty {
+    /**
+     * The timeout for a state transition
+     */
+    TIMEOUT
+  }
+
+  private final String _resource;
+  private final Map<String, String> _timeoutMap;
+
+  public StateTransitionTimeoutConfig(ZNRecord record) {
+    _resource = record.getId();
+    if (record.getMapFields().containsKey(StateTransitionTimeoutProperty.TIMEOUT.name())) {
+      _timeoutMap = record.getMapField(StateTransitionTimeoutProperty.TIMEOUT.name());
+    } else {
+      _timeoutMap = new HashMap<String, String>();
+    }
+  }
+
+  /**
+   * Set state transition timeout for given resource.
+   * Does not apply for Workflow and Job
+   * @param from          The from state
+   * @param to            The to state
+   * @param timeout       The timeout in milliseconds
+   */
+  public void setStateTransitionTimeout(String from, String to, int timeout) {
+    setStateTransitionTimeout(null, from, to, timeout);
+  }
+
+  /**
+   * Set state transition timeout, keyed by partition if given, else by the from/to transition.
+   * Does not apply to Workflow and Job
+   * @param partitionName The partition the timeout applies to; may be null
+   * @param from          The from state
+   * @param to            The to state
+   * @param timeout       The timeout in milliseconds
+   */
+  private void setStateTransitionTimeout(String partitionName, String from, String to,
+      int timeout) {
+    if (partitionName != null) {
+      _timeoutMap.put(partitionName, String.valueOf(timeout));
+    } else {
+      _timeoutMap.put(String.format("%s.%s", from, to), String.valueOf(timeout));
+    }
+  }
+
+  /**
+   * Get the state transition timeout, preferring a partition-specific entry when one exists.
+   * Does not apply to Workflow and Job
+   * @param partitionName The partition to look up; may be null
+   * @param from          The from state
+   * @param to            The to state
+   * @return              The timeout in milliseconds, or -1 if no timeout matches.
+   */
+  public int getStateTransitionTimeout(String partitionName, String from, String to) {
+    if (partitionName != null && _timeoutMap.containsKey(partitionName)) {
+      return Integer.parseInt(_timeoutMap.get(partitionName));
+    } else if (_timeoutMap.containsKey(String.format("%s.%s", from, to))) {
+      return Integer.parseInt(_timeoutMap.get(String.format("%s.%s", from, to)));
+    } else if (_timeoutMap.containsKey(String.format("*.%s", to))) {
+      return Integer.parseInt(_timeoutMap.get(String.format("*.%s", to)));
+    } else if (_timeoutMap.containsKey(String.format("%s.*", from))) {
+      return Integer.parseInt(_timeoutMap.get(String.format("%s.*", from)));
+    } else if (_timeoutMap.containsKey("*.*")) {
+      return Integer.parseInt(_timeoutMap.get("*.*"));
+    }
+    return -1;
+  }
+
+  /**
+   * Get the state transition timeout for the given from/to transition.
+   * Does not apply to Workflow and Job
+   * @param from          The from state
+   * @param to            The to state
+   * @return              The timeout in milliseconds, or -1 if no timeout matches.
+   */
+  public int getStateTransitionTimeout(String from, String to) {
+    return getStateTransitionTimeout(null, from, to);
+  }
+
+  public Map<String, String> getTimeoutMap() {
+    return _timeoutMap;
+  }
+
+  public String getResource() {
+    return _resource;
+  }
+
+  /**
+   * Get StateTransitionTimeoutConfig from ZNRecord instead of creating a new
+   * StateTransitionTimeoutConfig object.
+   * @param record The ZNRecord to extract StateTransitionTimeoutConfig
+   * @return       A StateTransitionTimeoutConfig if the ZNRecord contains a TIMEOUT map field,
+   *               otherwise null.
+   */
+  public static StateTransitionTimeoutConfig fromRecord(ZNRecord record) {
+    return record.getMapFields()
+        .containsKey(StateTransitionTimeoutConfig.StateTransitionTimeoutProperty.TIMEOUT.name())
+        ? new StateTransitionTimeoutConfig(record)
+        : null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/c8c67740/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
index 2e919f8..63096bb 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
@@ -26,17 +26,19 @@ import java.util.Map;
 import java.util.UUID;
 
 import org.apache.helix.HelixManager;
+import org.apache.helix.api.config.StateTransitionTimeoutConfig;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.MessageState;
+import org.apache.helix.model.Message.MessageType;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
+import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.StateModelDefinition;
-import org.apache.helix.model.Message.MessageState;
-import org.apache.helix.model.Message.MessageType;
 import org.apache.log4j.Logger;
 
 /**
@@ -136,32 +138,13 @@ public class MessageGenerationPhase extends AbstractBaseStage {
                     idealState.getRecord().getMapField(partition.getPartitionName()));
               }
             }
-            // Set timeout of needed
-            String stateTransition =
-                currentState + "-" + nextState + "_" + Message.Attributes.TIMEOUT;
-            if (idealState != null) {
-              String timeOutStr = idealState.getRecord().getSimpleField(stateTransition);
-              if (timeOutStr == null
-                  && idealState.getStateModelDefRef().equalsIgnoreCase(
-                      DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
-                // scheduled task queue
-                if (idealState.getRecord().getMapField(partition.getPartitionName()) != null) {
-                  timeOutStr =
-                      idealState.getRecord().getMapField(partition.getPartitionName())
-                          .get(Message.Attributes.TIMEOUT.toString());
-                }
-              }
-              if (timeOutStr != null) {
-                try {
-                  int timeout = Integer.parseInt(timeOutStr);
-                  if (timeout > 0) {
-                    message.setExecutionTimeout(timeout);
-                  }
-                } catch (Exception e) {
-                  logger.error("", e);
-                }
-              }
+
+            int timeout = getTimeOut(cache.getResourceConfig(resourceName), currentState, nextState,
+                idealState, partition);
+            if (timeout > 0) {
+              message.setExecutionTimeout(timeout);
             }
+
             message.getRecord().setSimpleField("ClusterEventName", event.getName());
             // output.addMessage(resourceName, partition, message);
             if (!messageMap.containsKey(desiredState)) {
@@ -213,4 +196,44 @@ public class MessageGenerationPhase extends AbstractBaseStage {
 
     return message;
   }
+
+  private int getTimeOut(ResourceConfig resourceConfig, String currentState, String nextState,
+      IdealState idealState, Partition partition) {
+    // Set timeout if needed
+    int timeout = -1;
+    if (resourceConfig != null) {
+      // Prefer the timeout from ResourceConfig when it is set
+      StateTransitionTimeoutConfig stateTransitionTimeoutConfig =
+          resourceConfig.getStateTransitionTimeoutConfig();
+      timeout = stateTransitionTimeoutConfig != null ? stateTransitionTimeoutConfig
+          .getStateTransitionTimeout(currentState, nextState) : -1;
+
+    }
+
+    if (timeout <= 0) {
+      String timeOutStr = null;
+      // Fall back to checking whether IdealState has a timeout set
+      if (idealState != null) {
+        String stateTransition = currentState + "-" + nextState + "_" + Message.Attributes.TIMEOUT;
+        timeOutStr = idealState.getRecord().getSimpleField(stateTransition);
+        if (timeOutStr == null && idealState.getStateModelDefRef()
+            .equalsIgnoreCase(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
+          // scheduled task queue
+          if (idealState.getRecord().getMapField(partition.getPartitionName()) != null) {
+            timeOutStr = idealState.getRecord().getMapField(partition.getPartitionName())
+                .get(Message.Attributes.TIMEOUT.toString());
+          }
+        }
+      }
+      if (timeOutStr != null) {
+        try {
+          timeout = Integer.parseInt(timeOutStr);
+        } catch (Exception e) {
+          logger.error("", e);
+        }
+      }
+    }
+
+    return timeout;
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/c8c67740/helix-core/src/main/java/org/apache/helix/model/Message.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/Message.java b/helix-core/src/main/java/org/apache/helix/model/Message.java
index 9fed87b..31f50ab 100644
--- a/helix-core/src/main/java/org/apache/helix/model/Message.java
+++ b/helix-core/src/main/java/org/apache/helix/model/Message.java
@@ -602,7 +602,7 @@ public class Message extends HelixProperty {
 
   /**
    * Get the value of an attribute
-   * @param attr {@link Attribute}
+   * @param attr {@link Attributes}
    * @return attribute value
    */
   public String getAttribute(Attributes attr) {

http://git-wip-us.apache.org/repos/asf/helix/blob/c8c67740/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java b/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java
index e0e9f89..d9a925f 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java
@@ -19,13 +19,14 @@ package org.apache.helix.model;
  * under the License.
  */
 
+import java.util.Collections;
+import java.util.Map;
+
 import org.apache.helix.HelixProperty;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.api.config.StateTransitionTimeoutConfig;
 import org.apache.log4j.Logger;
 
-import java.util.Collections;
-import java.util.Map;
-
 /**
  * Resource configurations
  */
@@ -84,6 +85,17 @@ public class ResourceConfig extends HelixProperty {
         .setBooleanField(ResourceConfigProperty.MONITORING_DISABLED.toString(), monitoringDisabled);
   }
 
+  // TODO: Move it to constructor and Builder when the logic merged in
+  public void setStateTransitionTimeoutConfig(
+      StateTransitionTimeoutConfig stateTransitionTimeoutConfig) {
+    putMapConfig(StateTransitionTimeoutConfig.StateTransitionTimeoutProperty.TIMEOUT.name(),
+        stateTransitionTimeoutConfig.getTimeoutMap());
+  }
+
+  public StateTransitionTimeoutConfig getStateTransitionTimeoutConfig() {
+    return StateTransitionTimeoutConfig.fromRecord(_record);
+  }
+
   /**
    * Put a set of simple configs.
    *

http://git-wip-us.apache.org/repos/asf/helix/blob/c8c67740/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index 3ce9a65..3f1aca4 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -429,7 +429,7 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
 
     Map<String, WorkflowConfig> workflowConfigMap = driver.getWorkflows();
     for (String workflow : workflowConfigMap.keySet()) {
-      if (workflowConfigMap.get(workflow).isRecurring()) {
+      if (workflowConfigMap.get(workflow).isRecurring() || workflow.isEmpty()) {
         continue;
       }
       WorkflowContext workflowContext = driver.getWorkflowContext(workflow);
@@ -473,6 +473,9 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
       jobMonitor.resetJobGauge();
     }
     for (String workflow : driver.getWorkflows().keySet()) {
+      if (workflow.isEmpty()) {
+        continue;
+      }
       Set<String> allJobs = driver.getWorkflowConfig(workflow).getJobDag().getAllNodes();
       WorkflowContext workflowContext = driver.getWorkflowContext(workflow);
       for (String job : allJobs) {

http://git-wip-us.apache.org/repos/asf/helix/blob/c8c67740/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index 97d1067..1020cc3 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -202,7 +202,7 @@ public class TaskDriver {
     flow.validate();
 
     // first, add workflow config.
-    if (!TaskUtil.setResouceConfig(_accessor, flow.getName(),
+    if (!TaskUtil.setResourceConfig(_accessor, flow.getName(),
         new WorkflowConfig(flow.getWorkflowConfig(), flow.getName()))) {
       LOG.error("Failed to add workflow configuration for workflow " + flow.getName());
     }
@@ -257,7 +257,7 @@ public class TaskDriver {
           "Workflow " + workflow + " is terminable, not allow to change its configuration!");
     }
 
-    if (!TaskUtil.setResouceConfig(_accessor, workflow, newWorkflowConfig)) {
+    if (!TaskUtil.setResourceConfig(_accessor, workflow, newWorkflowConfig)) {
       LOG.error("Failed to update workflow configuration for workflow " + workflow);
     }
 
@@ -676,7 +676,7 @@ public class TaskDriver {
 
     // Set the job configuration
     JobConfig newJobCfg = new JobConfig(jobName, jobConfig);
-    if (!TaskUtil.setResouceConfig(_accessor, jobName, newJobCfg)) {
+    if (!TaskUtil.setResourceConfig(_accessor, jobName, newJobCfg)) {
       LOG.error("Failed to add job configuration for job " + jobName);
     }
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/c8c67740/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index f29f2d0..d765cd5 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -120,7 +120,7 @@ public class TaskUtil {
    * @param resourceConfig  The resource config to be set
    * @return                True if set successfully, otherwise false
    */
-  protected static boolean setResouceConfig(HelixDataAccessor accessor, String resource,
+  protected static boolean setResourceConfig(HelixDataAccessor accessor, String resource,
       ResourceConfig resourceConfig) {
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();
     return accessor.setProperty(keyBuilder.resourceConfig(resource), resourceConfig);

http://git-wip-us.apache.org/repos/asf/helix/blob/c8c67740/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeoutWithResource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeoutWithResource.java b/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeoutWithResource.java
new file mode 100644
index 0000000..f8c0135
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeoutWithResource.java
@@ -0,0 +1,194 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.api.config.StateTransitionTimeoutConfig;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.messaging.handling.MessageHandler.ErrorCode;
+import org.apache.helix.mock.participant.MockMSStateModel;
+import org.apache.helix.mock.participant.MockTransition;
+import org.apache.helix.mock.participant.SleepTransition;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.ResourceConfig;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.StateTransitionError;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.MasterNbInExtViewVerifier;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestStateTransitionTimeoutWithResource extends ZkStandAloneCMTestBase {
+  private static Logger LOG = Logger.getLogger(TestStateTransitionTimeout.class);
+  private HelixManager _manager;
+
+  @Override
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
+
+    String namespace = "/" + CLUSTER_NAME;
+    if (_gZkClient.exists(namespace)) {
+      _gZkClient.deleteRecursive(namespace);
+    }
+    _setupTool = new ClusterSetup(ZK_ADDR);
+
+    // setup storage cluster
+    _setupTool.addCluster(CLUSTER_NAME, true);
+    _setupTool.addResourceToCluster(CLUSTER_NAME, TEST_DB, _PARTITIONS, STATE_MODEL);
+
+    for (int i = 0; i < NODE_NR; i++) {
+      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+    }
+    _setupTool.rebalanceStorageCluster(CLUSTER_NAME, TEST_DB, 3);
+
+    // Set the timeout values
+    StateTransitionTimeoutConfig stateTransitionTimeoutConfig =
+        new StateTransitionTimeoutConfig(new ZNRecord(TEST_DB));
+    stateTransitionTimeoutConfig.setStateTransitionTimeout("SLAVE", "MASTER", 300);
+    ResourceConfig resourceConfig = new ResourceConfig(TEST_DB);
+    resourceConfig.setStateTransitionTimeoutConfig(stateTransitionTimeoutConfig);
+
+    _manager = HelixManagerFactory
+        .getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
+    _manager.connect();
+    PropertyKey.Builder keyBuilder = _manager.getHelixDataAccessor().keyBuilder();
+    _manager.getHelixDataAccessor().setProperty(keyBuilder.resourceConfig(TEST_DB), resourceConfig);
+
+  }
+
+  @StateModelInfo(initialState = "OFFLINE", states = {
+      "MASTER", "SLAVE", "ERROR"
+  })
+  public static class TimeOutStateModel extends MockMSStateModel {
+    boolean _sleep = false;
+    StateTransitionError _error;
+    int _errorCallcount = 0;
+
+    public TimeOutStateModel(MockTransition transition, boolean sleep) {
+      super(transition);
+      _sleep = sleep;
+    }
+
+    @Transition(to = "MASTER", from = "SLAVE")
+    public void onBecomeMasterFromSlave(Message message, NotificationContext context)
+        throws InterruptedException {
+      LOG.info("Become MASTER from SLAVE");
+      if (_transition != null && _sleep) {
+        _transition.doTransition(message, context);
+      }
+    }
+
+    @Override
+    public void rollbackOnError(Message message, NotificationContext context,
+        StateTransitionError error) {
+      _error = error;
+      _errorCallcount++;
+    }
+  }
+
+  public static class SleepStateModelFactory extends StateModelFactory<TimeOutStateModel> {
+    Set<String> partitionsToSleep = new HashSet<String>();
+    int _sleepTime;
+
+    public SleepStateModelFactory(int sleepTime) {
+      _sleepTime = sleepTime;
+    }
+
+    public void setPartitions(Collection<String> partitions) {
+      partitionsToSleep.addAll(partitions);
+    }
+
+    public void addPartition(String partition) {
+      partitionsToSleep.add(partition);
+    }
+
+    @Override
+    public TimeOutStateModel createNewStateModel(String resource, String stateUnitKey) {
+      return new TimeOutStateModel(new SleepTransition(_sleepTime),
+          partitionsToSleep.contains(stateUnitKey));
+    }
+  }
+
+  @Test
+  public void testStateTransitionTimeOut() throws Exception {
+    Map<String, SleepStateModelFactory> factories = new HashMap<String, SleepStateModelFactory>();
+    IdealState idealState =
+        _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, TEST_DB);
+    for (int i = 0; i < NODE_NR; i++) {
+      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      SleepStateModelFactory factory = new SleepStateModelFactory(1000);
+      factories.put(instanceName, factory);
+      for (String p : idealState.getPartitionSet()) {
+        if (idealState.getPreferenceList(p).get(0).equals(instanceName)) {
+          factory.addPartition(p);
+        }
+      }
+
+      _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+      _participants[i].getStateMachineEngine().registerStateModelFactory("MasterSlave", factory);
+      _participants[i].syncStart();
+    }
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller =
+        new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+
+    boolean result =
+        ClusterStateVerifier
+            .verifyByZkCallback(new MasterNbInExtViewVerifier(ZK_ADDR, CLUSTER_NAME));
+    Assert.assertTrue(result);
+    HelixDataAccessor accessor = _participants[0].getHelixDataAccessor();
+
+    Builder kb = accessor.keyBuilder();
+    ExternalView ev = accessor.getProperty(kb.externalView(TEST_DB));
+    for (String p : idealState.getPartitionSet()) {
+      String idealMaster = idealState.getPreferenceList(p).get(0);
+      Assert.assertTrue(ev.getStateMap(p).get(idealMaster).equals("ERROR"));
+
+      TimeOutStateModel model = factories.get(idealMaster).getStateModel(TEST_DB, p);
+      Assert.assertEquals(model._errorCallcount, 1);
+      Assert.assertEquals(model._error.getCode(), ErrorCode.TIMEOUT);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/c8c67740/helix-core/src/test/java/org/apache/helix/model/TestStateTransitionProperty.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/model/TestStateTransitionProperty.java b/helix-core/src/test/java/org/apache/helix/model/TestStateTransitionProperty.java
new file mode 100644
index 0000000..86ab7fb
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/model/TestStateTransitionProperty.java
@@ -0,0 +1,40 @@
+package org.apache.helix.model;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.ZNRecord;
+import org.apache.helix.api.config.StateTransitionTimeoutConfig;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestStateTransitionProperty {
+
+  @Test
+  public void testTimeoutSetAndGet() {
+    StateTransitionTimeoutConfig stateTransitionTimeoutConfig = new StateTransitionTimeoutConfig(new ZNRecord("TEST"));
+    stateTransitionTimeoutConfig.setStateTransitionTimeout("MASTER", "SLAVE", 300);
+    Assert.assertEquals(stateTransitionTimeoutConfig.getStateTransitionTimeout("MASTER", "SLAVE"), 300);
+
+    stateTransitionTimeoutConfig.setStateTransitionTimeout("*", "MASTER", 500);
+    Assert.assertEquals(stateTransitionTimeoutConfig.getStateTransitionTimeout("OFFLINE", "MASTER"), 500);
+
+    Assert.assertEquals(stateTransitionTimeoutConfig.getStateTransitionTimeout("SLAVE", "OFFLINE"), -1);
+  }
+}