You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by zz...@apache.org on 2014/08/07 22:46:41 UTC

[4/5] git commit: [HELIX-484] Remove CallbackHandler/ZkCallbackHandler code duplication, [HELIX-486] Remove StateModelFactory/HelixStateModelFactory code duplication, rb=24332

[HELIX-484] Remove CallbackHandler/ZkCallbackHandler code duplication, [HELIX-486] Remove StateModelFactory/HelixStateModelFactory code duplication, rb=24332


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/4ae1ff79
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/4ae1ff79
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/4ae1ff79

Branch: refs/heads/master
Commit: 4ae1ff796a524766b22f5fc184952187dc9cc24b
Parents: dd8226b
Author: zzhang <zz...@apache.org>
Authored: Thu Aug 7 13:45:50 2014 -0700
Committer: zzhang <zz...@apache.org>
Committed: Thu Aug 7 13:45:50 2014 -0700

----------------------------------------------------------------------
 .../apache/helix/webapp/TestResetInstance.java  |   9 +-
 .../apache/helix/webapp/TestResetResource.java  |   9 +-
 .../org/apache/helix/agent/AgentStateModel.java |   4 +-
 .../helix/agent/AgentStateModelFactory.java     |   6 +-
 .../api/StateTransitionHandlerFactory.java      | 114 +++++
 .../org/apache/helix/api/TransitionHandler.java | 106 +++++
 .../apache/helix/api/id/StateModelDefId.java    |   4 +
 .../helix/controller/HelixControllerMain.java   |  10 +-
 .../helix/manager/zk/CallbackHandler.java       | 420 -------------------
 .../manager/zk/ParticipantManagerHelper.java    | 281 -------------
 .../helix/manager/zk/ZkHelixParticipant.java    |   5 +-
 .../handling/HelixStateTransitionHandler.java   |  14 +-
 .../helix/participant/CustomCodeInvoker.java    |   9 +-
 .../DistClusterControllerStateModel.java        |   4 +-
 .../DistClusterControllerStateModelFactory.java |   8 +-
 .../participant/GenericLeaderStandbyModel.java  |   7 +-
 .../GenericLeaderStandbyStateModelFactory.java  |   9 +-
 .../participant/HelixCustomCodeRunner.java      |   8 +-
 .../participant/HelixStateMachineEngine.java    | 121 ++----
 .../helix/participant/StateMachineEngine.java   |  38 +-
 .../statemachine/HelixStateModelFactory.java    | 118 ------
 .../HelixStateModelFactoryAdaptor.java          |  36 --
 .../statemachine/ScheduledTaskStateModel.java   |  34 +-
 .../ScheduledTaskStateModelFactory.java         |  10 +-
 .../participant/statemachine/StateModel.java    | 104 -----
 .../statemachine/StateModelFactory.java         | 116 -----
 .../statemachine/StateModelParser.java          |  11 +-
 .../java/org/apache/helix/task/TaskRunner.java  |   6 +-
 .../org/apache/helix/task/TaskStateModel.java   |   4 +-
 .../helix/task/TaskStateModelFactory.java       |   6 +-
 .../org/apache/helix/tools/ClusterSetup.java    |   2 +-
 .../org/apache/helix/DummyProcessThread.java    |   7 +-
 .../src/test/java/org/apache/helix/Mocks.java   |   6 +-
 .../org/apache/helix/TestHelixTaskExecutor.java |   6 +-
 .../org/apache/helix/TestHelixTaskHandler.java  |   8 +-
 .../test/java/org/apache/helix/TestHelper.java  |  29 --
 .../TestAddStateModelFactoryAfterConnect.java   |   5 +-
 .../integration/TestBatchMessageWrapper.java    |  10 +-
 .../TestCorrectnessOnConnectivityLoss.java      |  10 +-
 .../TestDistributedClusterController.java       | 144 -------
 .../helix/integration/TestErrorPartition.java   |  14 +-
 .../helix/integration/TestHelixConnection.java  |  10 +-
 .../helix/integration/TestMessageThrottle2.java |  17 +-
 .../helix/integration/TestMessagingService.java |  35 +-
 .../integration/TestMultiClusterController.java | 144 +++++++
 .../integration/TestNonOfflineInitState.java    |   3 +-
 .../TestPartitionLevelTransitionConstraint.java |  16 +-
 .../integration/TestPreferenceListAsQueue.java  |  14 +-
 .../helix/integration/TestResetInstance.java    |   9 +-
 .../integration/TestResetPartitionState.java    |   9 +-
 .../helix/integration/TestResetResource.java    |   9 +-
 .../helix/integration/TestSchedulerMessage.java |   1 -
 .../integration/TestStateTransitionTimeout.java |  21 +-
 .../helix/integration/TestZkReconnect.java      |  15 +-
 .../manager/TestConsecutiveZkSessionExpiry.java |   3 +-
 .../TestDistributedControllerManager.java       | 203 ---------
 .../TestHelixMultiClusterController.java        | 204 +++++++++
 .../manager/TestParticipantManager.java         |   3 +-
 .../integration/manager/TestStateModelLeak.java |  48 ++-
 .../integration/manager/ZkTestManager.java      |  35 --
 .../manager/zk/MockMultiClusterController.java  |   3 +-
 .../helix/manager/zk/MockParticipant.java       |   9 +-
 .../TestDefaultControllerMsgHandlerFactory.java |   8 +-
 .../helix/messaging/TestAsyncCallbackSvc.java   |  13 +-
 .../helix/mock/participant/DummyProcess.java    |  37 +-
 .../participant/MockBootstrapModelFactory.java  |   8 +-
 .../participant/MockBootstrapStateModel.java    |   4 +-
 .../mock/participant/MockMSModelFactory.java    |  12 +-
 .../mock/participant/MockMSStateModel.java      |   4 +-
 .../participant/MockSchemataModelFactory.java   |  11 +-
 .../participant/MockSchemataStateModel.java     |   4 +-
 .../org/apache/helix/model/TestConstraint.java  |   2 +-
 .../helix/participant/MockZKHelixManager.java   |   2 +-
 .../TestDistControllerStateModelFactory.java    |   3 +-
 .../statemachine/TestStateModelParser.java      |   5 +-
 .../apache/helix/examples/BootstrapHandler.java |  17 +-
 .../apache/helix/examples/BootstrapProcess.java |  17 +-
 .../apache/helix/examples/DummyParticipant.java |  13 +-
 .../apache/helix/examples/ExampleProcess.java   |  12 +-
 .../LeaderStandbyStateModelFactory.java         |  11 +-
 .../helix/examples/LogicalModelExample.java     |  10 +-
 .../examples/MasterSlaveStateModelFactory.java  |  13 +-
 .../OnlineOfflineStateModelFactory.java         |  11 +-
 .../org/apache/helix/examples/Quickstart.java   |   2 +-
 .../participant/StatelessServiceStateModel.java |   4 +-
 .../StatelessServiceStateModelFactory.java      |   6 +-
 .../java/org/apache/helix/lockmanager/Lock.java |   4 +-
 .../apache/helix/lockmanager/LockFactory.java   |   6 +-
 .../apache/helix/recipes/rabbitmq/Consumer.java |   6 +-
 .../recipes/rabbitmq/ConsumerStateModel.java    |   5 +-
 .../rabbitmq/ConsumerStateModelFactory.java     |  40 --
 .../ConsumerStateTransitionHandlerFactory.java  |  40 ++
 .../helix/filestore/FileStoreStateModel.java    |   4 +-
 .../filestore/FileStoreStateModelFactory.java   |   6 +-
 .../helix/taskexecution/TaskStateModel.java     |   4 +-
 .../taskexecution/TaskStateModelFactory.java    |   6 +-
 .../helix/userdefinedrebalancer/Lock.java       |   4 +-
 .../userdefinedrebalancer/LockFactory.java      |   6 +-
 98 files changed, 1008 insertions(+), 2035 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestResetInstance.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestResetInstance.java b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestResetInstance.java
index a9ecaa0..6db5107 100644
--- a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestResetInstance.java
+++ b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestResetInstance.java
@@ -57,12 +57,9 @@ public class TestResetInstance extends AdminTestBase {
     MockController controller = new MockController(_zkaddr, clusterName, "controller_0");
     controller.syncStart();
 
-    Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>() {
-      {
-        put("SLAVE-MASTER", TestHelper.setOf("TestDB0_4"));
-        put("OFFLINE-SLAVE", TestHelper.setOf("TestDB0_8"));
-      }
-    };
+    Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>();
+    errPartitions.put("SLAVE-MASTER", TestHelper.setOf("TestDB0_4"));
+    errPartitions.put("OFFLINE-SLAVE", TestHelper.setOf("TestDB0_8"));
 
     // start mock participants
     MockParticipant[] participants = new MockParticipant[n];

http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestResetResource.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestResetResource.java b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestResetResource.java
index a54b0a3..d066f18 100644
--- a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestResetResource.java
+++ b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestResetResource.java
@@ -57,12 +57,9 @@ public class TestResetResource extends AdminTestBase {
     MockController controller = new MockController(_zkaddr, clusterName, "controller_0");
     controller.syncStart();
 
-    Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>() {
-      {
-        put("SLAVE-MASTER", TestHelper.setOf("TestDB0_4"));
-        put("OFFLINE-SLAVE", TestHelper.setOf("TestDB0_8"));
-      }
-    };
+    Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>();
+    errPartitions.put("SLAVE-MASTER", TestHelper.setOf("TestDB0_4"));
+    errPartitions.put("OFFLINE-SLAVE", TestHelper.setOf("TestDB0_8"));
 
     // start mock participants
     MockParticipant[] participants = new MockParticipant[n];

http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-agent/src/main/java/org/apache/helix/agent/AgentStateModel.java
----------------------------------------------------------------------
diff --git a/helix-agent/src/main/java/org/apache/helix/agent/AgentStateModel.java b/helix-agent/src/main/java/org/apache/helix/agent/AgentStateModel.java
index d227ac3..509c0b7 100644
--- a/helix-agent/src/main/java/org/apache/helix/agent/AgentStateModel.java
+++ b/helix-agent/src/main/java/org/apache/helix/agent/AgentStateModel.java
@@ -30,17 +30,17 @@ import org.apache.helix.ExternalCommand;
 import org.apache.helix.HelixManager;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.api.State;
+import org.apache.helix.api.TransitionHandler;
 import org.apache.helix.model.HelixConfigScope;
 import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
-import org.apache.helix.participant.statemachine.StateModel;
 import org.apache.helix.participant.statemachine.StateModelInfo;
 import org.apache.helix.participant.statemachine.Transition;
 import org.apache.log4j.Logger;
 
 @StateModelInfo(initialState = "OFFLINE", states = {})
-public class AgentStateModel extends StateModel {
+public class AgentStateModel extends TransitionHandler {
   private static final Logger _logger = Logger.getLogger(AgentStateModel.class);
   private static Pattern pattern = Pattern.compile("(\\{.+?\\})");
 

http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-agent/src/main/java/org/apache/helix/agent/AgentStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-agent/src/main/java/org/apache/helix/agent/AgentStateModelFactory.java b/helix-agent/src/main/java/org/apache/helix/agent/AgentStateModelFactory.java
index 69d45ae..4e74405 100644
--- a/helix-agent/src/main/java/org/apache/helix/agent/AgentStateModelFactory.java
+++ b/helix-agent/src/main/java/org/apache/helix/agent/AgentStateModelFactory.java
@@ -19,13 +19,13 @@ package org.apache.helix.agent;
  * under the License.
  */
 
+import org.apache.helix.api.StateTransitionHandlerFactory;
 import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.participant.statemachine.HelixStateModelFactory;
 
-public class AgentStateModelFactory extends HelixStateModelFactory<AgentStateModel> {
+public class AgentStateModelFactory extends StateTransitionHandlerFactory<AgentStateModel> {
 
   @Override
-  public AgentStateModel createNewStateModel(PartitionId partitionKey) {
+  public AgentStateModel createStateTransitionHandler(PartitionId partitionKey) {
     AgentStateModel model = new AgentStateModel();
     return model;
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/api/StateTransitionHandlerFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/StateTransitionHandlerFactory.java b/helix-core/src/main/java/org/apache/helix/api/StateTransitionHandlerFactory.java
new file mode 100644
index 0000000..1e2326d
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/StateTransitionHandlerFactory.java
@@ -0,0 +1,114 @@
+package org.apache.helix.api;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.messaging.handling.BatchMessageWrapper;
+
+public abstract class StateTransitionHandlerFactory<T extends TransitionHandler> {
+  /**
+   * map from partitionId to transition-handler
+   */
+  private final ConcurrentMap<PartitionId, T> _transitionHandlerMap =
+      new ConcurrentHashMap<PartitionId, T>();
+
+  /**
+   * map from resourceName to BatchMessageWrapper
+   */
+  private final ConcurrentMap<ResourceId, BatchMessageWrapper> _batchMsgWrapperMap =
+      new ConcurrentHashMap<ResourceId, BatchMessageWrapper>();
+
+  /**
+   * This method will be invoked only once per partition per session
+   * @param partitionId
+   * @return
+   */
+  public abstract T createStateTransitionHandler(PartitionId partitionId);
+
+  /**
+   * Create a state model for a partition
+   * @param partitionId
+   */
+  public T createAndAddSTransitionHandler(PartitionId partitionId) {
+    T stateModel = createStateTransitionHandler(partitionId);
+    _transitionHandlerMap.put(partitionId, stateModel);
+    return stateModel;
+  }
+
+  /**
+   * Get the state model for a partition
+   * @param partitionId
+   * @return state model if exists, null otherwise
+   */
+  public T getTransitionHandler(PartitionId partitionId) {
+    return _transitionHandlerMap.get(partitionId);
+  }
+
+  /**
+   * remove state model for a partition
+   * @param partitionId
+   * @return state model removed or null if not exist
+   */
+  public T removeTransitionHandler(PartitionId partitionId) {
+    return _transitionHandlerMap.remove(partitionId);
+  }
+
+  /**
+   * get partition set
+   * @return partitionId set
+   */
+  public Set<PartitionId> getPartitionSet() {
+    return _transitionHandlerMap.keySet();
+  }
+
+  /**
+   * create a default batch-message-wrapper for a resource
+   * @param resourceId
+   * @return
+   */
+  public BatchMessageWrapper createBatchMessageWrapper(ResourceId resourceId) {
+    return new BatchMessageWrapper();
+  }
+
+  /**
+   * create a batch-message-wrapper for a resource and put it into map
+   * @param resourceId
+   * @return
+   */
+  public BatchMessageWrapper createAndAddBatchMessageWrapper(ResourceId resourceId) {
+    BatchMessageWrapper wrapper = createBatchMessageWrapper(resourceId);
+    _batchMsgWrapperMap.put(resourceId, wrapper);
+    return wrapper;
+  }
+
+  /**
+   * get batch-message-wrapper for a resource
+   * @param resourceId
+   * @return
+   */
+  public BatchMessageWrapper getBatchMessageWrapper(ResourceId resourceId) {
+    return _batchMsgWrapperMap.get(resourceId);
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/api/TransitionHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/TransitionHandler.java b/helix-core/src/main/java/org/apache/helix/api/TransitionHandler.java
new file mode 100644
index 0000000..ff82a2d
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/TransitionHandler.java
@@ -0,0 +1,106 @@
+package org.apache.helix.api;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateTransitionError;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.log4j.Logger;
+
+public abstract class TransitionHandler {
+  public static final String DEFAULT_INITIAL_STATE = "OFFLINE";
+  Logger logger = Logger.getLogger(TransitionHandler.class);
+
+  // TODO Get default state from implementation or from state model annotation
+  // StateModel with initial state other than OFFLINE should override this field
+  protected String _currentState = DEFAULT_INITIAL_STATE;
+
+  /**
+   * requested-state is used (e.g. by task-framework) to request next state
+   */
+  protected String _requestedState = null;
+
+  public String getCurrentState() {
+    return _currentState;
+  }
+
+  // @transition(from='from', to='to')
+  public void defaultTransitionHandler() {
+    logger
+        .error("Default default handler. The idea is to invoke this if no transition method is found. Yet to be implemented");
+  }
+
+  public boolean updateState(String newState) {
+    _currentState = newState;
+    return true;
+  }
+
+  /**
+   * Get requested-state
+   * @return requested-state
+   */
+  public String getRequestedState() {
+    return _requestedState;
+  }
+
+  /**
+   * Set requested-state
+   * @param requestedState
+   */
+  public void setRequestedState(String requestedState) {
+    _requestedState = requestedState;
+  }
+
+  /**
+   * Called when error occurs in state transition
+   * TODO:enforce subclass to write this
+   * @param message
+   * @param context
+   * @param error
+   */
+  public void rollbackOnError(Message message, NotificationContext context,
+      StateTransitionError error) {
+
+    logger.error("Default rollback method invoked on error. Error Code: " + error.getCode());
+
+  }
+
+  /**
+   * Called when the state model is reset
+   */
+  public void reset() {
+    logger
+        .warn("Default reset method invoked. Either because the process longer own this resource or session timedout");
+  }
+
+  /**
+   * default transition for drop partition in error state
+   * @param message
+   * @param context
+   * @throws InterruptedException
+   */
+  @Transition(to = "DROPPED", from = "ERROR")
+  public void onBecomeDroppedFromError(Message message, NotificationContext context)
+      throws Exception {
+    logger.info("Default ERROR->DROPPED transition invoked.");
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/api/id/StateModelDefId.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/id/StateModelDefId.java b/helix-core/src/main/java/org/apache/helix/api/id/StateModelDefId.java
index 7c84f0f..b98cbcd 100644
--- a/helix-core/src/main/java/org/apache/helix/api/id/StateModelDefId.java
+++ b/helix-core/src/main/java/org/apache/helix/api/id/StateModelDefId.java
@@ -26,6 +26,10 @@ import org.codehaus.jackson.annotate.JsonProperty;
 public class StateModelDefId extends Id {
   public static final StateModelDefId SchedulerTaskQueue = StateModelDefId
       .from(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE);
+  public static final StateModelDefId MasterSlave = StateModelDefId.from("MasterSlave");
+  public static final StateModelDefId LeaderStandby = StateModelDefId.from("LeaderStandby");
+  public static final StateModelDefId OnlineOffline = StateModelDefId.from("OnlineOffline");
+
   @JsonProperty("id")
   private final String _id;
 

http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java b/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java
index 6aa3ab9..ca540c5 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java
@@ -47,6 +47,7 @@ import org.apache.commons.cli.ParseException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
+import org.apache.helix.api.id.StateModelDefId;
 import org.apache.helix.manager.zk.HelixManagerShutdownHook;
 import org.apache.helix.participant.DistClusterControllerStateModelFactory;
 import org.apache.helix.participant.StateMachineEngine;
@@ -164,18 +165,11 @@ public class HelixControllerMain {
         DistClusterControllerStateModelFactory stateModelFactory =
             new DistClusterControllerStateModelFactory(zkConnectString);
 
-        // StateMachineEngine genericStateMachineHandler = new
-        // StateMachineEngine();
         StateMachineEngine stateMach = manager.getStateMachineEngine();
-        stateMach.registerStateModelFactory("LeaderStandby", stateModelFactory);
-        // manager.getMessagingService().registerMessageHandlerFactory(MessageType.STATE_TRANSITION.toString(),
-        // genericStateMachineHandler);
+        stateMach.registerStateModelFactory(StateModelDefId.LeaderStandby, stateModelFactory);
         manager.connect();
       } else {
         logger.error("cluster controller mode:" + controllerMode + " NOT supported");
-        // throw new
-        // IllegalArgumentException("Unsupported cluster controller mode:" +
-        // controllerMode);
       }
     } catch (Exception e) {
       logger.error("Exception while starting controller", e);

http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
deleted file mode 100644
index cdb2845..0000000
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
+++ /dev/null
@@ -1,420 +0,0 @@
-package org.apache.helix.manager.zk;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import static org.apache.helix.HelixConstants.ChangeType.CONFIG;
-import static org.apache.helix.HelixConstants.ChangeType.CURRENT_STATE;
-import static org.apache.helix.HelixConstants.ChangeType.EXTERNAL_VIEW;
-import static org.apache.helix.HelixConstants.ChangeType.IDEAL_STATE;
-import static org.apache.helix.HelixConstants.ChangeType.LIVE_INSTANCE;
-import static org.apache.helix.HelixConstants.ChangeType.MESSAGE;
-import static org.apache.helix.HelixConstants.ChangeType.MESSAGES_CONTROLLER;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.I0Itec.zkclient.IZkChildListener;
-import org.I0Itec.zkclient.IZkDataListener;
-import org.I0Itec.zkclient.exception.ZkNoNodeException;
-import org.apache.helix.BaseDataAccessor;
-import org.apache.helix.ControllerChangeListener;
-import org.apache.helix.CurrentStateChangeListener;
-import org.apache.helix.ExternalViewChangeListener;
-import org.apache.helix.HelixConstants.ChangeType;
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixException;
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixProperty;
-import org.apache.helix.IdealStateChangeListener;
-import org.apache.helix.InstanceConfigChangeListener;
-import org.apache.helix.LiveInstanceChangeListener;
-import org.apache.helix.MessageListener;
-import org.apache.helix.NotificationContext;
-import org.apache.helix.NotificationContext.Type;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.PropertyPathConfig;
-import org.apache.helix.ScopedConfigChangeListener;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.model.CurrentState;
-import org.apache.helix.model.ExternalView;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.InstanceConfig;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.Message;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.Watcher.Event.EventType;
-
-public class CallbackHandler implements IZkChildListener, IZkDataListener
-
-{
-  private static Logger logger = Logger.getLogger(CallbackHandler.class);
-
-  /**
-   * define the next possible notification types
-   */
-  private static Map<Type, List<Type>> nextNotificationType = new HashMap<Type, List<Type>>();
-  static {
-    nextNotificationType.put(Type.INIT, Arrays.asList(Type.CALLBACK, Type.FINALIZE));
-    nextNotificationType.put(Type.CALLBACK, Arrays.asList(Type.CALLBACK, Type.FINALIZE));
-    nextNotificationType.put(Type.FINALIZE, Arrays.asList(Type.INIT));
-  }
-
-  private final String _path;
-  private final Object _listener;
-  private final EventType[] _eventTypes;
-  private final HelixDataAccessor _accessor;
-  private final ChangeType _changeType;
-  private final ZkClient _zkClient;
-  private final AtomicLong _lastNotificationTimeStamp;
-  private final HelixManager _manager;
-  private final PropertyKey _propertyKey;
-
-  /**
-   * maintain the expected notification types
-   * this is fix for HELIX-195: race condition between FINALIZE callbacks and Zk callbacks
-   */
-  private List<NotificationContext.Type> _expectTypes = nextNotificationType.get(Type.FINALIZE);
-
-  public CallbackHandler(HelixManager manager, ZkClient client, PropertyKey propertyKey,
-      Object listener, EventType[] eventTypes, ChangeType changeType) {
-    if (listener == null) {
-      throw new HelixException("listener could not be null");
-    }
-
-    this._manager = manager;
-    this._accessor = manager.getHelixDataAccessor();
-    this._zkClient = client;
-    this._propertyKey = propertyKey;
-    this._path = propertyKey.getPath();
-    this._listener = listener;
-    this._eventTypes = eventTypes;
-    this._changeType = changeType;
-    this._lastNotificationTimeStamp = new AtomicLong(System.nanoTime());
-    init();
-  }
-
-  public Object getListener() {
-    return _listener;
-  }
-
-  public String getPath() {
-    return _path;
-  }
-
-  public void invoke(NotificationContext changeContext) throws Exception {
-    // This allows the listener to work with one change at a time
-    synchronized (_manager) {
-      Type type = changeContext.getType();
-      if (!_expectTypes.contains(type)) {
-        logger.warn("Skip processing callbacks for listener: " + _listener + ", path: " + _path
-            + ", expected types: " + _expectTypes + " but was " + type);
-        return;
-      }
-      _expectTypes = nextNotificationType.get(type);
-
-      // Builder keyBuilder = _accessor.keyBuilder();
-      long start = System.currentTimeMillis();
-      if (logger.isInfoEnabled()) {
-        logger.info(Thread.currentThread().getId() + " START:INVOKE " + _path + " listener:"
-            + _listener.getClass().getCanonicalName());
-      }
-
-      if (_changeType == IDEAL_STATE) {
-
-        IdealStateChangeListener idealStateChangeListener = (IdealStateChangeListener) _listener;
-        subscribeForChanges(changeContext, _path, true, true);
-        List<IdealState> idealStates = _accessor.getChildValues(_propertyKey);
-
-        idealStateChangeListener.onIdealStateChange(idealStates, changeContext);
-
-      } else if (_changeType == ChangeType.INSTANCE_CONFIG) {
-        subscribeForChanges(changeContext, _path, true, true);
-        InstanceConfigChangeListener listener = (InstanceConfigChangeListener) _listener;
-        List<InstanceConfig> configs = _accessor.getChildValues(_propertyKey);
-        listener.onInstanceConfigChange(configs, changeContext);
-      } else if (_changeType == CONFIG) {
-        subscribeForChanges(changeContext, _path, true, true);
-        ScopedConfigChangeListener listener = (ScopedConfigChangeListener) _listener;
-        List<HelixProperty> configs = _accessor.getChildValues(_propertyKey);
-        listener.onConfigChange(configs, changeContext);
-      } else if (_changeType == LIVE_INSTANCE) {
-        LiveInstanceChangeListener liveInstanceChangeListener =
-            (LiveInstanceChangeListener) _listener;
-        subscribeForChanges(changeContext, _path, true, true);
-        List<LiveInstance> liveInstances = _accessor.getChildValues(_propertyKey);
-
-        liveInstanceChangeListener.onLiveInstanceChange(liveInstances, changeContext);
-
-      } else if (_changeType == CURRENT_STATE) {
-        CurrentStateChangeListener currentStateChangeListener =
-            (CurrentStateChangeListener) _listener;
-        subscribeForChanges(changeContext, _path, true, true);
-        String instanceName = PropertyPathConfig.getInstanceNameFromPath(_path);
-
-        List<CurrentState> currentStates = _accessor.getChildValues(_propertyKey);
-
-        currentStateChangeListener.onStateChange(instanceName, currentStates, changeContext);
-
-      } else if (_changeType == MESSAGE) {
-        MessageListener messageListener = (MessageListener) _listener;
-        subscribeForChanges(changeContext, _path, true, false);
-        String instanceName = PropertyPathConfig.getInstanceNameFromPath(_path);
-        List<Message> messages = _accessor.getChildValues(_propertyKey);
-
-        messageListener.onMessage(instanceName, messages, changeContext);
-
-      } else if (_changeType == MESSAGES_CONTROLLER) {
-        MessageListener messageListener = (MessageListener) _listener;
-        subscribeForChanges(changeContext, _path, true, false);
-        List<Message> messages = _accessor.getChildValues(_propertyKey);
-
-        messageListener.onMessage(_manager.getInstanceName(), messages, changeContext);
-
-      } else if (_changeType == EXTERNAL_VIEW) {
-        ExternalViewChangeListener externalViewListener = (ExternalViewChangeListener) _listener;
-        subscribeForChanges(changeContext, _path, true, true);
-        List<ExternalView> externalViewList = _accessor.getChildValues(_propertyKey);
-
-        externalViewListener.onExternalViewChange(externalViewList, changeContext);
-      } else if (_changeType == ChangeType.CONTROLLER) {
-        ControllerChangeListener controllerChangelistener = (ControllerChangeListener) _listener;
-        subscribeForChanges(changeContext, _path, true, false);
-        controllerChangelistener.onControllerChange(changeContext);
-      }
-
-      long end = System.currentTimeMillis();
-      if (logger.isInfoEnabled()) {
-        logger.info(Thread.currentThread().getId() + " END:INVOKE " + _path + " listener:"
-            + _listener.getClass().getCanonicalName() + " Took: " + (end - start) + "ms");
-      }
-    }
-  }
-
-  private void subscribeChildChange(String path, NotificationContext context) {
-    NotificationContext.Type type = context.getType();
-    if (type == NotificationContext.Type.INIT || type == NotificationContext.Type.CALLBACK) {
-      logger.info(_manager.getInstanceName() + " subscribes child-change. path: " + path
-          + ", listener: " + _listener);
-      _zkClient.subscribeChildChanges(path, this);
-    } else if (type == NotificationContext.Type.FINALIZE) {
-      logger.info(_manager.getInstanceName() + " unsubscribe child-change. path: " + path
-          + ", listener: " + _listener);
-
-      _zkClient.unsubscribeChildChanges(path, this);
-    }
-  }
-
-  private void subscribeDataChange(String path, NotificationContext context) {
-    NotificationContext.Type type = context.getType();
-    if (type == NotificationContext.Type.INIT || type == NotificationContext.Type.CALLBACK) {
-      if (logger.isDebugEnabled()) {
-        logger.debug(_manager.getInstanceName() + " subscribe data-change. path: " + path
-            + ", listener: " + _listener);
-      }
-      _zkClient.subscribeDataChanges(path, this);
-
-    } else if (type == NotificationContext.Type.FINALIZE) {
-      logger.info(_manager.getInstanceName() + " unsubscribe data-change. path: " + path
-          + ", listener: " + _listener);
-
-      _zkClient.unsubscribeDataChanges(path, this);
-    }
-  }
-
-  // TODO watchParent is always true. consider remove it
-  private void subscribeForChanges(NotificationContext context, String path, boolean watchParent,
-      boolean watchChild) {
-    if (watchParent) {
-      subscribeChildChange(path, context);
-    }
-
-    if (watchChild) {
-      try {
-        switch (_changeType) {
-        case CURRENT_STATE:
-        case IDEAL_STATE:
-        case EXTERNAL_VIEW: {
-          // check if bucketized
-          BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
-          List<ZNRecord> records = baseAccessor.getChildren(path, null, 0);
-          for (ZNRecord record : records) {
-            HelixProperty property = new HelixProperty(record);
-            String childPath = path + "/" + record.getId();
-
-            int bucketSize = property.getBucketSize();
-            if (bucketSize > 0) {
-              // subscribe both data-change and child-change on bucketized parent node
-              // data-change gives a delete-callback which is used to remove watch
-              subscribeChildChange(childPath, context);
-              subscribeDataChange(childPath, context);
-
-              // subscribe data-change on bucketized child
-              List<String> bucketizedChildNames = _zkClient.getChildren(childPath);
-              if (bucketizedChildNames != null) {
-                for (String bucketizedChildName : bucketizedChildNames) {
-                  String bucketizedChildPath = childPath + "/" + bucketizedChildName;
-                  subscribeDataChange(bucketizedChildPath, context);
-                }
-              }
-            } else {
-              subscribeDataChange(childPath, context);
-            }
-          }
-          break;
-        }
-        default: {
-          List<String> childNames = _zkClient.getChildren(path);
-          if (childNames != null) {
-            for (String childName : childNames) {
-              String childPath = path + "/" + childName;
-              subscribeDataChange(childPath, context);
-            }
-          }
-          break;
-        }
-        }
-      } catch (ZkNoNodeException e) {
-        logger.warn("fail to subscribe child/data change. path: " + path + ", listener: "
-            + _listener, e);
-      }
-    }
-
-  }
-
-  public EventType[] getEventTypes() {
-    return _eventTypes;
-  }
-
-  /**
-   * Invoke the listener so that it sets up the initial values from the zookeeper if any
-   * exists
-   */
-  public void init() {
-    updateNotificationTime(System.nanoTime());
-    try {
-      NotificationContext changeContext = new NotificationContext(_manager);
-      changeContext.setType(NotificationContext.Type.INIT);
-      invoke(changeContext);
-    } catch (Exception e) {
-      String msg = "Exception while invoking init callback for listener:" + _listener;
-      ZKExceptionHandler.getInstance().handle(msg, e);
-    }
-  }
-
-  @Override
-  public void handleDataChange(String dataPath, Object data) {
-    try {
-      updateNotificationTime(System.nanoTime());
-      if (dataPath != null && dataPath.startsWith(_path)) {
-        NotificationContext changeContext = new NotificationContext(_manager);
-        changeContext.setType(NotificationContext.Type.CALLBACK);
-        invoke(changeContext);
-      }
-    } catch (Exception e) {
-      String msg =
-          "exception in handling data-change. path: " + dataPath + ", listener: " + _listener;
-      ZKExceptionHandler.getInstance().handle(msg, e);
-    }
-  }
-
-  @Override
-  public void handleDataDeleted(String dataPath) {
-    try {
-      updateNotificationTime(System.nanoTime());
-      if (dataPath != null && dataPath.startsWith(_path)) {
-        logger.info(_manager.getInstanceName() + " unsubscribe data-change. path: " + dataPath
-            + ", listener: " + _listener);
-        _zkClient.unsubscribeDataChanges(dataPath, this);
-
-        // only needed for bucketized parent, but OK if we don't have child-change
-        // watch on the bucketized parent path
-        logger.info(_manager.getInstanceName() + " unsubscribe child-change. path: " + dataPath
-            + ", listener: " + _listener);
-        _zkClient.unsubscribeChildChanges(dataPath, this);
-        // No need to invoke() since this event will handled by child-change on parent-node
-        // NotificationContext changeContext = new NotificationContext(_manager);
-        // changeContext.setType(NotificationContext.Type.CALLBACK);
-        // invoke(changeContext);
-      }
-    } catch (Exception e) {
-      String msg =
-          "exception in handling data-delete-change. path: " + dataPath + ", listener: "
-              + _listener;
-      ZKExceptionHandler.getInstance().handle(msg, e);
-    }
-  }
-
-  @Override
-  public void handleChildChange(String parentPath, List<String> currentChilds) {
-    try {
-      updateNotificationTime(System.nanoTime());
-      if (parentPath != null && parentPath.startsWith(_path)) {
-        NotificationContext changeContext = new NotificationContext(_manager);
-
-        if (currentChilds == null) {
-          // parentPath has been removed
-          if (parentPath.equals(_path)) {
-            // _path has been removed, remove this listener
-            _manager.removeListener(_propertyKey, _listener);
-          }
-          changeContext.setType(NotificationContext.Type.FINALIZE);
-        } else {
-          changeContext.setType(NotificationContext.Type.CALLBACK);
-        }
-        invoke(changeContext);
-      }
-    } catch (Exception e) {
-      String msg =
-          "exception in handling child-change. instance: " + _manager.getInstanceName()
-              + ", parentPath: " + parentPath + ", listener: " + _listener;
-      ZKExceptionHandler.getInstance().handle(msg, e);
-    }
-  }
-
-  /**
-   * Invoke the listener for the last time so that the listener could clean up resources
-   */
-  public void reset() {
-    try {
-      NotificationContext changeContext = new NotificationContext(_manager);
-      changeContext.setType(NotificationContext.Type.FINALIZE);
-      invoke(changeContext);
-    } catch (Exception e) {
-      String msg = "Exception while resetting the listener:" + _listener;
-      ZKExceptionHandler.getInstance().handle(msg, e);
-    }
-  }
-
-  private void updateNotificationTime(long nanoTime) {
-    long l = _lastNotificationTimeStamp.get();
-    while (nanoTime > l) {
-      boolean b = _lastNotificationTimeStamp.compareAndSet(l, nanoTime);
-      if (b) {
-        break;
-      } else {
-        l = _lastNotificationTimeStamp.get();
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java
deleted file mode 100644
index c1d856d..0000000
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java
+++ /dev/null
@@ -1,281 +0,0 @@
-package org.apache.helix.manager.zk;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.lang.management.ManagementFactory;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import org.I0Itec.zkclient.exception.ZkNodeExistsException;
-import org.apache.helix.AccessOption;
-import org.apache.helix.ConfigAccessor;
-import org.apache.helix.HelixAdmin;
-import org.apache.helix.HelixException;
-import org.apache.helix.HelixManager;
-import org.apache.helix.InstanceType;
-import org.apache.helix.LiveInstanceInfoProvider;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.messaging.DefaultMessagingService;
-import org.apache.helix.model.CurrentState;
-import org.apache.helix.model.HelixConfigScope;
-import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
-import org.apache.helix.model.InstanceConfig;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.Message.MessageType;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.helix.model.builder.HelixConfigScopeBuilder;
-import org.apache.helix.participant.StateMachineEngine;
-import org.apache.helix.participant.statemachine.ScheduledTaskStateModelFactory;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.data.Stat;
-
-/**
- * helper class for participant-manager
- */
-public class ParticipantManagerHelper {
-  private static Logger LOG = Logger.getLogger(ParticipantManagerHelper.class);
-
-  final ZkClient _zkclient;
-  final HelixManager _manager;
-  final PropertyKey.Builder _keyBuilder;
-  final String _clusterName;
-  final String _instanceName;
-  final String _sessionId;
-  final int _sessionTimeout;
-  final ConfigAccessor _configAccessor;
-  final InstanceType _instanceType;
-  final HelixAdmin _helixAdmin;
-  final ZKHelixDataAccessor _dataAccessor;
-  final DefaultMessagingService _messagingService;
-  final StateMachineEngine _stateMachineEngine;
-  final LiveInstanceInfoProvider _liveInstanceInfoProvider;
-
-  public ParticipantManagerHelper(HelixManager manager, ZkClient zkclient, int sessionTimeout,
-      LiveInstanceInfoProvider liveInstanceInfoProvider) {
-    _zkclient = zkclient;
-    _manager = manager;
-    _clusterName = manager.getClusterName();
-    _instanceName = manager.getInstanceName();
-    _keyBuilder = new PropertyKey.Builder(_clusterName);
-    _sessionId = manager.getSessionId();
-    _sessionTimeout = sessionTimeout;
-    _configAccessor = manager.getConfigAccessor();
-    _instanceType = manager.getInstanceType();
-    _helixAdmin = manager.getClusterManagmentTool();
-    _dataAccessor = (ZKHelixDataAccessor) manager.getHelixDataAccessor();
-    _messagingService = (DefaultMessagingService) manager.getMessagingService();
-    _stateMachineEngine = manager.getStateMachineEngine();
-    _liveInstanceInfoProvider = liveInstanceInfoProvider;
-  }
-
-  public void joinCluster() {
-    // Read cluster config and see if instance can auto join the cluster
-    boolean autoJoin = false;
-    try {
-      HelixConfigScope scope =
-          new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster(
-              _manager.getClusterName()).build();
-      autoJoin =
-          Boolean.parseBoolean(_configAccessor.get(scope,
-              ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN));
-      LOG.info("instance: " + _instanceName + " auto-joining " + _clusterName + " is " + autoJoin);
-    } catch (Exception e) {
-      // autoJoin is false
-    }
-
-    if (!ZKUtil.isInstanceSetup(_zkclient, _clusterName, _instanceName, _instanceType)) {
-      if (!autoJoin) {
-        throw new HelixException("Initial cluster structure is not set up for instance: "
-            + _instanceName + ", instanceType: " + _instanceType);
-      } else {
-        LOG.info(_instanceName + " is auto-joining cluster: " + _clusterName);
-        InstanceConfig instanceConfig = new InstanceConfig(_instanceName);
-        String hostName = _instanceName;
-        String port = "";
-        int lastPos = _instanceName.lastIndexOf("_");
-        if (lastPos > 0) {
-          hostName = _instanceName.substring(0, lastPos);
-          port = _instanceName.substring(lastPos + 1);
-        }
-        instanceConfig.setHostName(hostName);
-        instanceConfig.setPort(port);
-        instanceConfig.setInstanceEnabled(true);
-        _helixAdmin.addInstance(_clusterName, instanceConfig);
-      }
-    }
-  }
-
-  public void createLiveInstance() {
-    String liveInstancePath = _keyBuilder.liveInstance(_instanceName).getPath();
-    LiveInstance liveInstance = new LiveInstance(_instanceName);
-    liveInstance.setSessionId(_sessionId);
-    liveInstance.setHelixVersion(_manager.getVersion());
-    liveInstance.setLiveInstance(ManagementFactory.getRuntimeMXBean().getName());
-
-    // LiveInstanceInfoProvider liveInstanceInfoProvider = _manager._liveInstanceInfoProvider;
-    if (_liveInstanceInfoProvider != null) {
-      LOG.info("invoke liveInstanceInfoProvider");
-      ZNRecord additionalLiveInstanceInfo =
-          _liveInstanceInfoProvider.getAdditionalLiveInstanceInfo();
-      if (additionalLiveInstanceInfo != null) {
-        additionalLiveInstanceInfo.merge(liveInstance.getRecord());
-        ZNRecord mergedLiveInstance = new ZNRecord(additionalLiveInstanceInfo, _instanceName);
-        liveInstance = new LiveInstance(mergedLiveInstance);
-        LOG.info("instanceName: " + _instanceName + ", mergedLiveInstance: " + liveInstance);
-      }
-    }
-
-    boolean retry;
-    do {
-      retry = false;
-      try {
-        _zkclient.createEphemeral(liveInstancePath, liveInstance.getRecord());
-      } catch (ZkNodeExistsException e) {
-        LOG.warn("found another instance with same instanceName: " + _instanceName + " in cluster "
-            + _clusterName);
-
-        Stat stat = new Stat();
-        ZNRecord record = _zkclient.readData(liveInstancePath, stat, true);
-        if (record == null) {
-          /**
-           * live-instance is gone as we check it, retry create live-instance
-           */
-          retry = true;
-        } else {
-          String ephemeralOwner = Long.toHexString(stat.getEphemeralOwner());
-          if (ephemeralOwner.equals(_sessionId)) {
-            /**
-             * update sessionId field in live-instance if necessary
-             */
-            LiveInstance curLiveInstance = new LiveInstance(record);
-            if (!curLiveInstance.getTypedSessionId().stringify().equals(_sessionId)) {
-              /**
-               * in last handle-new-session,
-               * live-instance is created by new zkconnection with stale session-id inside
-               * just update session-id field
-               */
-              LOG.info("overwriting session-id by ephemeralOwner: " + ephemeralOwner
-                  + ", old-sessionId: " + curLiveInstance.getTypedSessionId() + ", new-sessionId: "
-                  + _sessionId);
-
-              curLiveInstance.setSessionId(_sessionId);
-              _zkclient.writeData(liveInstancePath, curLiveInstance.getRecord());
-            }
-          } else {
-            /**
-             * wait for a while, in case previous helix-participant exits unexpectedly
-             * and its live-instance still hangs around until session timeout
-             */
-            try {
-              TimeUnit.MILLISECONDS.sleep(_sessionTimeout + 5000);
-            } catch (InterruptedException ex) {
-              LOG.warn("Sleep interrupted while waiting for previous live-instance to go away.", ex);
-            }
-            /**
-             * give a last try after exit while loop
-             */
-            retry = true;
-            break;
-          }
-        }
-      }
-    } while (retry);
-
-    /**
-     * give a last shot
-     */
-    if (retry) {
-      try {
-        _zkclient.createEphemeral(liveInstancePath, liveInstance.getRecord());
-      } catch (Exception e) {
-        String errorMessage =
-            "instance: " + _instanceName + " already has a live-instance in cluster "
-                + _clusterName;
-        LOG.error(errorMessage);
-        throw new HelixException(errorMessage);
-      }
-    }
-  }
-
-  /**
-   * carry over current-states from last sessions
-   * set to initial state for current session only when state doesn't exist in current session
-   */
-  public void carryOverPreviousCurrentState() {
-    List<String> sessions = _dataAccessor.getChildNames(_keyBuilder.sessions(_instanceName));
-
-    for (String session : sessions) {
-      if (session.equals(_sessionId)) {
-        continue;
-      }
-
-      List<CurrentState> lastCurStates =
-          _dataAccessor.getChildValues(_keyBuilder.currentStates(_instanceName, session));
-
-      for (CurrentState lastCurState : lastCurStates) {
-        LOG.info("Carrying over old session: " + session + ", resource: " + lastCurState.getId()
-            + " to current session: " + _sessionId);
-        String stateModelDefRef = lastCurState.getStateModelDefRef();
-        if (stateModelDefRef == null) {
-          LOG.error("skip carry-over because previous current state doesn't have a state model definition. previous current-state: "
-              + lastCurState);
-          continue;
-        }
-        StateModelDefinition stateModel =
-            _dataAccessor.getProperty(_keyBuilder.stateModelDef(stateModelDefRef));
-
-        String curStatePath =
-            _keyBuilder.currentState(_instanceName, _sessionId, lastCurState.getResourceName())
-                .getPath();
-        _dataAccessor.getBaseDataAccessor().update(curStatePath,
-            new CurStateCarryOverUpdater(_sessionId, stateModel.getInitialState(), lastCurState),
-            AccessOption.PERSISTENT);
-      }
-    }
-
-    /**
-     * remove previous current state parent nodes
-     */
-    for (String session : sessions) {
-      if (session.equals(_sessionId)) {
-        continue;
-      }
-
-      String path = _keyBuilder.currentStates(_instanceName, session).getPath();
-      LOG.info("Removing current states from previous sessions. path: " + path);
-      _zkclient.deleteRecursive(path);
-    }
-  }
-
-  public void setupMsgHandler() throws Exception {
-    _messagingService.registerMessageHandlerFactory(MessageType.STATE_TRANSITION.toString(),
-        _stateMachineEngine);
-    _manager.addMessageListener(_messagingService.getExecutor(), _instanceName);
-
-    ScheduledTaskStateModelFactory stStateModelFactory =
-        new ScheduledTaskStateModelFactory(_messagingService.getExecutor());
-    _stateMachineEngine.registerStateModelFactory(
-        DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, stStateModelFactory);
-    _messagingService.onConnected();
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixParticipant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixParticipant.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixParticipant.java
index af50eb7..81810de 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixParticipant.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixParticipant.java
@@ -44,6 +44,7 @@ import org.apache.helix.api.config.ParticipantConfig;
 import org.apache.helix.api.id.ClusterId;
 import org.apache.helix.api.id.Id;
 import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.StateModelDefId;
 import org.apache.helix.messaging.DefaultMessagingService;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.HelixConfigScope;
@@ -348,8 +349,8 @@ public class ZkHelixParticipant implements HelixParticipant {
 
     ScheduledTaskStateModelFactory stStateModelFactory =
         new ScheduledTaskStateModelFactory(_messagingService.getExecutor());
-    _stateMachineEngine.registerStateModelFactory(
-        DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, stStateModelFactory);
+    _stateMachineEngine.registerStateModelFactory(StateModelDefId.SchedulerTaskQueue,
+        stStateModelFactory);
     _messagingService.onConnected();
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
index 1bb6506..4ede2a4 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
@@ -43,14 +43,14 @@ import org.apache.helix.ZNRecordBucketizer;
 import org.apache.helix.ZNRecordDelta;
 import org.apache.helix.ZNRecordDelta.MergeOperation;
 import org.apache.helix.api.State;
+import org.apache.helix.api.StateTransitionHandlerFactory;
+import org.apache.helix.api.TransitionHandler;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.SessionId;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.Attributes;
-import org.apache.helix.participant.statemachine.StateModel;
-import org.apache.helix.participant.statemachine.StateModelFactory;
 import org.apache.helix.participant.statemachine.StateModelParser;
 import org.apache.helix.participant.statemachine.StateTransitionError;
 import org.apache.helix.util.StatusUpdateUtil;
@@ -66,16 +66,16 @@ public class HelixStateTransitionHandler extends MessageHandler {
   }
 
   private static final Logger logger = Logger.getLogger(HelixStateTransitionHandler.class);
-  private final StateModel _stateModel;
+  private final TransitionHandler _stateModel;
   StatusUpdateUtil _statusUpdateUtil;
   private final StateModelParser _transitionMethodFinder;
   private final CurrentState _currentStateDelta;
   private final HelixManager _manager;
-  private final StateModelFactory<? extends StateModel> _stateModelFactory;
+  private final StateTransitionHandlerFactory<? extends TransitionHandler> _stateModelFactory;
   volatile boolean _isTimeout = false;
 
-  public HelixStateTransitionHandler(StateModelFactory<? extends StateModel> stateModelFactory,
-      StateModel stateModel, Message message, NotificationContext context,
+  public HelixStateTransitionHandler(StateTransitionHandlerFactory<? extends TransitionHandler> stateModelFactory,
+      TransitionHandler stateModel, Message message, NotificationContext context,
       CurrentState currentStateDelta) {
     super(message, context);
     _stateModel = stateModel;
@@ -206,7 +206,7 @@ public class HelixStateTransitionHandler extends MessageHandler {
         List<ZNRecordDelta> deltaList = new ArrayList<ZNRecordDelta>();
         deltaList.add(delta);
         _currentStateDelta.setDeltaList(deltaList);
-        _stateModelFactory.removeStateModel(partitionId.stringify());
+        _stateModelFactory.removeTransitionHandler(partitionId);
       } else {
         // if the partition is not to be dropped, update _stateModel to the TO_STATE
         _stateModel.updateState(toState.toString());

http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/participant/CustomCodeInvoker.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/CustomCodeInvoker.java b/helix-core/src/main/java/org/apache/helix/participant/CustomCodeInvoker.java
index 6c96629..17adf7f 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/CustomCodeInvoker.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/CustomCodeInvoker.java
@@ -29,6 +29,7 @@ import org.apache.helix.LiveInstanceChangeListener;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.NotificationContext.Type;
 import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.InstanceConfig;
@@ -39,9 +40,9 @@ public class CustomCodeInvoker implements LiveInstanceChangeListener, InstanceCo
     ExternalViewChangeListener {
   private static Logger LOG = Logger.getLogger(CustomCodeInvoker.class);
   private final CustomCodeCallbackHandler _callback;
-  private final String _partitionKey;
+  private final PartitionId _partitionKey;
 
-  public CustomCodeInvoker(CustomCodeCallbackHandler callback, String partitionKey) {
+  public CustomCodeInvoker(CustomCodeCallbackHandler callback, PartitionId partitionKey) {
     _callback = callback;
     _partitionKey = partitionKey;
   }
@@ -60,7 +61,7 @@ public class CustomCodeInvoker implements LiveInstanceChangeListener, InstanceCo
       String sessionId = manager.getSessionId();
 
       // get resource name from partition key: "PARTICIPANT_LEADER_XXX_0"
-      String resourceName = _partitionKey.substring(0, _partitionKey.lastIndexOf('_'));
+      String resourceName = _partitionKey.stringify().substring(0, _partitionKey.stringify().lastIndexOf('_'));
 
       CurrentState curState =
           accessor.getProperty(keyBuilder.currentState(instance, sessionId, resourceName));
@@ -68,7 +69,7 @@ public class CustomCodeInvoker implements LiveInstanceChangeListener, InstanceCo
         return;
       }
 
-      String state = curState.getState(_partitionKey);
+      String state = curState.getState(_partitionKey.stringify());
       if (state == null || !state.equalsIgnoreCase("LEADER")) {
         return;
       }

http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java
index 0c2eb7c..4579810 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java
@@ -23,8 +23,8 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
 import org.apache.helix.NotificationContext;
+import org.apache.helix.api.TransitionHandler;
 import org.apache.helix.model.Message;
-import org.apache.helix.participant.statemachine.StateModel;
 import org.apache.helix.participant.statemachine.StateModelInfo;
 import org.apache.helix.participant.statemachine.StateModelParser;
 import org.apache.helix.participant.statemachine.StateTransitionError;
@@ -34,7 +34,7 @@ import org.apache.log4j.Logger;
 @StateModelInfo(initialState = "OFFLINE", states = {
     "LEADER", "STANDBY"
 })
-public class DistClusterControllerStateModel extends StateModel {
+public class DistClusterControllerStateModel extends TransitionHandler {
   private static Logger logger = Logger.getLogger(DistClusterControllerStateModel.class);
   private HelixManager _controller = null;
   private final String _zkAddr;

http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModelFactory.java b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModelFactory.java
index a367c81..5f21e90 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModelFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModelFactory.java
@@ -1,5 +1,8 @@
 package org.apache.helix.participant;
 
+import org.apache.helix.api.StateTransitionHandlerFactory;
+import org.apache.helix.api.id.PartitionId;
+
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -19,10 +22,9 @@ package org.apache.helix.participant;
  * under the License.
  */
 
-import org.apache.helix.participant.statemachine.StateModelFactory;
 
 public class DistClusterControllerStateModelFactory extends
-    StateModelFactory<DistClusterControllerStateModel> {
+    StateTransitionHandlerFactory<DistClusterControllerStateModel> {
   private final String _zkAddr;
 
   public DistClusterControllerStateModelFactory(String zkAddr) {
@@ -30,7 +32,7 @@ public class DistClusterControllerStateModelFactory extends
   }
 
   @Override
-  public DistClusterControllerStateModel createNewStateModel(String stateUnitKey) {
+  public DistClusterControllerStateModel createStateTransitionHandler(PartitionId partition) {
     return new DistClusterControllerStateModel(_zkAddr);
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/participant/GenericLeaderStandbyModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/GenericLeaderStandbyModel.java b/helix-core/src/main/java/org/apache/helix/participant/GenericLeaderStandbyModel.java
index 3866cf5..ba23a43 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/GenericLeaderStandbyModel.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/GenericLeaderStandbyModel.java
@@ -25,8 +25,9 @@ import org.apache.helix.HelixConstants.ChangeType;
 import org.apache.helix.HelixManager;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.api.TransitionHandler;
+import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.model.Message;
-import org.apache.helix.participant.statemachine.StateModel;
 import org.apache.helix.participant.statemachine.StateModelInfo;
 import org.apache.helix.participant.statemachine.Transition;
 import org.apache.log4j.Logger;
@@ -34,14 +35,14 @@ import org.apache.log4j.Logger;
 @StateModelInfo(initialState = "OFFLINE", states = {
     "LEADER", "STANDBY"
 })
-public class GenericLeaderStandbyModel extends StateModel {
+public class GenericLeaderStandbyModel extends TransitionHandler {
   private static Logger LOG = Logger.getLogger(GenericLeaderStandbyModel.class);
 
   private final CustomCodeInvoker _particHolder;
   private final List<ChangeType> _notificationTypes;
 
   public GenericLeaderStandbyModel(CustomCodeCallbackHandler callback,
-      List<ChangeType> notificationTypes, String partitionKey) {
+      List<ChangeType> notificationTypes, PartitionId partitionKey) {
     _particHolder = new CustomCodeInvoker(callback, partitionKey);
     _notificationTypes = notificationTypes;
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/participant/GenericLeaderStandbyStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/GenericLeaderStandbyStateModelFactory.java b/helix-core/src/main/java/org/apache/helix/participant/GenericLeaderStandbyStateModelFactory.java
index 51c91cc..c5d7390 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/GenericLeaderStandbyStateModelFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/GenericLeaderStandbyStateModelFactory.java
@@ -22,10 +22,11 @@ package org.apache.helix.participant;
 import java.util.List;
 
 import org.apache.helix.HelixConstants.ChangeType;
-import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.api.StateTransitionHandlerFactory;
+import org.apache.helix.api.id.PartitionId;
 
 public class GenericLeaderStandbyStateModelFactory extends
-    StateModelFactory<GenericLeaderStandbyModel> {
+    StateTransitionHandlerFactory<GenericLeaderStandbyModel> {
   private final CustomCodeCallbackHandler _callback;
   private final List<ChangeType> _notificationTypes;
 
@@ -39,7 +40,7 @@ public class GenericLeaderStandbyStateModelFactory extends
   }
 
   @Override
-  public GenericLeaderStandbyModel createNewStateModel(String partitionKey) {
-    return new GenericLeaderStandbyModel(_callback, _notificationTypes, partitionKey);
+  public GenericLeaderStandbyModel createStateTransitionHandler(PartitionId partition) {
+    return new GenericLeaderStandbyModel(_callback, _notificationTypes, partition);
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java b/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java
index 2f169cc..97b266e 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java
@@ -55,7 +55,6 @@ import org.apache.log4j.Logger;
  * </code>
  */
 public class HelixCustomCodeRunner {
-  private static final String LEADER_STANDBY = "LeaderStandby";
   private static Logger LOG = Logger.getLogger(HelixCustomCodeRunner.class);
   private static String PARTICIPANT_LEADER = "PARTICIPANT_LEADER";
 
@@ -130,7 +129,8 @@ public class HelixCustomCodeRunner {
     _stateModelFty = new GenericLeaderStandbyStateModelFactory(_callback, _notificationTypes);
 
     StateMachineEngine stateMach = _manager.getStateMachineEngine();
-    stateMach.registerStateModelFactory(LEADER_STANDBY, _stateModelFty, _resourceName);
+    stateMach.registerStateModelFactory(StateModelDefId.LeaderStandby, _resourceName,
+        _stateModelFty);
     ZkClient zkClient = null;
     try {
       // manually add ideal state for participant leader using LeaderStandby
@@ -148,7 +148,7 @@ public class HelixCustomCodeRunner {
       idealState.setRebalanceMode(RebalanceMode.SEMI_AUTO);
       idealState.setReplicas(StateModelToken.ANY_LIVEINSTANCE.toString());
       idealState.setNumPartitions(1);
-      idealState.setStateModelDefId(StateModelDefId.from(LEADER_STANDBY));
+      idealState.setStateModelDefId(StateModelDefId.LeaderStandby);
       idealState.setStateModelFactoryId(StateModelFactoryId.from(_resourceName));
       List<String> prefList =
           new ArrayList<String>(Arrays.asList(StateModelToken.ANY_LIVEINSTANCE.toString()));
@@ -177,7 +177,7 @@ public class HelixCustomCodeRunner {
    */
   public void stop() {
     LOG.info("Removing stateModelFactory for " + _resourceName);
-    _manager.getStateMachineEngine().removeStateModelFactory(LEADER_STANDBY, _stateModelFty,
+    _manager.getStateMachineEngine().removeStateModelFactory(StateModelDefId.LeaderStandby,
         _resourceName);
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java b/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
index 95afb70..56769b5 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
@@ -32,6 +32,8 @@ import org.apache.helix.NotificationContext;
 import org.apache.helix.NotificationContext.MapKey;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.api.State;
+import org.apache.helix.api.StateTransitionHandlerFactory;
+import org.apache.helix.api.TransitionHandler;
 import org.apache.helix.api.id.MessageId;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
@@ -46,10 +48,6 @@ import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.MessageType;
 import org.apache.helix.model.StateModelDefinition;
-import org.apache.helix.participant.statemachine.HelixStateModelFactory;
-import org.apache.helix.participant.statemachine.HelixStateModelFactoryAdaptor;
-import org.apache.helix.participant.statemachine.StateModel;
-import org.apache.helix.participant.statemachine.StateModelFactory;
 import org.apache.helix.participant.statemachine.StateModelParser;
 import org.apache.log4j.Logger;
 
@@ -58,27 +56,26 @@ public class HelixStateMachineEngine implements StateMachineEngine {
 
   /**
    * Map of StateModelDefId to map of FactoryName to StateModelFactory
-   * TODO change to use StateModelDefId and HelixStateModelFactory
    */
-  private final Map<String, Map<String, StateModelFactory<? extends StateModel>>> _stateModelFactoryMap;
+  private final Map<StateModelDefId, Map<String, StateTransitionHandlerFactory<? extends TransitionHandler>>> _stateModelFactoryMap;
   private final StateModelParser _stateModelParser;
   private final HelixManager _manager;
-  private final ConcurrentHashMap<String, StateModelDefinition> _stateModelDefs;
+  private final ConcurrentHashMap<StateModelDefId, StateModelDefinition> _stateModelDefs;
 
   public HelixStateMachineEngine(HelixManager manager) {
     _stateModelParser = new StateModelParser();
     _manager = manager;
 
     _stateModelFactoryMap =
-        new ConcurrentHashMap<String, Map<String, StateModelFactory<? extends StateModel>>>();
-    _stateModelDefs = new ConcurrentHashMap<String, StateModelDefinition>();
+        new ConcurrentHashMap<StateModelDefId, Map<String, StateTransitionHandlerFactory<? extends TransitionHandler>>>();
+    _stateModelDefs = new ConcurrentHashMap<StateModelDefId, StateModelDefinition>();
   }
 
-  public StateModelFactory<? extends StateModel> getStateModelFactory(String stateModelName) {
+  public StateTransitionHandlerFactory<? extends TransitionHandler> getStateModelFactory(StateModelDefId stateModelName) {
     return getStateModelFactory(stateModelName, HelixConstants.DEFAULT_STATE_MODEL_FACTORY);
   }
 
-  public StateModelFactory<? extends StateModel> getStateModelFactory(String stateModelName,
+  public StateTransitionHandlerFactory<? extends TransitionHandler> getStateModelFactory(StateModelDefId stateModelName,
       String factoryName) {
     if (!_stateModelFactoryMap.containsKey(stateModelName)) {
       return null;
@@ -86,39 +83,6 @@ public class HelixStateMachineEngine implements StateMachineEngine {
     return _stateModelFactoryMap.get(stateModelName).get(factoryName);
   }
 
-  @Override
-  public boolean registerStateModelFactory(String stateModelDef,
-      StateModelFactory<? extends StateModel> factory) {
-    return registerStateModelFactory(stateModelDef, factory,
-        HelixConstants.DEFAULT_STATE_MODEL_FACTORY);
-  }
-
-  @Override
-  public boolean registerStateModelFactory(String stateModelName,
-      StateModelFactory<? extends StateModel> factory, String factoryName) {
-    if (stateModelName == null || factory == null || factoryName == null) {
-      throw new HelixException("stateModelDef|stateModelFactory|factoryName cannot be null");
-    }
-
-    LOG.info("Register state model factory for state model " + stateModelName
-        + " using factory name " + factoryName + " with " + factory);
-
-    if (!_stateModelFactoryMap.containsKey(stateModelName)) {
-      _stateModelFactoryMap.put(stateModelName,
-          new ConcurrentHashMap<String, StateModelFactory<? extends StateModel>>());
-    }
-
-    if (_stateModelFactoryMap.get(stateModelName).containsKey(factoryName)) {
-      LOG.warn("stateModelFactory for " + stateModelName + " using factoryName " + factoryName
-          + " has already been registered.");
-      return false;
-    }
-
-    _stateModelFactoryMap.get(stateModelName).put(factoryName, factory);
-    sendNopMessage();
-    return true;
-  }
-
   // TODO: duplicated code in DefaultMessagingService
   private void sendNopMessage() {
     if (_manager.isConnected()) {
@@ -150,11 +114,11 @@ public class HelixStateMachineEngine implements StateMachineEngine {
 
   @Override
   public void reset() {
-    for (Map<String, StateModelFactory<? extends StateModel>> ftyMap : _stateModelFactoryMap
+    for (Map<String, StateTransitionHandlerFactory<? extends TransitionHandler>> ftyMap : _stateModelFactoryMap
         .values()) {
-      for (StateModelFactory<? extends StateModel> stateModelFactory : ftyMap.values()) {
-        for (String resourceKey : stateModelFactory.getPartitionSet()) {
-          StateModel stateModel = stateModelFactory.getStateModel(resourceKey);
+      for (StateTransitionHandlerFactory<? extends TransitionHandler> stateModelFactory : ftyMap.values()) {
+        for (PartitionId partition : stateModelFactory.getPartitionSet()) {
+          TransitionHandler stateModel = stateModelFactory.getTransitionHandler(partition);
           stateModel.reset();
           String initialState = _stateModelParser.getInitialState(stateModel.getClass());
           stateModel.updateState(initialState);
@@ -191,8 +155,8 @@ public class HelixStateMachineEngine implements StateMachineEngine {
       factoryName = HelixConstants.DEFAULT_STATE_MODEL_FACTORY;
     }
 
-    StateModelFactory<? extends StateModel> stateModelFactory =
-        getStateModelFactory(stateModelId.stringify(), factoryName);
+    StateTransitionHandlerFactory<? extends TransitionHandler> stateModelFactory =
+        getStateModelFactory(stateModelId, factoryName);
     if (stateModelFactory == null) {
       LOG.warn("Fail to create msg-handler because cannot find stateModelFactory for model: "
           + stateModelId + " using factoryName: " + factoryName + " for resource: " + resourceId);
@@ -200,7 +164,7 @@ public class HelixStateMachineEngine implements StateMachineEngine {
     }
 
     // check if the state model definition exists and cache it
-    if (!_stateModelDefs.containsKey(stateModelId.stringify())) {
+    if (!_stateModelDefs.containsKey(stateModelId)) {
       HelixDataAccessor accessor = _manager.getHelixDataAccessor();
       Builder keyBuilder = accessor.keyBuilder();
       StateModelDefinition stateModelDef =
@@ -209,15 +173,15 @@ public class HelixStateMachineEngine implements StateMachineEngine {
         throw new HelixException("fail to create msg-handler because stateModelDef for "
             + stateModelId + " does NOT exist");
       }
-      _stateModelDefs.put(stateModelId.stringify(), stateModelDef);
+      _stateModelDefs.put(stateModelId, stateModelDef);
     }
 
     if (message.getBatchMessageMode() == false) {
       // create currentStateDelta for this partition
-      String initState = _stateModelDefs.get(message.getStateModelDef()).getInitialState();
-      StateModel stateModel = stateModelFactory.getStateModel(partitionKey.stringify());
+      String initState = _stateModelDefs.get(message.getStateModelDefId()).getInitialState();
+      TransitionHandler stateModel = stateModelFactory.getTransitionHandler(partitionKey);
       if (stateModel == null) {
-        stateModel = stateModelFactory.createAndAddStateModel(partitionKey.stringify());
+        stateModel = stateModelFactory.createAndAddSTransitionHandler(partitionKey);
         stateModel.updateState(initState);
       }
 
@@ -237,9 +201,9 @@ public class HelixStateMachineEngine implements StateMachineEngine {
           currentStateDelta);
     } else {
       BatchMessageWrapper wrapper =
-          stateModelFactory.getBatchMessageWrapper(resourceId.stringify());
+          stateModelFactory.getBatchMessageWrapper(resourceId);
       if (wrapper == null) {
-        wrapper = stateModelFactory.createAndAddBatchMessageWrapper(resourceId.stringify());
+        wrapper = stateModelFactory.createAndAddBatchMessageWrapper(resourceId);
       }
 
       // get executor-service for the message
@@ -259,27 +223,15 @@ public class HelixStateMachineEngine implements StateMachineEngine {
   }
 
   @Override
-  public boolean removeStateModelFactory(String stateModelDef,
-      StateModelFactory<? extends StateModel> factory) {
-    throw new UnsupportedOperationException("Remove not yet supported");
-  }
-
-  @Override
-  public boolean removeStateModelFactory(String stateModelDef,
-      StateModelFactory<? extends StateModel> factory, String factoryName) {
-    throw new UnsupportedOperationException("Remove not yet supported");
-  }
-
-  @Override
   public boolean registerStateModelFactory(StateModelDefId stateModelDefId,
-      HelixStateModelFactory<? extends StateModel> factory) {
+      StateTransitionHandlerFactory<? extends TransitionHandler> factory) {
     return registerStateModelFactory(stateModelDefId, HelixConstants.DEFAULT_STATE_MODEL_FACTORY,
         factory);
   }
 
   @Override
   public boolean registerStateModelFactory(StateModelDefId stateModelDefId, String factoryName,
-      HelixStateModelFactory<? extends StateModel> factory) {
+      StateTransitionHandlerFactory<? extends TransitionHandler> factory) {
     if (stateModelDefId == null || factoryName == null || factory == null) {
       LOG.info("stateModelDefId|factoryName|stateModelFactory is null");
       return false;
@@ -288,22 +240,18 @@ public class HelixStateMachineEngine implements StateMachineEngine {
     LOG.info("Registering state model factory for state-model-definition: " + stateModelDefId
         + " using factory-name: " + factoryName + " with: " + factory);
 
-    StateModelFactory<? extends StateModel> factoryAdaptor =
-        new HelixStateModelFactoryAdaptor(factory);
-
-    String stateModelDefName = stateModelDefId.stringify();
-    if (!_stateModelFactoryMap.containsKey(stateModelDefName)) {
-      _stateModelFactoryMap.put(stateModelDefName,
-          new ConcurrentHashMap<String, StateModelFactory<? extends StateModel>>());
+    if (!_stateModelFactoryMap.containsKey(stateModelDefId)) {
+      _stateModelFactoryMap.put(stateModelDefId,
+          new ConcurrentHashMap<String, StateTransitionHandlerFactory<? extends TransitionHandler>>());
     }
 
-    if (_stateModelFactoryMap.get(stateModelDefName).containsKey(factoryName)) {
+    if (_stateModelFactoryMap.get(stateModelDefId).containsKey(factoryName)) {
       LOG.info("Skip register state model factory for " + stateModelDefId + " using factory-name "
           + factoryName + ", since it has already been registered.");
       return false;
     }
 
-    _stateModelFactoryMap.get(stateModelDefName).put(factoryName, factoryAdaptor);
+    _stateModelFactoryMap.get(stateModelDefId).put(factoryName, factory);
 
     sendNopMessage();
     return true;
@@ -324,15 +272,14 @@ public class HelixStateMachineEngine implements StateMachineEngine {
     LOG.info("Removing state model factory for state-model-definition: " + stateModelDefId
         + " using factory-name: " + factoryName);
 
-    String stateModelDefName = stateModelDefId.stringify();
-    Map<String, StateModelFactory<? extends StateModel>> ftyMap =
-        _stateModelFactoryMap.get(stateModelDefName);
+    Map<String, StateTransitionHandlerFactory<? extends TransitionHandler>> ftyMap =
+        _stateModelFactoryMap.get(stateModelDefId);
     if (ftyMap == null) {
       LOG.info("Skip remove state model factory " + stateModelDefId + ", since it does NOT exist");
       return false;
     }
 
-    StateModelFactory<? extends StateModel> fty = ftyMap.remove(factoryName);
+    StateTransitionHandlerFactory<? extends TransitionHandler> fty = ftyMap.remove(factoryName);
     if (fty == null) {
       LOG.info("Skip remove state model factory " + stateModelDefId + " using factory-name "
           + factoryName + ", since it does NOT exist");
@@ -340,11 +287,11 @@ public class HelixStateMachineEngine implements StateMachineEngine {
     }
 
     if (ftyMap.isEmpty()) {
-      _stateModelFactoryMap.remove(stateModelDefName);
+      _stateModelFactoryMap.remove(stateModelDefId);
     }
 
-    for (String partition : fty.getPartitionSet()) {
-      StateModel stateModel = fty.getStateModel(partition);
+    for (PartitionId partition : fty.getPartitionSet()) {
+      TransitionHandler stateModel = fty.getTransitionHandler(partition);
       stateModel.reset();
       // TODO probably should remove the state from zookeeper
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/participant/StateMachineEngine.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/StateMachineEngine.java b/helix-core/src/main/java/org/apache/helix/participant/StateMachineEngine.java
index abb7d81..48f06d3 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/StateMachineEngine.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/StateMachineEngine.java
@@ -19,11 +19,10 @@ package org.apache.helix.participant;
  * under the License.
  */
 
+import org.apache.helix.api.StateTransitionHandlerFactory;
+import org.apache.helix.api.TransitionHandler;
 import org.apache.helix.api.id.StateModelDefId;
 import org.apache.helix.messaging.handling.MessageHandlerFactory;
-import org.apache.helix.participant.statemachine.HelixStateModelFactory;
-import org.apache.helix.participant.statemachine.StateModel;
-import org.apache.helix.participant.statemachine.StateModelFactory;
 
 /**
  * Helix participant uses this class to register/remove state model factory
@@ -32,35 +31,6 @@ import org.apache.helix.participant.statemachine.StateModelFactory;
 public interface StateMachineEngine extends MessageHandlerFactory {
 
   /**
-   * Replaced by {@link #registerStateModelFactory(StateModelDefId, HelixStateModelFactory)
-
-   */
-  @Deprecated
-  public boolean registerStateModelFactory(String stateModelDef,
-      StateModelFactory<? extends StateModel> factory);
-
-  /**
-   * Replaced by {@link #registerStateModelFactory(StateModelDefId, String, HelixStateModelFactory)}
-   */
-  @Deprecated
-  public boolean registerStateModelFactory(String stateModelDef,
-      StateModelFactory<? extends StateModel> factory, String factoryName);
-
-  /**
-   * Replaced by {@link #removeStateModelFactory(StateModelDefId, HelixStateModelFactory)}
-   */
-  @Deprecated
-  public boolean removeStateModelFactory(String stateModelDef,
-      StateModelFactory<? extends StateModel> factory);
-
-  /**
-   * Replaced by {@link #removeStateModelFactory(StateModelDefId, String, HelixStateModelFactory)}
-   */
-  @Deprecated
-  public boolean removeStateModelFactory(String stateModelDef,
-      StateModelFactory<? extends StateModel> factory, String factoryName);
-
-  /**
    * Register a default state model factory for a state model definition
    * A state model definition could be, for example:
    * "MasterSlave", "OnlineOffline", "LeaderStandby", etc.
@@ -70,7 +40,7 @@ public interface StateMachineEngine extends MessageHandlerFactory {
    * @return
    */
   public boolean registerStateModelFactory(StateModelDefId stateModelDefId,
-      HelixStateModelFactory<? extends StateModel> factory);
+      StateTransitionHandlerFactory<? extends TransitionHandler> factory);
 
   /**
    * Register a state model factory with a factory name for a state model definition
@@ -81,7 +51,7 @@ public interface StateMachineEngine extends MessageHandlerFactory {
    * @return
    */
   public boolean registerStateModelFactory(StateModelDefId stateModelDefId, String factoryName,
-      HelixStateModelFactory<? extends StateModel> factory);
+      StateTransitionHandlerFactory<? extends TransitionHandler> factory);
 
 /**
    * Remove the default state model factory for a state model definition