You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by zz...@apache.org on 2014/08/07 22:46:40 UTC

[3/5] [HELIX-484] Remove CallbackHandler/ZkCallbackHandler code duplication, [HELIX-486] Remove StateModelFactory/HelixStateModelFactory code duplication, rb=24332

http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/participant/statemachine/HelixStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/statemachine/HelixStateModelFactory.java b/helix-core/src/main/java/org/apache/helix/participant/statemachine/HelixStateModelFactory.java
deleted file mode 100644
index 45f56e5..0000000
--- a/helix-core/src/main/java/org/apache/helix/participant/statemachine/HelixStateModelFactory.java
+++ /dev/null
@@ -1,118 +0,0 @@
-package org.apache.helix.participant.statemachine;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.messaging.handling.BatchMessageWrapper;
-
-/**
- * State model factory that uses concrete id classes instead of strings.
- * Replacing {@link org.apache.helix.participant.statemachine.StateModelFactory}
- */
-public abstract class HelixStateModelFactory<T extends StateModel> {
-  /**
-   * map from partitionId to stateModel
-   */
-  private final ConcurrentMap<PartitionId, T> _stateModelMap =
-      new ConcurrentHashMap<PartitionId, T>();
-
-  /**
-   * map from resourceName to BatchMessageWrapper
-   */
-  private final ConcurrentMap<ResourceId, BatchMessageWrapper> _batchMsgWrapperMap =
-      new ConcurrentHashMap<ResourceId, BatchMessageWrapper>();
-
-  /**
-   * This method will be invoked only once per partition per session
-   * @param partitionId
-   * @return
-   */
-  public abstract T createNewStateModel(PartitionId partitionId);
-
-  /**
-   * Create a state model for a partition
-   * @param partitionId
-   */
-  public T createAndAddStateModel(PartitionId partitionId) {
-    T stateModel = createNewStateModel(partitionId);
-    _stateModelMap.put(partitionId, stateModel);
-    return stateModel;
-  }
-
-  /**
-   * Get the state model for a partition
-   * @param partitionId
-   * @return state model if exists, null otherwise
-   */
-  public T getStateModel(PartitionId partitionId) {
-    return _stateModelMap.get(partitionId);
-  }
-
-  /**
-   * remove state model for a partition
-   * @param partitionId
-   * @return state model removed or null if not exist
-   */
-  public T removeStateModel(PartitionId partitionId) {
-    return _stateModelMap.remove(partitionId);
-  }
-
-  /**
-   * get partition set
-   * @return partitionId set
-   */
-  public Set<PartitionId> getPartitionSet() {
-    return _stateModelMap.keySet();
-  }
-
-  /**
-   * create a default batch-message-wrapper for a resource
-   * @param resourceId
-   * @return
-   */
-  public BatchMessageWrapper createBatchMessageWrapper(ResourceId resourceId) {
-    return new BatchMessageWrapper();
-  }
-
-  /**
-   * create a batch-message-wrapper for a resource and put it into map
-   * @param resourceId
-   * @return
-   */
-  public BatchMessageWrapper createAndAddBatchMessageWrapper(ResourceId resourceId) {
-    BatchMessageWrapper wrapper = createBatchMessageWrapper(resourceId);
-    _batchMsgWrapperMap.put(resourceId, wrapper);
-    return wrapper;
-  }
-
-  /**
-   * get batch-message-wrapper for a resource
-   * @param resourceId
-   * @return
-   */
-  public BatchMessageWrapper getBatchMessageWrapper(ResourceId resourceId) {
-    return _batchMsgWrapperMap.get(resourceId);
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/participant/statemachine/HelixStateModelFactoryAdaptor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/statemachine/HelixStateModelFactoryAdaptor.java b/helix-core/src/main/java/org/apache/helix/participant/statemachine/HelixStateModelFactoryAdaptor.java
deleted file mode 100644
index 320275d..0000000
--- a/helix-core/src/main/java/org/apache/helix/participant/statemachine/HelixStateModelFactoryAdaptor.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package org.apache.helix.participant.statemachine;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.api.id.PartitionId;
-
-public class HelixStateModelFactoryAdaptor<T extends StateModel> extends StateModelFactory<T> {
-  final HelixStateModelFactory<T> _factory;
-
-  public HelixStateModelFactoryAdaptor(HelixStateModelFactory<T> factory) {
-    _factory = factory;
-  }
-
-  @Override
-  public T createNewStateModel(String partitionName) {
-    return _factory.createNewStateModel(PartitionId.from(partitionName));
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModel.java b/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModel.java
index ca67d42..954fba8 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModel.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModel.java
@@ -24,12 +24,14 @@ import java.util.Map;
 import org.apache.helix.HelixException;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.api.TransitionHandler;
+import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.messaging.handling.HelixTaskExecutor;
 import org.apache.helix.messaging.handling.MessageHandler;
 import org.apache.helix.model.Message;
 import org.apache.log4j.Logger;
 
-public class ScheduledTaskStateModel extends StateModel {
+public class ScheduledTaskStateModel extends TransitionHandler {
   static final String DEFAULT_INITIAL_STATE = "OFFLINE";
   Logger logger = Logger.getLogger(ScheduledTaskStateModel.class);
 
@@ -37,27 +39,27 @@ public class ScheduledTaskStateModel extends StateModel {
   // StateModel with initial state other than OFFLINE should override this field
   protected String _currentState = DEFAULT_INITIAL_STATE;
   final ScheduledTaskStateModelFactory _factory;
-  final String _partitionName;
+  final PartitionId _partition;
 
   final HelixTaskExecutor _executor;
 
   public ScheduledTaskStateModel(ScheduledTaskStateModelFactory factory,
-      HelixTaskExecutor executor, String partitionName) {
+      HelixTaskExecutor executor, PartitionId partition) {
     _factory = factory;
-    _partitionName = partitionName;
+    _partition = partition;
     _executor = executor;
   }
 
   @Transition(to = "COMPLETED", from = "OFFLINE")
   public void onBecomeCompletedFromOffline(Message message, NotificationContext context)
       throws InterruptedException {
-    logger.info(_partitionName + " onBecomeCompletedFromOffline");
+    logger.info(_partition + " onBecomeCompletedFromOffline");
     // System.err.println("\t\t" + _partitionName + " onBecomeCompletedFromOffline");
 
     // Construct the inner task message from the mapfields of scheduledTaskQueue resource group
     Map<String, String> messageInfo =
         message.getRecord().getMapField(Message.Attributes.INNER_MESSAGE.toString());
-    ZNRecord record = new ZNRecord(_partitionName);
+    ZNRecord record = new ZNRecord(_partition.stringify());
     record.getSimpleFields().putAll(messageInfo);
     Message taskMessage = new Message(record);
     if (logger.isDebugEnabled()) {
@@ -67,49 +69,49 @@ public class ScheduledTaskStateModel extends StateModel {
         _executor.createMessageHandler(taskMessage, new NotificationContext(null));
     if (handler == null) {
       throw new HelixException("Task message " + taskMessage.getMsgType()
-          + " handler not found, task id " + _partitionName);
+          + " handler not found, task id " + _partition);
     }
     // Invoke the internal handler to complete the task
     handler.handleMessage();
-    logger.info(_partitionName + " onBecomeCompletedFromOffline completed");
+    logger.info(_partition + " onBecomeCompletedFromOffline completed");
   }
 
   @Transition(to = "OFFLINE", from = "COMPLETED")
   public void onBecomeOfflineFromCompleted(Message message, NotificationContext context) {
-    logger.info(_partitionName + " onBecomeOfflineFromCompleted");
+    logger.info(_partition + " onBecomeOfflineFromCompleted");
   }
 
   @Transition(to = "DROPPED", from = "COMPLETED")
   public void onBecomeDroppedFromCompleted(Message message, NotificationContext context) {
-    logger.info(_partitionName + " onBecomeDroppedFromCompleted");
+    logger.info(_partition + " onBecomeDroppedFromCompleted");
     removeFromStatemodelFactory();
   }
 
   @Transition(to = "DROPPED", from = "OFFLINE")
   public void onBecomeDroppedFromOffline(Message message, NotificationContext context)
       throws InterruptedException {
-    logger.info(_partitionName + " onBecomeDroppedFromScheduled");
+    logger.info(_partition + " onBecomeDroppedFromScheduled");
     removeFromStatemodelFactory();
   }
 
   @Transition(to = "OFFLINE", from = "ERROR")
   public void onBecomeOfflineFromError(Message message, NotificationContext context)
       throws InterruptedException {
-    logger.info(_partitionName + " onBecomeOfflineFromError");
+    logger.info(_partition + " onBecomeOfflineFromError");
   }
 
   @Override
   public void reset() {
-    logger.info(_partitionName + " ScheduledTask reset");
+    logger.info(_partition + " ScheduledTask reset");
     removeFromStatemodelFactory();
   }
 
   // We need this to prevent state model leak
   private void removeFromStatemodelFactory() {
-    if (_factory.getStateModel(_partitionName) != null) {
-      _factory.removeStateModel(_partitionName);
+    if (_factory.getTransitionHandler(_partition) != null) {
+      _factory.removeTransitionHandler(_partition);
     } else {
-      logger.warn(_partitionName + " not found in ScheduledTaskStateModelFactory");
+      logger.warn(_partition + " not found in ScheduledTaskStateModelFactory");
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModelFactory.java b/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModelFactory.java
index a205910..e2bc461 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModelFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModelFactory.java
@@ -19,10 +19,12 @@ package org.apache.helix.participant.statemachine;
  * under the License.
  */
 
+import org.apache.helix.api.StateTransitionHandlerFactory;
+import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.messaging.handling.HelixTaskExecutor;
 import org.apache.log4j.Logger;
 
-public class ScheduledTaskStateModelFactory extends StateModelFactory<ScheduledTaskStateModel> {
+public class ScheduledTaskStateModelFactory extends StateTransitionHandlerFactory<ScheduledTaskStateModel> {
   Logger logger = Logger.getLogger(ScheduledTaskStateModelFactory.class);
 
   HelixTaskExecutor _executor;
@@ -32,8 +34,8 @@ public class ScheduledTaskStateModelFactory extends StateModelFactory<ScheduledT
   }
 
   @Override
-  public ScheduledTaskStateModel createNewStateModel(String partitionName) {
-    logger.info("Create state model for ScheduledTask " + partitionName);
-    return new ScheduledTaskStateModel(this, _executor, partitionName);
+  public ScheduledTaskStateModel createStateTransitionHandler(PartitionId partition) {
+    logger.info("Create state model for ScheduledTask " + partition);
+    return new ScheduledTaskStateModel(this, _executor, partition);
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModel.java b/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModel.java
deleted file mode 100644
index 9717340..0000000
--- a/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModel.java
+++ /dev/null
@@ -1,104 +0,0 @@
-package org.apache.helix.participant.statemachine;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.NotificationContext;
-import org.apache.helix.model.Message;
-import org.apache.log4j.Logger;
-
-public abstract class StateModel {
-  static final String DEFAULT_INITIAL_STATE = "OFFLINE";
-  Logger logger = Logger.getLogger(StateModel.class);
-
-  // TODO Get default state from implementation or from state model annotation
-  // StateModel with initial state other than OFFLINE should override this field
-  protected String _currentState = DEFAULT_INITIAL_STATE;
-
-  /**
-   * requested-state is used (e.g. by task-framework) to request next state
-   */
-  protected String _requestedState = null;
-
-  public String getCurrentState() {
-    return _currentState;
-  }
-
-  // @transition(from='from', to='to')
-  public void defaultTransitionHandler() {
-    logger
-        .error("Default default handler. The idea is to invoke this if no transition method is found. Yet to be implemented");
-  }
-
-  public boolean updateState(String newState) {
-    _currentState = newState;
-    return true;
-  }
-
-  /**
-   * Get requested-state
-   * @return requested-state
-   */
-  public String getRequestedState() {
-    return _requestedState;
-  }
-
-  /**
-   * Set requested-state
-   * @param requestedState
-   */
-  public void setRequestedState(String requestedState) {
-    _requestedState = requestedState;
-  }
-
-  /**
-   * Called when error occurs in state transition
-   * TODO:enforce subclass to write this
-   * @param message
-   * @param context
-   * @param error
-   */
-  public void rollbackOnError(Message message, NotificationContext context,
-      StateTransitionError error) {
-
-    logger.error("Default rollback method invoked on error. Error Code: " + error.getCode());
-
-  }
-
-  /**
-   * Called when the state model is reset
-   */
-  public void reset() {
-    logger
-        .warn("Default reset method invoked. Either because the process longer own this resource or session timedout");
-  }
-
-  /**
-   * default transition for drop partition in error state
-   * @param message
-   * @param context
-   * @throws InterruptedException
-   */
-  @Transition(to = "DROPPED", from = "ERROR")
-  public void onBecomeDroppedFromError(Message message, NotificationContext context)
-      throws Exception {
-    logger.info("Default ERROR->DROPPED transition invoked.");
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java b/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java
deleted file mode 100644
index a74f67b..0000000
--- a/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java
+++ /dev/null
@@ -1,116 +0,0 @@
-package org.apache.helix.participant.statemachine;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.helix.messaging.handling.BatchMessageWrapper;
-
-/**
- * Replaced by {@link org.apache.helix.participant.statemachine.HelixStateModelFactory}
- */
-@Deprecated
-public abstract class StateModelFactory<T extends StateModel> {
-  /**
-   * mapping from partitionName to StateModel
-   */
-  private final ConcurrentMap<String, T> _stateModelMap = new ConcurrentHashMap<String, T>();
-
-  /**
-   * mapping from resourceName to BatchMessageWrapper
-   */
-  private final ConcurrentMap<String, BatchMessageWrapper> _batchMsgWrapperMap =
-      new ConcurrentHashMap<String, BatchMessageWrapper>();
-
-  /**
-   * This method will be invoked only once per partitionName per session
-   * @param partitionName
-   * @return
-   */
-  public abstract T createNewStateModel(String partitionName);
-
-  /**
-   * Create a state model for a partition
-   * @param partitionName
-   */
-  public T createAndAddStateModel(String partitionName) {
-    T stateModel = createNewStateModel(partitionName);
-    _stateModelMap.put(partitionName, stateModel);
-    return stateModel;
-  }
-
-  /**
-   * Get the state model for a partition
-   * @param partitionName
-   * @return state model if exists, null otherwise
-   */
-  public T getStateModel(String partitionName) {
-    return _stateModelMap.get(partitionName);
-  }
-
-  /**
-   * remove state model for a partition
-   * @param partitionName
-   * @return state model removed or null if not exist
-   */
-  public T removeStateModel(String partitionName) {
-    return _stateModelMap.remove(partitionName);
-  }
-
-  /**
-   * get partition set
-   * @return partition key set
-   */
-  public Set<String> getPartitionSet() {
-    return _stateModelMap.keySet();
-  }
-
-  /**
-   * create a default batch-message-wrapper for a resource
-   * @param resourceName
-   * @return
-   */
-  public BatchMessageWrapper createBatchMessageWrapper(String resourceName) {
-    return new BatchMessageWrapper();
-  }
-
-  /**
-   * create a batch-message-wrapper for a resource and put it into map
-   * @param resourceName
-   * @return
-   */
-  public BatchMessageWrapper createAndAddBatchMessageWrapper(String resourceName) {
-    BatchMessageWrapper wrapper = createBatchMessageWrapper(resourceName);
-    _batchMsgWrapperMap.put(resourceName, wrapper);
-    return wrapper;
-  }
-
-  /**
-   * get batch-message-wrapper for a resource
-   * @param resourceName
-   * @return
-   */
-  public BatchMessageWrapper getBatchMessageWrapper(String resourceName) {
-    return _batchMsgWrapperMap.get(resourceName);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelParser.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelParser.java b/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelParser.java
index eddeaa5..d84b09a 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelParser.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelParser.java
@@ -23,6 +23,7 @@ import java.lang.reflect.Method;
 import java.util.Arrays;
 
 import org.apache.helix.NotificationContext;
+import org.apache.helix.api.TransitionHandler;
 import org.apache.helix.model.Message;
 
 /**
@@ -30,7 +31,7 @@ import org.apache.helix.model.Message;
  */
 public class StateModelParser {
 
-  public Method getMethodForTransition(Class<? extends StateModel> clazz, String fromState,
+  public Method getMethodForTransition(Class<? extends TransitionHandler> clazz, String fromState,
       String toState, Class<?>[] paramTypes) {
     Method method = getMethodForTransitionUsingAnnotation(clazz, fromState, toState, paramTypes);
     if (method == null) {
@@ -48,7 +49,7 @@ public class StateModelParser {
    * @param paramTypes
    * @return Method if found else null
    */
-  public Method getMethodForTransitionByConvention(Class<? extends StateModel> clazz,
+  public Method getMethodForTransitionByConvention(Class<? extends TransitionHandler> clazz,
       String fromState, String toState, Class<?>[] paramTypes) {
     Method methodToInvoke = null;
     String methodName = "onBecome" + toState + "From" + fromState;
@@ -82,7 +83,7 @@ public class StateModelParser {
    * @param paramTypes
    * @return
    */
-  public Method getMethodForTransitionUsingAnnotation(Class<? extends StateModel> clazz,
+  public Method getMethodForTransitionUsingAnnotation(Class<? extends TransitionHandler> clazz,
       String fromState, String toState, Class<?>[] paramTypes) {
     StateModelInfo stateModelInfo = clazz.getAnnotation(StateModelInfo.class);
     Method methodToInvoke = null;
@@ -114,12 +115,12 @@ public class StateModelParser {
    * @param clazz
    * @return
    */
-  public String getInitialState(Class<? extends StateModel> clazz) {
+  public String getInitialState(Class<? extends TransitionHandler> clazz) {
     StateModelInfo stateModelInfo = clazz.getAnnotation(StateModelInfo.class);
     if (stateModelInfo != null) {
       return stateModelInfo.initialState();
     } else {
-      return StateModel.DEFAULT_INITIAL_STATE;
+      return TransitionHandler.DEFAULT_INITIAL_STATE;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java b/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
index 66abba6..45ee71c 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
@@ -20,7 +20,7 @@ package org.apache.helix.task;
  */
 
 import org.apache.helix.HelixManager;
-import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.api.TransitionHandler;
 import org.apache.helix.task.TaskResult.Status;
 import org.apache.log4j.Logger;
 
@@ -30,7 +30,7 @@ import org.apache.log4j.Logger;
  */
 public class TaskRunner implements Runnable {
   private static final Logger LOG = Logger.getLogger(TaskRunner.class);
-  private final StateModel _taskStateModel;
+  private final TransitionHandler _taskStateModel;
   private final HelixManager _manager;
   private final String _taskName;
   private final String _taskPartition;
@@ -50,7 +50,7 @@ public class TaskRunner implements Runnable {
   // If true, indicates that the task has finished.
   private volatile boolean _done = false;
 
-  public TaskRunner(StateModel taskStateModel, Task task, String taskName, String taskPartition,
+  public TaskRunner(TransitionHandler taskStateModel, Task task, String taskName, String taskPartition,
       String instance, HelixManager manager, String sessionId) {
     _taskStateModel = taskStateModel;
     _task = task;

http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
index a44a8cb..69a1b88 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
@@ -28,14 +28,14 @@ import java.util.concurrent.ThreadFactory;
 
 import org.apache.helix.HelixManager;
 import org.apache.helix.NotificationContext;
+import org.apache.helix.api.TransitionHandler;
 import org.apache.helix.model.Message;
-import org.apache.helix.participant.statemachine.StateModel;
 import org.apache.helix.participant.statemachine.StateModelInfo;
 import org.apache.helix.participant.statemachine.Transition;
 import org.apache.log4j.Logger;
 
 @StateModelInfo(states = "{'NOT USED BY HELIX'}", initialState = "INIT")
-public class TaskStateModel extends StateModel {
+public class TaskStateModel extends TransitionHandler {
   private static final Logger LOG = Logger.getLogger(TaskStateModel.class);
   private final HelixManager _manager;
   private final ExecutorService _taskExecutor;

http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java b/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
index 2537747..8fb5690 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
@@ -22,13 +22,13 @@ package org.apache.helix.task;
 import java.util.Map;
 
 import org.apache.helix.HelixManager;
+import org.apache.helix.api.StateTransitionHandlerFactory;
 import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.participant.statemachine.HelixStateModelFactory;
 
 /**
  * Factory class for {@link TaskStateModel}.
  */
-public class TaskStateModelFactory extends HelixStateModelFactory<TaskStateModel> {
+public class TaskStateModelFactory extends StateTransitionHandlerFactory<TaskStateModel> {
   private final HelixManager _manager;
   private final Map<String, TaskFactory> _taskFactoryRegistry;
 
@@ -38,7 +38,7 @@ public class TaskStateModelFactory extends HelixStateModelFactory<TaskStateModel
   }
 
   @Override
-  public TaskStateModel createNewStateModel(PartitionId partitionId) {
+  public TaskStateModel createStateTransitionHandler(PartitionId partitionId) {
     return new TaskStateModel(_manager, _taskFactoryRegistry);
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
index 6599b33..7402ca7 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
@@ -400,7 +400,7 @@ public class ClusterSetup {
       return;
     }
     try {
-      int replica = Integer.parseInt(idealState.getReplicas());
+      Integer.parseInt(idealState.getReplicas());
     } catch (Exception e) {
       _logger.error("", e);
       return;

http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/DummyProcessThread.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/DummyProcessThread.java b/helix-core/src/test/java/org/apache/helix/DummyProcessThread.java
index f51aa1d..8e80f15 100644
--- a/helix-core/src/test/java/org/apache/helix/DummyProcessThread.java
+++ b/helix-core/src/test/java/org/apache/helix/DummyProcessThread.java
@@ -19,6 +19,7 @@ package org.apache.helix;
  * under the License.
  */
 
+import org.apache.helix.api.id.StateModelDefId;
 import org.apache.helix.mock.participant.DummyProcess.DummyLeaderStandbyStateModelFactory;
 import org.apache.helix.mock.participant.DummyProcess.DummyOnlineOfflineStateModelFactory;
 import org.apache.helix.mock.participant.DummyProcess.DummyStateModelFactory;
@@ -43,14 +44,14 @@ public class DummyProcessThread implements Runnable {
       // StateMachineEngine genericStateMachineHandler =
       // new StateMachineEngine();
       StateMachineEngine stateMach = _manager.getStateMachineEngine();
-      stateMach.registerStateModelFactory("MasterSlave", stateModelFactory);
+      stateMach.registerStateModelFactory(StateModelDefId.MasterSlave, stateModelFactory);
 
       DummyLeaderStandbyStateModelFactory stateModelFactory1 =
           new DummyLeaderStandbyStateModelFactory(10);
       DummyOnlineOfflineStateModelFactory stateModelFactory2 =
           new DummyOnlineOfflineStateModelFactory(10);
-      stateMach.registerStateModelFactory("LeaderStandby", stateModelFactory1);
-      stateMach.registerStateModelFactory("OnlineOffline", stateModelFactory2);
+      stateMach.registerStateModelFactory(StateModelDefId.LeaderStandby, stateModelFactory1);
+      stateMach.registerStateModelFactory(StateModelDefId.OnlineOffline, stateModelFactory2);
       // _manager.getMessagingService()
       // .registerMessageHandlerFactory(MessageType.STATE_TRANSITION.toString(),
       // genericStateMachineHandler);

http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/Mocks.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/Mocks.java b/helix-core/src/test/java/org/apache/helix/Mocks.java
index 0303f12..e6b64fc 100644
--- a/helix-core/src/test/java/org/apache/helix/Mocks.java
+++ b/helix-core/src/test/java/org/apache/helix/Mocks.java
@@ -31,6 +31,7 @@ import org.I0Itec.zkclient.DataUpdater;
 import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.IZkDataListener;
 import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.api.TransitionHandler;
 import org.apache.helix.messaging.AsyncCallback;
 import org.apache.helix.messaging.handling.HelixTaskExecutor;
 import org.apache.helix.messaging.handling.HelixTaskResult;
@@ -39,7 +40,6 @@ import org.apache.helix.messaging.handling.MessageTask;
 import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
 import org.apache.helix.model.Message;
 import org.apache.helix.participant.StateMachineEngine;
-import org.apache.helix.participant.statemachine.StateModel;
 import org.apache.helix.participant.statemachine.StateModelInfo;
 import org.apache.helix.participant.statemachine.Transition;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
@@ -189,7 +189,7 @@ public class Mocks {
 
   }
 
-  public static class MockStateModel extends StateModel {
+  public static class MockStateModel extends TransitionHandler {
     boolean stateModelInvoked = false;
 
     public void onBecomeMasterFromSlave(Message msg, NotificationContext context) {
@@ -202,7 +202,7 @@ public class Mocks {
   }
 
   @StateModelInfo(states = "{'OFFLINE','SLAVE','MASTER'}", initialState = "OFFINE")
-  public static class MockStateModelAnnotated extends StateModel {
+  public static class MockStateModelAnnotated extends TransitionHandler {
     boolean stateModelInvoked = false;
 
     @Transition(from = "SLAVE", to = "MASTER")

http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/TestHelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestHelixTaskExecutor.java b/helix-core/src/test/java/org/apache/helix/TestHelixTaskExecutor.java
index a3b16e5..d16417e 100644
--- a/helix-core/src/test/java/org/apache/helix/TestHelixTaskExecutor.java
+++ b/helix-core/src/test/java/org/apache/helix/TestHelixTaskExecutor.java
@@ -24,6 +24,7 @@ import org.apache.helix.Mocks.MockManager;
 import org.apache.helix.Mocks.MockStateModel;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.api.State;
+import org.apache.helix.api.StateTransitionHandlerFactory;
 import org.apache.helix.api.id.MessageId;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
@@ -36,7 +37,6 @@ import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.MessageType;
 import org.apache.helix.model.StateModelDefinition;
-import org.apache.helix.participant.statemachine.StateModelFactory;
 import org.apache.helix.tools.StateModelConfigGenerator;
 import org.testng.AssertJUnit;
 import org.testng.annotations.Test;
@@ -75,10 +75,10 @@ public class TestHelixTaskExecutor {
     CurrentState currentStateDelta = new CurrentState("TestDB");
     currentStateDelta.setState(PartitionId.from("TestDB_0"), State.from("OFFLINE"));
 
-    StateModelFactory<MockStateModel> stateModelFactory = new StateModelFactory<MockStateModel>() {
+    StateTransitionHandlerFactory<MockStateModel> stateModelFactory = new StateTransitionHandlerFactory<MockStateModel>() {
 
       @Override
-      public MockStateModel createNewStateModel(String partitionName) {
+      public MockStateModel createStateTransitionHandler(PartitionId partition) {
         // TODO Auto-generated method stub
         return new MockStateModel();
       }

http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/TestHelixTaskHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestHelixTaskHandler.java b/helix-core/src/test/java/org/apache/helix/TestHelixTaskHandler.java
index 43b4407..21a3ae7 100644
--- a/helix-core/src/test/java/org/apache/helix/TestHelixTaskHandler.java
+++ b/helix-core/src/test/java/org/apache/helix/TestHelixTaskHandler.java
@@ -26,6 +26,7 @@ import org.apache.helix.Mocks.MockStateModel;
 import org.apache.helix.Mocks.MockStateModelAnnotated;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.api.State;
+import org.apache.helix.api.StateTransitionHandlerFactory;
 import org.apache.helix.api.id.MessageId;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
@@ -38,7 +39,6 @@ import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.MessageType;
 import org.apache.helix.model.StateModelDefinition;
-import org.apache.helix.participant.statemachine.StateModelFactory;
 import org.apache.helix.tools.StateModelConfigGenerator;
 import org.testng.AssertJUnit;
 import org.testng.annotations.Test;
@@ -115,11 +115,11 @@ public class TestHelixTaskHandler {
     CurrentState currentStateDelta = new CurrentState("TestDB");
     currentStateDelta.setState(PartitionId.from("TestDB_0"), State.from("OFFLINE"));
 
-    StateModelFactory<MockStateModelAnnotated> stateModelFactory =
-        new StateModelFactory<MockStateModelAnnotated>() {
+    StateTransitionHandlerFactory<MockStateModelAnnotated> stateModelFactory =
+        new StateTransitionHandlerFactory<MockStateModelAnnotated>() {
 
           @Override
-          public MockStateModelAnnotated createNewStateModel(String partitionName) {
+          public MockStateModelAnnotated createStateTransitionHandler(PartitionId partitionName) {
             // TODO Auto-generated method stub
             return new MockStateModelAnnotated();
           }

http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/TestHelper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestHelper.java b/helix-core/src/test/java/org/apache/helix/TestHelper.java
index 879e727..4a9139f 100644
--- a/helix-core/src/test/java/org/apache/helix/TestHelper.java
+++ b/helix-core/src/test/java/org/apache/helix/TestHelper.java
@@ -37,8 +37,6 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import org.I0Itec.zkclient.IDefaultNameSpace;
-import org.I0Itec.zkclient.IZkChildListener;
-import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.ZkServer;
 import org.I0Itec.zkclient.exception.ZkNoNodeException;
 import org.apache.commons.io.FileUtils;
@@ -48,7 +46,6 @@ import org.apache.helix.api.id.MessageId;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.manager.zk.CallbackHandler;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
@@ -772,30 +769,4 @@ public class TestHelper {
     System.out.println(sb.toString());
   }
 
-  public static void printZkListeners(ZkClient client) throws Exception {
-    Map<String, Set<IZkDataListener>> datalisteners = ZkTestHelper.getZkDataListener(client);
-    Map<String, Set<IZkChildListener>> childListeners = ZkTestHelper.getZkChildListener(client);
-
-    System.out.println("dataListeners {");
-    for (String path : datalisteners.keySet()) {
-      System.out.println("\t" + path + ": ");
-      Set<IZkDataListener> set = datalisteners.get(path);
-      for (IZkDataListener listener : set) {
-        CallbackHandler handler = (CallbackHandler) listener;
-        System.out.println("\t\t" + handler.getListener());
-      }
-    }
-    System.out.println("}");
-
-    System.out.println("childListeners {");
-    for (String path : childListeners.keySet()) {
-      System.out.println("\t" + path + ": ");
-      Set<IZkChildListener> set = childListeners.get(path);
-      for (IZkChildListener listener : set) {
-        CallbackHandler handler = (CallbackHandler) listener;
-        System.out.println("\t\t" + handler.getListener());
-      }
-    }
-    System.out.println("}");
-  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/integration/TestAddStateModelFactoryAfterConnect.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAddStateModelFactoryAfterConnect.java b/helix-core/src/test/java/org/apache/helix/integration/TestAddStateModelFactoryAfterConnect.java
index 5f37845..93d168b 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestAddStateModelFactoryAfterConnect.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAddStateModelFactoryAfterConnect.java
@@ -24,6 +24,7 @@ import java.util.List;
 
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.TestHelper;
+import org.apache.helix.api.id.StateModelDefId;
 import org.apache.helix.api.id.StateModelFactoryId;
 import org.apache.helix.manager.zk.MockParticipant;
 import org.apache.helix.manager.zk.MockController;
@@ -111,8 +112,8 @@ public class TestAddStateModelFactoryAfterConnect extends ZkTestBase {
     // register "TestDB1_Factory" state model factory
     // Logger.getRootLogger().setLevel(Level.INFO);
     for (int i = 0; i < n; i++) {
-      participants[i].getStateMachineEngine().registerStateModelFactory("MasterSlave",
-          new MockMSModelFactory(), "TestDB1_Factory");
+      participants[i].getStateMachineEngine().registerStateModelFactory(StateModelDefId.MasterSlave,
+          "TestDB1_Factory", new MockMSModelFactory());
     }
 
     result =

http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessageWrapper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessageWrapper.java b/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessageWrapper.java
index 6a6837a..789ae70 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessageWrapper.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessageWrapper.java
@@ -24,6 +24,8 @@ import java.util.Date;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.TestHelper;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.api.id.StateModelDefId;
 import org.apache.helix.manager.zk.MockParticipant;
 import org.apache.helix.manager.zk.MockController;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
@@ -57,7 +59,7 @@ public class TestBatchMessageWrapper extends ZkTestBase {
 
   class TestMockMSModelFactory extends MockMSModelFactory {
     @Override
-    public BatchMessageWrapper createBatchMessageWrapper(String resourceName) {
+    public BatchMessageWrapper createBatchMessageWrapper(ResourceId resource) {
       return new MockBatchMsgWrapper();
     }
   }
@@ -100,7 +102,7 @@ public class TestBatchMessageWrapper extends ZkTestBase {
       String instanceName = "localhost_" + (12918 + i);
       ftys[i] = new TestMockMSModelFactory();
       participants[i] = new MockParticipant(_zkaddr, clusterName, instanceName);
-      participants[i].getStateMachineEngine().registerStateModelFactory("MasterSlave", ftys[i]);
+      participants[i].getStateMachineEngine().registerStateModelFactory(StateModelDefId.MasterSlave, ftys[i]);
       participants[i].syncStart();
 
       // wait for each participant to complete state transitions, so we have deterministic results
@@ -117,7 +119,7 @@ public class TestBatchMessageWrapper extends ZkTestBase {
     }
 
     // check batch-msg-wrapper counts
-    MockBatchMsgWrapper wrapper = (MockBatchMsgWrapper) ftys[0].getBatchMessageWrapper("TestDB0");
+    MockBatchMsgWrapper wrapper = (MockBatchMsgWrapper) ftys[0].getBatchMessageWrapper(ResourceId.from("TestDB0"));
     // System.out.println("startCount: " + wrapper._startCount);
     Assert.assertEquals(wrapper._startCount, 3,
         "Expect 3 batch.start: O->S, S->M, and M->S for 1st participant");
@@ -125,7 +127,7 @@ public class TestBatchMessageWrapper extends ZkTestBase {
     Assert.assertEquals(wrapper._endCount, 3,
         "Expect 3 batch.end: O->S, S->M, and M->S for 1st participant");
 
-    wrapper = (MockBatchMsgWrapper) ftys[1].getBatchMessageWrapper("TestDB0");
+    wrapper = (MockBatchMsgWrapper) ftys[1].getBatchMessageWrapper(ResourceId.from("TestDB0"));
     // System.out.println("startCount: " + wrapper._startCount);
     Assert.assertEquals(wrapper._startCount, 2,
         "Expect 2 batch.start: O->S and S->M for 2nd participant");

http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/integration/TestCorrectnessOnConnectivityLoss.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestCorrectnessOnConnectivityLoss.java b/helix-core/src/test/java/org/apache/helix/integration/TestCorrectnessOnConnectivityLoss.java
index abb2a7b..e4e844a 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestCorrectnessOnConnectivityLoss.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestCorrectnessOnConnectivityLoss.java
@@ -27,13 +27,13 @@ import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.TestHelper;
+import org.apache.helix.api.StateTransitionHandlerFactory;
+import org.apache.helix.api.TransitionHandler;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.StateModelDefId;
 import org.apache.helix.manager.zk.MockController;
 import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.model.Message;
-import org.apache.helix.participant.statemachine.HelixStateModelFactory;
-import org.apache.helix.participant.statemachine.StateModel;
 import org.apache.helix.participant.statemachine.StateModelInfo;
 import org.apache.helix.participant.statemachine.StateTransitionError;
 import org.apache.helix.participant.statemachine.Transition;
@@ -145,7 +145,7 @@ public class TestCorrectnessOnConnectivityLoss {
   @StateModelInfo(initialState = "OFFLINE", states = {
       "MASTER", "SLAVE", "OFFLINE", "ERROR"
   })
-  public static class MyStateModel extends StateModel {
+  public static class MyStateModel extends TransitionHandler {
     private final Map<String, Integer> _counts;
 
     public MyStateModel(Map<String, Integer> counts) {
@@ -189,7 +189,7 @@ public class TestCorrectnessOnConnectivityLoss {
     }
   }
 
-  public static class MyStateModelFactory extends HelixStateModelFactory<MyStateModel> {
+  public static class MyStateModelFactory extends StateTransitionHandlerFactory<MyStateModel> {
 
     private final Map<String, Integer> _counts;
 
@@ -198,7 +198,7 @@ public class TestCorrectnessOnConnectivityLoss {
     }
 
     @Override
-    public MyStateModel createNewStateModel(PartitionId partitionId) {
+    public MyStateModel createStateTransitionHandler(PartitionId partitionId) {
       return new MyStateModel(_counts);
     }
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/integration/TestDistributedClusterController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDistributedClusterController.java b/helix-core/src/test/java/org/apache/helix/integration/TestDistributedClusterController.java
deleted file mode 100644
index c2f9a5c..0000000
--- a/helix-core/src/test/java/org/apache/helix/integration/TestDistributedClusterController.java
+++ /dev/null
@@ -1,144 +0,0 @@
-package org.apache.helix.integration;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Date;
-
-import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.TestHelper;
-import org.apache.helix.manager.zk.MockParticipant;
-import org.apache.helix.manager.zk.MockMultiClusterController;
-import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.testutil.ZkTestBase;
-import org.apache.helix.tools.ClusterStateVerifier;
-import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-public class TestDistributedClusterController extends ZkTestBase {
-
-  @Test
-  public void testDistributedClusterController() throws Exception {
-    String className = TestHelper.getTestClassName();
-    String methodName = TestHelper.getTestMethodName();
-    String clusterNamePrefix = className + "_" + methodName;
-    final int n = 5;
-    final int clusterNb = 10;
-
-    System.out
-        .println("START " + clusterNamePrefix + " at " + new Date(System.currentTimeMillis()));
-
-    // setup 10 clusters
-    for (int i = 0; i < clusterNb; i++) {
-      String clusterName = clusterNamePrefix + "0_" + i;
-      String participantName = "localhost" + i;
-      String resourceName = "TestDB" + i;
-      TestHelper.setupCluster(clusterName, _zkaddr, 12918, // participant port
-          participantName, // participant name prefix
-          resourceName, // resource name prefix
-          1, // resources
-          8, // partitions per resource
-          n, // number of nodes
-          3, // replicas
-          "MasterSlave", true); // do rebalance
-    }
-
-    // setup controller cluster
-    final String controllerClusterName = "CONTROLLER_" + clusterNamePrefix;
-    TestHelper.setupCluster("CONTROLLER_" + clusterNamePrefix, _zkaddr, 0, // controller
-                                                                           // port
-        "controller", // participant name prefix
-        clusterNamePrefix, // resource name prefix
-        1, // resources
-        clusterNb, // partitions per resource
-        n, // number of nodes
-        3, // replicas
-        "LeaderStandby", true); // do rebalance
-
-    // start distributed cluster controllers
-    MockMultiClusterController[] controllers = new MockMultiClusterController[n];
-    for (int i = 0; i < n; i++) {
-      controllers[i] =
-          new MockMultiClusterController(_zkaddr, controllerClusterName, "controller_" + i);
-      controllers[i].syncStart();
-    }
-
-    boolean result =
-        ClusterStateVerifier.verifyByZkCallback(
-            new ClusterStateVerifier.BestPossAndExtViewZkVerifier(_zkaddr, controllerClusterName),
-            30000);
-    Assert.assertTrue(result, "Controller cluster NOT in ideal state");
-
-    // start first cluster
-    MockParticipant[] participants = new MockParticipant[n];
-    final String firstClusterName = clusterNamePrefix + "0_0";
-    for (int i = 0; i < n; i++) {
-      String instanceName = "localhost0_" + (12918 + i);
-      participants[i] = new MockParticipant(_zkaddr, firstClusterName, instanceName);
-      participants[i].syncStart();
-    }
-
-    result =
-        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(_zkaddr,
-            firstClusterName));
-    Assert.assertTrue(result, "first cluster NOT in ideal state");
-
-    // stop current leader in controller cluster
-    ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(controllerClusterName, _baseAccessor);
-    Builder keyBuilder = accessor.keyBuilder();
-    LiveInstance leader = accessor.getProperty(keyBuilder.controllerLeader());
-    String leaderName = leader.getId();
-    int j = Integer.parseInt(leaderName.substring(leaderName.lastIndexOf('_') + 1));
-    controllers[j].syncStop();
-
-    // setup the second cluster
-    MockParticipant[] participants2 = new MockParticipant[n];
-    final String secondClusterName = clusterNamePrefix + "0_1";
-    for (int i = 0; i < n; i++) {
-      String instanceName = "localhost1_" + (12918 + i);
-      participants2[i] = new MockParticipant(_zkaddr, secondClusterName, instanceName);
-      participants2[i].syncStart();
-    }
-
-    result =
-        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(_zkaddr,
-            secondClusterName));
-    Assert.assertTrue(result, "second cluster NOT in ideal state");
-
-    // clean up
-    // wait for all zk callbacks done
-    System.out.println("Cleaning up...");
-    for (int i = 0; i < 5; i++) {
-      result =
-          ClusterStateVerifier
-              .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(_zkaddr,
-                  controllerClusterName));
-      controllers[i].syncStop();
-    }
-
-    for (int i = 0; i < 5; i++) {
-      participants[i].syncStop();
-    }
-
-    System.out.println("END " + clusterNamePrefix + " at " + new Date(System.currentTimeMillis()));
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/integration/TestErrorPartition.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestErrorPartition.java b/helix-core/src/test/java/org/apache/helix/integration/TestErrorPartition.java
index 19af9a7..648fc4c 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestErrorPartition.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestErrorPartition.java
@@ -56,11 +56,8 @@ public class TestErrorPartition extends ZkTestBase {
       String instanceName = "localhost_" + (12918 + i);
 
       if (i == 0) {
-        Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>() {
-          {
-            put("SLAVE-MASTER", TestHelper.setOf("TestDB0_4"));
-          }
-        };
+        Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>();
+        errPartitions.put("SLAVE-MASTER", TestHelper.setOf("TestDB0_4"));
         participants[i] = new MockParticipant(_zkaddr, clusterName, instanceName);
         participants[i].setTransition(new ErrTransition(errPartitions));
       } else {
@@ -77,11 +74,8 @@ public class TestErrorPartition extends ZkTestBase {
             _zkaddr, clusterName, errStates));
     Assert.assertTrue(result);
 
-    Map<String, Set<String>> errorStateMap = new HashMap<String, Set<String>>() {
-      {
-        put("TestDB0_4", TestHelper.setOf("localhost_12918"));
-      }
-    };
+    Map<String, Set<String>> errorStateMap = new HashMap<String, Set<String>>();
+    errorStateMap.put("TestDB0_4", TestHelper.setOf("localhost_12918"));
 
     // verify "TestDB0_0", "localhost_12918" is in ERROR state
     TestHelper.verifyState(clusterName, _zkaddr, errorStateMap, "ERROR");

http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java b/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java
index 3d02ae8..484ae8c 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java
@@ -30,6 +30,8 @@ import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.TestHelper;
 import org.apache.helix.api.State;
+import org.apache.helix.api.StateTransitionHandlerFactory;
+import org.apache.helix.api.TransitionHandler;
 import org.apache.helix.api.accessor.ClusterAccessor;
 import org.apache.helix.api.config.ClusterConfig;
 import org.apache.helix.api.config.ParticipantConfig;
@@ -46,8 +48,6 @@ import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.model.builder.AutoModeISBuilder;
-import org.apache.helix.participant.statemachine.HelixStateModelFactory;
-import org.apache.helix.participant.statemachine.StateModel;
 import org.apache.helix.participant.statemachine.StateModelInfo;
 import org.apache.helix.participant.statemachine.Transition;
 import org.apache.helix.testutil.ZkTestBase;
@@ -61,7 +61,7 @@ public class TestHelixConnection extends ZkTestBase {
   @StateModelInfo(initialState = "OFFLINE", states = {
       "MASTER", "SLAVE", "OFFLINE", "ERROR"
   })
-  public static class MockStateModel extends StateModel {
+  public static class MockStateModel extends TransitionHandler {
     public MockStateModel() {
 
     }
@@ -74,13 +74,13 @@ public class TestHelixConnection extends ZkTestBase {
     }
   }
 
-  public static class MockStateModelFactory extends HelixStateModelFactory<MockStateModel> {
+  public static class MockStateModelFactory extends StateTransitionHandlerFactory<MockStateModel> {
 
     public MockStateModelFactory() {
     }
 
     @Override
-    public MockStateModel createNewStateModel(PartitionId partitionId) {
+    public MockStateModel createStateTransitionHandler(PartitionId partitionId) {
       MockStateModel model = new MockStateModel();
 
       return model;

http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle2.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle2.java b/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle2.java
index 496a16f..95673d4 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle2.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle2.java
@@ -39,7 +39,10 @@ import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.api.StateTransitionHandlerFactory;
+import org.apache.helix.api.TransitionHandler;
 import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.StateModelDefId;
 import org.apache.helix.controller.HelixControllerMain;
 import org.apache.helix.manager.zk.MockParticipant;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
@@ -54,8 +57,6 @@ import org.apache.helix.model.Message;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.model.builder.ConstraintItemBuilder;
 import org.apache.helix.participant.StateMachineEngine;
-import org.apache.helix.participant.statemachine.StateModel;
-import org.apache.helix.participant.statemachine.StateModelFactory;
 import org.apache.helix.participant.statemachine.StateModelInfo;
 import org.apache.helix.participant.statemachine.Transition;
 import org.apache.helix.testutil.ZkTestBase;
@@ -245,8 +246,6 @@ public class TestMessageThrottle2 extends ZkTestBase {
     }
 
     public void start() throws Exception {
-//      helixManager =
-//          new ZKHelixManager(clusterName, instanceName, InstanceType.PARTICIPANT, _zkaddr);
       participant = new MockParticipant(_zkaddr, clusterName, instanceName);
       {
         // hack to set sessionTimeout
@@ -256,7 +255,7 @@ public class TestMessageThrottle2 extends ZkTestBase {
       }
 
       StateMachineEngine stateMach = participant.getStateMachineEngine();
-      stateMach.registerStateModelFactory("MasterSlave", new MyStateModelFactory(participant));
+      stateMach.registerStateModelFactory(StateModelDefId.MasterSlave, new MyStateModelFactory(participant));
       participant.connect();
 
       StatusPrinter statusPrinter = new StatusPrinter();
@@ -271,13 +270,11 @@ public class TestMessageThrottle2 extends ZkTestBase {
   @StateModelInfo(initialState = "OFFLINE", states = {
       "MASTER", "SLAVE", "ERROR"
   })
-  public static class MyStateModel extends StateModel {
+  public static class MyStateModel extends TransitionHandler {
     private static final Logger LOGGER = Logger.getLogger(MyStateModel.class);
 
-    private final HelixManager helixManager;
 
     public MyStateModel(HelixManager helixManager) {
-      this.helixManager = helixManager;
     }
 
     @Transition(to = "SLAVE", from = "OFFLINE")
@@ -323,7 +320,7 @@ public class TestMessageThrottle2 extends ZkTestBase {
     }
   }
 
-  static class MyStateModelFactory extends StateModelFactory<MyStateModel> {
+  static class MyStateModelFactory extends StateTransitionHandlerFactory<MyStateModel> {
     private final HelixManager helixManager;
 
     public MyStateModelFactory(HelixManager helixManager) {
@@ -331,7 +328,7 @@ public class TestMessageThrottle2 extends ZkTestBase {
     }
 
     @Override
-    public MyStateModel createNewStateModel(String partitionName) {
+    public MyStateModel createStateTransitionHandler(PartitionId partitionName) {
       return new MyStateModel(helixManager);
     }
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java b/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java
index 08954e5..eb2bec1 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java
@@ -267,8 +267,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBase {
     cr.setSessionSpecific(false);
 
     AsyncCallback asyncCallback = new MockAsyncCallback();
-    int messagesSent =
-        _participants[0].getMessagingService().sendAndWait(cr, msg, asyncCallback, 60000);
+    _participants[0].getMessagingService().sendAndWait(cr, msg, asyncCallback, 60000);
 
     AssertJUnit.assertTrue(asyncCallback.getMessageReplied().get(0).getRecord()
         .getMapField(Message.Attributes.MESSAGE_RESULT.toString()).get("ReplyMessage")
@@ -276,7 +275,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBase {
     AssertJUnit.assertTrue(asyncCallback.getMessageReplied().size() == 1);
 
     AsyncCallback asyncCallback2 = new MockAsyncCallback();
-    messagesSent = _participants[0].getMessagingService().sendAndWait(cr, msg, asyncCallback2, 500);
+    _participants[0].getMessagingService().sendAndWait(cr, msg, asyncCallback2, 500);
     AssertJUnit.assertTrue(asyncCallback2.isTimedOut());
 
   }
@@ -287,7 +286,6 @@ public class TestMessagingService extends ZkStandAloneCMTestBase {
 
     for (int i = 0; i < NODE_NR; i++) {
       TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
-      String hostDest = "localhost_" + (START_PORT + i);
       _participants[0].getMessagingService().registerMessageHandlerFactory(
           factory.getMessageType(), factory);
 
@@ -307,8 +305,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBase {
     cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
     cr.setSessionSpecific(false);
     AsyncCallback callback1 = new MockAsyncCallback();
-    int messageSent1 =
-        _participants[0].getMessagingService().sendAndWait(cr, msg, callback1, 10000);
+    _participants[0].getMessagingService().sendAndWait(cr, msg, callback1, 10000);
 
     AssertJUnit.assertTrue(callback1.getMessageReplied().get(0).getRecord()
         .getMapField(Message.Attributes.MESSAGE_RESULT.toString()).get("ReplyMessage")
@@ -316,32 +313,28 @@ public class TestMessagingService extends ZkStandAloneCMTestBase {
     AssertJUnit.assertTrue(callback1.getMessageReplied().size() == NODE_NR - 1);
 
     AsyncCallback callback2 = new MockAsyncCallback();
-    int messageSent2 = _participants[0].getMessagingService().sendAndWait(cr, msg, callback2, 500);
+    _participants[0].getMessagingService().sendAndWait(cr, msg, callback2, 500);
 
     AssertJUnit.assertTrue(callback2.isTimedOut());
 
     cr.setPartition("TestDB_17");
     AsyncCallback callback3 = new MockAsyncCallback();
-    int messageSent3 =
-        _participants[0].getMessagingService().sendAndWait(cr, msg, callback3, 10000);
+    _participants[0].getMessagingService().sendAndWait(cr, msg, callback3, 10000);
     AssertJUnit.assertTrue(callback3.getMessageReplied().size() == _replica - 1);
 
     cr.setPartition("TestDB_15");
     AsyncCallback callback4 = new MockAsyncCallback();
-    int messageSent4 =
-        _participants[0].getMessagingService().sendAndWait(cr, msg, callback4, 10000);
+    _participants[0].getMessagingService().sendAndWait(cr, msg, callback4, 10000);
     AssertJUnit.assertTrue(callback4.getMessageReplied().size() == _replica);
 
     cr.setPartitionState("SLAVE");
     AsyncCallback callback5 = new MockAsyncCallback();
-    int messageSent5 =
-        _participants[0].getMessagingService().sendAndWait(cr, msg, callback5, 10000);
+    _participants[0].getMessagingService().sendAndWait(cr, msg, callback5, 10000);
     AssertJUnit.assertTrue(callback5.getMessageReplied().size() == _replica - 1);
 
     cr.setDataSource(DataSource.IDEALSTATES);
     AsyncCallback callback6 = new MockAsyncCallback();
-    int messageSent6 =
-        _participants[0].getMessagingService().sendAndWait(cr, msg, callback6, 10000);
+    _participants[0].getMessagingService().sendAndWait(cr, msg, callback6, 10000);
     AssertJUnit.assertTrue(callback6.getMessageReplied().size() == _replica - 1);
   }
 
@@ -351,7 +344,6 @@ public class TestMessagingService extends ZkStandAloneCMTestBase {
 
     for (int i = 0; i < NODE_NR; i++) {
       TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
-      String hostDest = "localhost_" + (START_PORT + i);
       _participants[i].getMessagingService().registerMessageHandlerFactory(
           factory.getMessageType(), factory);
 
@@ -372,8 +364,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBase {
     cr.setSessionSpecific(false);
     cr.setSelfExcluded(false);
     AsyncCallback callback1 = new MockAsyncCallback();
-    int messageSent1 =
-        _participants[0].getMessagingService().sendAndWait(cr, msg, callback1, 10000);
+    _participants[0].getMessagingService().sendAndWait(cr, msg, callback1, 10000);
 
     AssertJUnit.assertTrue(callback1.getMessageReplied().size() == NODE_NR);
     AssertJUnit.assertTrue(callback1.getMessageReplied().get(0).getRecord()
@@ -387,7 +378,6 @@ public class TestMessagingService extends ZkStandAloneCMTestBase {
 
     for (int i = 0; i < NODE_NR; i++) {
       TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
-      String hostDest = "localhost_" + (START_PORT + i);
       _participants[i].getMessagingService().registerMessageHandlerFactory(
           factory.getMessageType(), factory);
 
@@ -408,8 +398,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBase {
     cr.setSessionSpecific(false);
 
     AsyncCallback callback1 = new MockAsyncCallback();
-    int messagesSent =
-        _participants[0].getMessagingService().sendAndWait(cr, msg, callback1, 10000);
+    _participants[0].getMessagingService().sendAndWait(cr, msg, callback1, 10000);
 
     AssertJUnit.assertTrue(callback1.getMessageReplied().get(0).getRecord()
         .getMapField(Message.Attributes.MESSAGE_RESULT.toString()).get("ControllerResult")
@@ -420,7 +409,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBase {
     msg.setMessageId(msgId);
     cr.setPartition("TestDB_17");
     AsyncCallback callback2 = new MockAsyncCallback();
-    messagesSent = _participants[0].getMessagingService().sendAndWait(cr, msg, callback2, 10000);
+    _participants[0].getMessagingService().sendAndWait(cr, msg, callback2, 10000);
 
     AssertJUnit.assertTrue(callback2.getMessageReplied().get(0).getRecord()
         .getMapField(Message.Attributes.MESSAGE_RESULT.toString()).get("ControllerResult")
@@ -432,7 +421,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBase {
     msg.setMessageId(msgId);
     cr.setPartitionState("SLAVE");
     AsyncCallback callback3 = new MockAsyncCallback();
-    messagesSent = _participants[0].getMessagingService().sendAndWait(cr, msg, callback3, 10000);
+    _participants[0].getMessagingService().sendAndWait(cr, msg, callback3, 10000);
     AssertJUnit.assertTrue(callback3.getMessageReplied().get(0).getRecord()
         .getMapField(Message.Attributes.MESSAGE_RESULT.toString()).get("ControllerResult")
         .indexOf(hostSrc) != -1);

http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/integration/TestMultiClusterController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestMultiClusterController.java b/helix-core/src/test/java/org/apache/helix/integration/TestMultiClusterController.java
new file mode 100644
index 0000000..8945843
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestMultiClusterController.java
@@ -0,0 +1,153 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.TestHelper;
+import org.apache.helix.manager.zk.MockParticipant;
+import org.apache.helix.manager.zk.MockMultiClusterController;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.testutil.ZkTestBase;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Integration test for a multi-cluster (distributed) controller setup: a controller cluster
+ * whose nodes manage several participant clusters, including a leader fail-over.
+ */
+public class TestMultiClusterController extends ZkTestBase {
+
+  /**
+   * Starts the controller cluster and a first participant cluster, stops the current controller
+   * leader, then checks that a second participant cluster still converges.
+   */
+  @Test
+  public void test() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterNamePrefix = className + "_" + methodName;
+    final int n = 5;
+    final int clusterNb = 10;
+
+    System.out
+        .println("START " + clusterNamePrefix + " at " + new Date(System.currentTimeMillis()));
+
+    // setup 10 clusters
+    for (int i = 0; i < clusterNb; i++) {
+      String clusterName = clusterNamePrefix + "0_" + i;
+      String participantName = "localhost" + i;
+      String resourceName = "TestDB" + i;
+      TestHelper.setupCluster(clusterName, _zkaddr, 12918, // participant port
+          participantName, // participant name prefix
+          resourceName, // resource name prefix
+          1, // resources
+          8, // partitions per resource
+          n, // number of nodes
+          3, // replicas
+          "MasterSlave", true); // do rebalance
+    }
+
+    // setup controller cluster
+    final String controllerClusterName = "CONTROLLER_" + clusterNamePrefix;
+    TestHelper.setupCluster(controllerClusterName, _zkaddr, 0, // controller port
+        "controller", // participant name prefix
+        clusterNamePrefix, // resource name prefix
+        1, // resources
+        clusterNb, // partitions per resource
+        n, // number of nodes
+        3, // replicas
+        "LeaderStandby", true); // do rebalance
+
+    // start distributed cluster controllers
+    MockMultiClusterController[] controllers = new MockMultiClusterController[n];
+    for (int i = 0; i < n; i++) {
+      controllers[i] =
+          new MockMultiClusterController(_zkaddr, controllerClusterName, "controller_" + i);
+      controllers[i].syncStart();
+    }
+
+    boolean result =
+        ClusterStateVerifier.verifyByZkCallback(
+            new BestPossAndExtViewZkVerifier(_zkaddr, controllerClusterName), 30000);
+    Assert.assertTrue(result, "Controller cluster NOT in ideal state");
+
+    // start first cluster
+    MockParticipant[] participants = new MockParticipant[n];
+    final String firstClusterName = clusterNamePrefix + "0_0";
+    for (int i = 0; i < n; i++) {
+      String instanceName = "localhost0_" + (12918 + i);
+      participants[i] = new MockParticipant(_zkaddr, firstClusterName, instanceName);
+      participants[i].syncStart();
+    }
+
+    result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(_zkaddr,
+            firstClusterName));
+    Assert.assertTrue(result, "first cluster NOT in ideal state");
+
+    // stop current leader in controller cluster
+    ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(controllerClusterName, _baseAccessor);
+    Builder keyBuilder = accessor.keyBuilder();
+    LiveInstance leader = accessor.getProperty(keyBuilder.controllerLeader());
+    String leaderName = leader.getId();
+    // controller names are "controller_<index>", so the suffix is the index into controllers[]
+    int j = Integer.parseInt(leaderName.substring(leaderName.lastIndexOf('_') + 1));
+    controllers[j].syncStop();
+
+    // setup the second cluster
+    MockParticipant[] participants2 = new MockParticipant[n];
+    final String secondClusterName = clusterNamePrefix + "0_1";
+    for (int i = 0; i < n; i++) {
+      String instanceName = "localhost1_" + (12918 + i);
+      participants2[i] = new MockParticipant(_zkaddr, secondClusterName, instanceName);
+      participants2[i].syncStart();
+    }
+
+    result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(_zkaddr,
+            secondClusterName));
+    Assert.assertTrue(result, "second cluster NOT in ideal state");
+
+    // clean up
+    // wait for all zk callbacks done
+    System.out.println("Cleaning up...");
+    for (int i = 0; i < n; i++) {
+      // wait for pending zk callbacks before each stop; the outcome is not asserted
+      ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(_zkaddr,
+          controllerClusterName));
+      // NOTE(review): controllers[j] is already stopped; confirm a repeated syncStop is safe
+      controllers[i].syncStop();
+    }
+
+    // stop the participants of both started clusters so none outlive the test
+    for (int i = 0; i < n; i++) {
+      participants[i].syncStop();
+      participants2[i].syncStop();
+    }
+
+    System.out.println("END " + clusterNamePrefix + " at " + new Date(System.currentTimeMillis()));
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/integration/TestNonOfflineInitState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestNonOfflineInitState.java b/helix-core/src/test/java/org/apache/helix/integration/TestNonOfflineInitState.java
index 105633a..d5f5458 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestNonOfflineInitState.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestNonOfflineInitState.java
@@ -22,6 +22,7 @@ package org.apache.helix.integration;
 import java.util.Date;
 
 import org.apache.helix.TestHelper;
+import org.apache.helix.api.id.StateModelDefId;
 import org.apache.helix.manager.zk.MockParticipant;
 import org.apache.helix.manager.zk.MockController;
 import org.apache.helix.mock.participant.MockBootstrapModelFactory;
@@ -65,7 +66,7 @@ public class TestNonOfflineInitState extends ZkTestBase {
       // add a state model with non-OFFLINE initial state
       StateMachineEngine stateMach = participants[i].getStateMachineEngine();
       MockBootstrapModelFactory bootstrapFactory = new MockBootstrapModelFactory();
-      stateMach.registerStateModelFactory("Bootstrap", bootstrapFactory);
+      stateMach.registerStateModelFactory(StateModelDefId.from("Bootstrap"), bootstrapFactory);
 
       participants[i].syncStart();
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/integration/TestPartitionLevelTransitionConstraint.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestPartitionLevelTransitionConstraint.java b/helix-core/src/test/java/org/apache/helix/integration/TestPartitionLevelTransitionConstraint.java
index 823a9ce..0962921 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestPartitionLevelTransitionConstraint.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestPartitionLevelTransitionConstraint.java
@@ -28,6 +28,10 @@ import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.TestHelper;
+import org.apache.helix.api.StateTransitionHandlerFactory;
+import org.apache.helix.api.TransitionHandler;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.StateModelDefId;
 import org.apache.helix.manager.zk.MockParticipant;
 import org.apache.helix.manager.zk.MockController;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
@@ -38,8 +42,6 @@ import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.model.builder.ConstraintItemBuilder;
-import org.apache.helix.participant.statemachine.StateModel;
-import org.apache.helix.participant.statemachine.StateModelFactory;
 import org.apache.helix.testutil.ZkTestBase;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.log4j.Logger;
@@ -52,7 +54,7 @@ public class TestPartitionLevelTransitionConstraint extends ZkTestBase {
 
   final Queue<Message> _msgOrderList = new ConcurrentLinkedQueue<Message>();
 
-  public class BootstrapStateModel extends StateModel {
+  public class BootstrapStateModel extends TransitionHandler {
     public void onBecomeBootstrapFromOffline(Message message, NotificationContext context) {
       LOG.info("Become Bootstrap from Offline");
       _msgOrderList.add(message);
@@ -85,10 +87,10 @@ public class TestPartitionLevelTransitionConstraint extends ZkTestBase {
 
   }
 
-  public class BootstrapStateModelFactory extends StateModelFactory<BootstrapStateModel> {
+  public class BootstrapStateModelFactory extends StateTransitionHandlerFactory<BootstrapStateModel> {
 
     @Override
-    public BootstrapStateModel createNewStateModel(String stateUnitKey) {
+    public BootstrapStateModel createStateTransitionHandler(PartitionId partition) {
       BootstrapStateModel model = new BootstrapStateModel();
       return model;
     }
@@ -145,7 +147,7 @@ public class TestPartitionLevelTransitionConstraint extends ZkTestBase {
     String instanceName1 = "localhost_12918";
 
     participants[0] = new MockParticipant(_zkaddr, clusterName, instanceName1);
-    participants[0].getStateMachineEngine().registerStateModelFactory("Bootstrap",
+    participants[0].getStateMachineEngine().registerStateModelFactory(StateModelDefId.from("Bootstrap"),
         new BootstrapStateModelFactory());
     participants[0].syncStart();
 
@@ -158,7 +160,7 @@ public class TestPartitionLevelTransitionConstraint extends ZkTestBase {
     // start 2nd participant which will be the master for Test0_0
     String instanceName2 = "localhost_12919";
     participants[1] = new MockParticipant(_zkaddr, clusterName, instanceName2);
-    participants[1].getStateMachineEngine().registerStateModelFactory("Bootstrap",
+    participants[1].getStateMachineEngine().registerStateModelFactory(StateModelDefId.from("Bootstrap"),
         new BootstrapStateModelFactory());
     participants[1].syncStart();
 

http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/integration/TestPreferenceListAsQueue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestPreferenceListAsQueue.java b/helix-core/src/test/java/org/apache/helix/integration/TestPreferenceListAsQueue.java
index 06a2b56..7eb054d 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestPreferenceListAsQueue.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestPreferenceListAsQueue.java
@@ -33,6 +33,10 @@ import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.api.StateTransitionHandlerFactory;
+import org.apache.helix.api.TransitionHandler;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.StateModelDefId;
 import org.apache.helix.model.ClusterConstraints.ConstraintAttribute;
 import org.apache.helix.model.ClusterConstraints.ConstraintType;
 import org.apache.helix.model.IdealState;
@@ -41,8 +45,6 @@ import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.model.builder.ConstraintItemBuilder;
-import org.apache.helix.participant.statemachine.StateModel;
-import org.apache.helix.participant.statemachine.StateModelFactory;
 import org.apache.helix.testutil.ZkTestBase;
 import org.apache.log4j.Logger;
 import org.testng.Assert;
@@ -152,7 +154,7 @@ public class TestPreferenceListAsQueue extends ZkTestBase {
       participants[i] =
           HelixManagerFactory.getZKHelixManager(_clusterName, instanceInfoArray[i],
               InstanceType.PARTICIPANT, _zkaddr);
-      participants[i].getStateMachineEngine().registerStateModelFactory(_stateModel,
+      participants[i].getStateMachineEngine().registerStateModelFactory(StateModelDefId.from(_stateModel),
           new PrefListTaskOnlineOfflineStateModelFactory());
       participants[i].connect();
     }
@@ -419,7 +421,7 @@ public class TestPreferenceListAsQueue extends ZkTestBase {
    * should be run, and then the instance should be removed from the preference list for the task
    * (padded by spaces). All other transitions are no-ops.
    */
-  public class PrefListTaskOnlineOfflineStateModel extends StateModel {
+  public class PrefListTaskOnlineOfflineStateModel extends TransitionHandler {
     /**
      * Run the task. The parallelism of this is dictated by the constraints that are set.
      * @param message
@@ -477,9 +479,9 @@ public class TestPreferenceListAsQueue extends ZkTestBase {
   }
 
   public class PrefListTaskOnlineOfflineStateModelFactory extends
-      StateModelFactory<PrefListTaskOnlineOfflineStateModel> {
+      StateTransitionHandlerFactory<PrefListTaskOnlineOfflineStateModel> {
     @Override
-    public PrefListTaskOnlineOfflineStateModel createNewStateModel(String partitionName) {
+    public PrefListTaskOnlineOfflineStateModel createStateTransitionHandler(PartitionId partitionName) {
       return new PrefListTaskOnlineOfflineStateModel();
     }
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/integration/TestResetInstance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestResetInstance.java b/helix-core/src/test/java/org/apache/helix/integration/TestResetInstance.java
index 5804744..f3fe6f2 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestResetInstance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestResetInstance.java
@@ -59,12 +59,9 @@ public class TestResetInstance extends ZkTestBase {
         new MockController(_zkaddr, clusterName, "controller_0");
     controller.syncStart();
 
-    Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>() {
-      {
-        put("SLAVE-MASTER", TestHelper.setOf("TestDB0_4"));
-        put("OFFLINE-SLAVE", TestHelper.setOf("TestDB0_8"));
-      }
-    };
+    Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>();
+    errPartitions.put("SLAVE-MASTER", TestHelper.setOf("TestDB0_4"));
+    errPartitions.put("OFFLINE-SLAVE", TestHelper.setOf("TestDB0_8"));
 
     // start mock participants
     MockParticipant[] participants = new MockParticipant[n];

http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/integration/TestResetPartitionState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestResetPartitionState.java b/helix-core/src/test/java/org/apache/helix/integration/TestResetPartitionState.java
index 4855b3d..c7386b4 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestResetPartitionState.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestResetPartitionState.java
@@ -84,12 +84,9 @@ public class TestResetPartitionState extends ZkTestBase {
         new MockController(_zkaddr, clusterName, "controller_0");
     controller.syncStart();
 
-    Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>() {
-      {
-        put("SLAVE-MASTER", TestHelper.setOf("TestDB0_4"));
-        put("OFFLINE-SLAVE", TestHelper.setOf("TestDB0_8"));
-      }
-    };
+    Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>();
+    errPartitions.put("SLAVE-MASTER", TestHelper.setOf("TestDB0_4"));
+    errPartitions.put("OFFLINE-SLAVE", TestHelper.setOf("TestDB0_8"));
 
     // start mock participants
     MockParticipant[] participants = new MockParticipant[n];

http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/integration/TestResetResource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestResetResource.java b/helix-core/src/test/java/org/apache/helix/integration/TestResetResource.java
index 7d28931..92f1c43 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestResetResource.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestResetResource.java
@@ -58,12 +58,9 @@ public class TestResetResource extends ZkTestBase {
         new MockController(_zkaddr, clusterName, "controller_0");
     controller.syncStart();
 
-    Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>() {
-      {
-        put("SLAVE-MASTER", TestHelper.setOf("TestDB0_4"));
-        put("OFFLINE-SLAVE", TestHelper.setOf("TestDB0_8"));
-      }
-    };
+    Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>();
+    errPartitions.put("SLAVE-MASTER", TestHelper.setOf("TestDB0_4"));
+    errPartitions.put("OFFLINE-SLAVE", TestHelper.setOf("TestDB0_8"));
 
     // start mock participants
     MockParticipant[] participants = new MockParticipant[n];

http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
index 89af602..8accc75 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
@@ -115,7 +115,6 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBase {
       public HelixTaskResult handleMessage() throws InterruptedException {
         HelixTaskResult result = new HelixTaskResult();
         result.setSuccess(true);
-        String destName = _message.getTgtName();
         String partitionName = _message.getPartitionName();
         result.getTaskResultMap().put("Message", _message.getMsgId());
         synchronized (_results) {