You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 01:14:58 UTC

[19/42] Refactoring the package names and removing jsql parser

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java
new file mode 100644
index 0000000..b8136c8
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java
@@ -0,0 +1,149 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.participant;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.StateModelParser;
+import org.apache.helix.participant.statemachine.StateTransitionError;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.log4j.Logger;
+
+
+
+@StateModelInfo(initialState = "OFFLINE", states = { "LEADER", "STANDBY" })
+public class DistClusterControllerStateModel extends StateModel
+{
+  private static Logger logger = Logger.getLogger(DistClusterControllerStateModel.class);
+  private HelixManager _controller = null;
+  private final String _zkAddr;
+
+  public DistClusterControllerStateModel(String zkAddr)
+  {
+    StateModelParser parser = new StateModelParser();
+    _currentState = parser.getInitialState(DistClusterControllerStateModel.class);
+    _zkAddr = zkAddr;
+  }
+
+  @Transition(to="STANDBY",from="OFFLINE")
+  public void onBecomeStandbyFromOffline(Message message, NotificationContext context)
+  {
+    logger.info("Becoming standby from offline");
+  }
+
+  @Transition(to="LEADER",from="STANDBY")
+  public void onBecomeLeaderFromStandby(Message message, NotificationContext context)
+  throws Exception
+  {
+    String clusterName = message.getPartitionName();
+    String controllerName = message.getTgtName();
+
+    logger.info(controllerName + " becomes leader from standby for " + clusterName);
+    // System.out.println(controllerName + " becomes leader from standby for " + clusterName);
+
+    if (_controller == null)
+    {
+      _controller = HelixManagerFactory
+          .getZKHelixManager(clusterName, controllerName, InstanceType.CONTROLLER, _zkAddr);
+      _controller.connect();
+      _controller.startTimerTasks();
+    }
+    else
+    {
+      logger.error("controller already exists:" + _controller.getInstanceName()
+                   + " for " + clusterName);
+    }
+
+  }
+
+  @Transition(to="STANDBY",from="LEADER")
+  public void onBecomeStandbyFromLeader(Message message, NotificationContext context)
+  {
+    String clusterName = message.getPartitionName();
+    String controllerName = message.getTgtName();
+
+    logger.info(controllerName + " becoming standby from leader for " + clusterName);
+
+    if (_controller != null)
+    {
+      _controller.disconnect();
+      _controller = null;
+    }
+    else
+    {
+      logger.error("No controller exists for " + clusterName);
+    }
+  }
+
+  @Transition(to="OFFLINE",from="STANDBY")
+  public void onBecomeOfflineFromStandby(Message message, NotificationContext context)
+  {
+    String clusterName = message.getPartitionName();
+    String controllerName = message.getTgtName();
+
+    logger.info(controllerName + " becoming offline from standby for cluster:" + clusterName);
+
+  }
+
+  @Transition(to="DROPPED",from="OFFLINE")
+  public void onBecomeDroppedFromOffline(Message message, NotificationContext context)
+  {
+    logger.info("Becoming dropped from offline");
+  }
+
+  @Transition(to="OFFLINE",from="DROPPED")
+  public void onBecomeOfflineFromDropped(Message message, NotificationContext context)
+  {
+    logger.info("Becoming offline from dropped");
+  }
+
+
+  @Override
+  public void rollbackOnError(Message message, NotificationContext context,
+                              StateTransitionError error)
+  {
+    String clusterName = message.getPartitionName();
+    String controllerName = message.getTgtName();
+
+    logger.error(controllerName + " rollbacks on error for " + clusterName);
+
+    if (_controller != null)
+    {
+      _controller.disconnect();
+      _controller = null;
+    }
+
+  }
+
+  @Override
+  public void reset()
+  {
+    if (_controller != null)
+    {
+//      System.out.println("disconnect " + _controller.getInstanceName()
+//                         + "(" + _controller.getInstanceType()
+//                         + ") from " + _controller.getClusterName());
+      _controller.disconnect();
+      _controller = null;
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModelFactory.java b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModelFactory.java
new file mode 100644
index 0000000..b5fa6ee
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModelFactory.java
@@ -0,0 +1,36 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.participant;
+
+import org.apache.helix.participant.statemachine.StateModelFactory;
+
+public class DistClusterControllerStateModelFactory extends
+    StateModelFactory<DistClusterControllerStateModel>
+{
+  private final String _zkAddr;
+
+  public DistClusterControllerStateModelFactory(String zkAddr)
+  {
+    _zkAddr = zkAddr;
+  }
+
+  @Override
+  public DistClusterControllerStateModel createNewStateModel(String stateUnitKey)
+  {
+    return new DistClusterControllerStateModel(_zkAddr);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/participant/GenericLeaderStandbyModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/GenericLeaderStandbyModel.java b/helix-core/src/main/java/org/apache/helix/participant/GenericLeaderStandbyModel.java
new file mode 100644
index 0000000..b85165c
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/participant/GenericLeaderStandbyModel.java
@@ -0,0 +1,96 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.participant;
+
+import java.util.List;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.HelixConstants.ChangeType;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.log4j.Logger;
+
+
+@StateModelInfo(initialState = "OFFLINE", states = { "LEADER", "STANDBY" })
+public class GenericLeaderStandbyModel extends StateModel
+{
+  private static Logger LOG = Logger.getLogger(GenericLeaderStandbyModel.class);
+
+  private final CustomCodeInvoker _particHolder;
+  private final List<ChangeType> _notificationTypes;
+
+  public GenericLeaderStandbyModel(CustomCodeCallbackHandler callback, 
+                                     List<ChangeType> notificationTypes,
+                                     String partitionKey)
+  {
+    _particHolder = new CustomCodeInvoker(callback, partitionKey);
+    _notificationTypes = notificationTypes;
+  }
+
+  @Transition(to="STANDBY",from="OFFLINE")
+  public void onBecomeStandbyFromOffline(Message message, NotificationContext context)
+  {
+    LOG.info("Become STANDBY from OFFLINE");
+  }
+
+  @Transition(to="LEADER",from="STANDBY")
+  public void onBecomeLeaderFromStandby(Message message, NotificationContext context)
+      throws Exception
+  {
+    LOG.info("Become LEADER from STANDBY");
+    HelixManager manager = context.getManager();
+    if (manager == null)
+    {
+      throw new IllegalArgumentException("Require HelixManager in notification conext");
+    }
+    for (ChangeType notificationType : _notificationTypes)
+    {
+      if (notificationType == ChangeType.LIVE_INSTANCE)
+      {
+        manager.addLiveInstanceChangeListener(_particHolder);
+      }
+      else if (notificationType == ChangeType.CONFIG)
+      {
+        manager.addConfigChangeListener(_particHolder);
+      }
+      else if (notificationType == ChangeType.EXTERNAL_VIEW)
+      {
+        manager.addExternalViewChangeListener(_particHolder);
+      }
+      else
+      {
+        LOG.error("Unsupport notificationType:" + notificationType.toString());
+      }
+    }
+  }
+
+  @Transition(to="STANDBY",from="LEADER")
+  public void onBecomeStandbyFromLeader(Message message, NotificationContext context)
+  {
+    LOG.info("Become STANDBY from LEADER");
+    HelixManager manager = context.getManager();
+    manager.removeListener(_particHolder);    
+  }
+
+  @Transition(to="OFFLINE",from="STANDBY")
+  public void onBecomeOfflineFromStandby(Message message, NotificationContext context)
+  {
+    LOG.info("Become OFFLINE from STANDBY");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/participant/GenericLeaderStandbyStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/GenericLeaderStandbyStateModelFactory.java b/helix-core/src/main/java/org/apache/helix/participant/GenericLeaderStandbyStateModelFactory.java
new file mode 100644
index 0000000..2763c6c
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/participant/GenericLeaderStandbyStateModelFactory.java
@@ -0,0 +1,45 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.participant;
+
+import java.util.List;
+
+import org.apache.helix.HelixConstants.ChangeType;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+
+
+public class GenericLeaderStandbyStateModelFactory 
+  extends StateModelFactory<GenericLeaderStandbyModel>
+{
+  private final CustomCodeCallbackHandler _callback;
+  private final List<ChangeType> _notificationTypes;
+  public GenericLeaderStandbyStateModelFactory(CustomCodeCallbackHandler callback,
+                                            List<ChangeType> notificationTypes)
+  {
+    if (callback == null || notificationTypes == null || notificationTypes.size() == 0)
+    {
+      throw new IllegalArgumentException("Require: callback | notificationTypes");
+    }
+    _callback = callback;
+    _notificationTypes = notificationTypes;
+  }
+
+  @Override
+  public GenericLeaderStandbyModel createNewStateModel(String partitionKey)
+  {
+    return new GenericLeaderStandbyModel(_callback, _notificationTypes, partitionKey);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java b/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java
new file mode 100644
index 0000000..ef1b981
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java
@@ -0,0 +1,183 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.participant;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixConstants.ChangeType;
+import org.apache.helix.HelixConstants.StateModelToken;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.IdealStateModeProperty;
+import org.apache.log4j.Logger;
+
+
+/**
+ * This provides the ability for users to run a custom code in exactly one
+ * process using a LeaderStandBy state model. <br/>
+ * A typical use case is when one uses CUSTOMIZED ideal state mode where the
+ * assignment of partition to nodes needs to change dynamically as the nodes go
+ * online/offline.<br/>
+ * <code>
+ * HelixCustomCodeRunner runner = new HelixCustomCodeRunner(manager,ZK_ADDR);
+ * runner
+ *  .invoke(_callback)
+ *  .on(ChangeType.LIVE_INSTANCE, ChangeType.IdealState)
+ *  .usingLeaderStandbyModel("someUniqueId")
+ *  .run()
+ * </code>
+ *
+ * @author kgopalak
+ *
+ */
+public class HelixCustomCodeRunner
+{
+  private static final String LEADER_STANDBY = "LeaderStandby";
+  private static Logger LOG = Logger.getLogger(HelixCustomCodeRunner.class);
+  private static String PARTICIPANT_LEADER = "PARTICIPANT_LEADER";
+
+  private CustomCodeCallbackHandler _callback;
+  private List<ChangeType> _notificationTypes;
+  private String _resourceName;
+  private final HelixManager _manager;
+  private final String _zkAddr;
+  private GenericLeaderStandbyStateModelFactory _stateModelFty;
+
+  /**
+   * Constructs a HelixCustomCodeRunner that will run exactly in one place
+   *
+   * @param manager
+   * @param zkAddr
+   */
+  public HelixCustomCodeRunner(HelixManager manager, String zkAddr)
+  {
+    _manager = manager;
+    _zkAddr = zkAddr;
+  }
+
+  /**
+   * callback to invoke when there is a change in cluster state specified by on(
+   * notificationTypes) This callback must be idempotent which means they should
+   * not depend on what changed instead simply read the cluster data and act on
+   * it.
+   *
+   * @param callback
+   * @return
+   */
+  public HelixCustomCodeRunner invoke(CustomCodeCallbackHandler callback)
+  {
+    _callback = callback;
+    return this;
+  }
+
+  /**
+   * ChangeTypes interested in, ParticipantLeaderCallback.callback method will
+   * be invoked on the
+   *
+   * @param notificationTypes
+   * @return
+   */
+  public HelixCustomCodeRunner on(ChangeType... notificationTypes)
+  {
+    _notificationTypes = Arrays.asList(notificationTypes);
+    return this;
+  }
+
+  public HelixCustomCodeRunner usingLeaderStandbyModel(String id)
+  {
+    _resourceName = PARTICIPANT_LEADER + "_" + id;
+    return this;
+  }
+
+  /**
+   * This method will be invoked when there is a change in any subscribed
+   * notificationTypes
+   *
+   * @throws Exception
+   */
+  public void start() throws Exception
+  {
+    if (_callback == null || _notificationTypes == null || _notificationTypes.size() == 0
+        || _resourceName == null)
+    {
+      throw new IllegalArgumentException("Require callback | notificationTypes | resourceName");
+    }
+
+    LOG.info("Register participantLeader on " + _notificationTypes + " using " + _resourceName);
+
+    _stateModelFty = new GenericLeaderStandbyStateModelFactory(_callback, _notificationTypes);
+
+    StateMachineEngine stateMach = _manager.getStateMachineEngine();
+    stateMach.registerStateModelFactory(LEADER_STANDBY, _stateModelFty, _resourceName);
+    ZkClient zkClient = null;
+    try
+    {
+      // manually add ideal state for participant leader using LeaderStandby
+      // model
+
+      zkClient = new ZkClient(_zkAddr, ZkClient.DEFAULT_CONNECTION_TIMEOUT);
+      zkClient.setZkSerializer(new ZNRecordSerializer());
+      HelixDataAccessor accessor = new ZKHelixDataAccessor(_manager.getClusterName(), new ZkBaseDataAccessor(zkClient));
+      Builder keyBuilder = accessor.keyBuilder();
+
+      IdealState idealState = new IdealState(_resourceName);
+      idealState.setIdealStateMode(IdealStateModeProperty.AUTO.toString());
+      idealState.setReplicas(StateModelToken.ANY_LIVEINSTANCE.toString());
+      idealState.setNumPartitions(1);
+      idealState.setStateModelDefRef(LEADER_STANDBY);
+      idealState.setStateModelFactoryName(_resourceName);
+      List<String> prefList = new ArrayList<String>(Arrays.asList(StateModelToken.ANY_LIVEINSTANCE
+          .toString()));
+      idealState.getRecord().setListField(_resourceName + "_0", prefList);
+
+      List<String> idealStates = accessor.getChildNames(keyBuilder.idealStates());
+      while (idealStates == null || !idealStates.contains(_resourceName))
+      {
+        accessor.setProperty(keyBuilder.idealStates(_resourceName), idealState);
+        idealStates = accessor.getChildNames(keyBuilder.idealStates());
+      }
+
+      LOG.info("Set idealState for participantLeader:" + _resourceName + ", idealState:"
+          + idealState);
+    } finally
+    {
+      if (zkClient != null && zkClient.getConnection() != null)
+
+      {
+        zkClient.close();
+      }
+    }
+
+  }
+
+  /**
+   * Stop customer code runner
+   */
+  public void stop()
+  {
+    LOG.info("Removing stateModelFactory for " + _resourceName);
+    _manager.getStateMachineEngine().removeStateModelFactory(LEADER_STANDBY, _stateModelFty,
+        _resourceName);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java b/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
new file mode 100644
index 0000000..64d278e
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
@@ -0,0 +1,284 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.participant;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.helix.HelixConstants;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.messaging.handling.HelixStateTransitionHandler;
+import org.apache.helix.messaging.handling.HelixTaskExecutor;
+import org.apache.helix.messaging.handling.MessageHandler;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.participant.statemachine.StateModelParser;
+import org.apache.log4j.Logger;
+
+
+public class HelixStateMachineEngine implements StateMachineEngine
+{
+  private static Logger logger = Logger.getLogger(HelixStateMachineEngine.class);
+
+  // StateModelName->FactoryName->StateModelFactory
+  private final Map<String, Map<String, StateModelFactory<? extends StateModel>>> _stateModelFactoryMap;
+  StateModelParser _stateModelParser;
+
+  private final HelixManager _manager;
+
+  private final ConcurrentHashMap<String, StateModelDefinition> _stateModelDefs;
+
+  public StateModelFactory<? extends StateModel> getStateModelFactory(String stateModelName)
+  {
+    return getStateModelFactory(stateModelName,
+                                HelixConstants.DEFAULT_STATE_MODEL_FACTORY);
+  }
+
+  public StateModelFactory<? extends StateModel> getStateModelFactory(String stateModelName,
+                                                                      String factoryName)
+  {
+    if (!_stateModelFactoryMap.containsKey(stateModelName))
+    {
+      return null;
+    }
+    return _stateModelFactoryMap.get(stateModelName).get(factoryName);
+  }
+
+  public HelixStateMachineEngine(HelixManager manager)
+  {
+    _stateModelParser = new StateModelParser();
+    _manager = manager;
+
+    _stateModelFactoryMap =
+        new ConcurrentHashMap<String, Map<String, StateModelFactory<? extends StateModel>>>();
+    _stateModelDefs = new ConcurrentHashMap<String, StateModelDefinition>();
+  }
+
+  @Override
+  public boolean registerStateModelFactory(String stateModelDef,
+                                           StateModelFactory<? extends StateModel> factory)
+  {
+    return registerStateModelFactory(stateModelDef,
+                                     factory,
+                                     HelixConstants.DEFAULT_STATE_MODEL_FACTORY);
+  }
+
+  @Override
+  public boolean registerStateModelFactory(String stateModelName,
+                                           StateModelFactory<? extends StateModel> factory,
+                                           String factoryName)
+  {
+    if (stateModelName == null || factory == null || factoryName == null)
+    {
+      throw new HelixException("stateModelDef|stateModelFactory|factoryName cannot be null");
+    }
+
+    logger.info("Register state model factory for state model " + stateModelName
+        + " using factory name " + factoryName + " with " + factory);
+
+    if (!_stateModelFactoryMap.containsKey(stateModelName))
+    {
+      _stateModelFactoryMap.put(stateModelName,
+                                new ConcurrentHashMap<String, StateModelFactory<? extends StateModel>>());
+    }
+
+    if (_stateModelFactoryMap.get(stateModelName).containsKey(factoryName))
+    {
+      logger.warn("stateModelFactory for " + stateModelName + " using factoryName "
+          + factoryName + " has already been registered.");
+      return false;
+    }
+
+    _stateModelFactoryMap.get(stateModelName).put(factoryName, factory);
+    sendNopMessage();
+    return true;
+  }
+
+  // TODO: duplicated code in DefaultMessagingService
+  private void sendNopMessage()
+  {
+    if (_manager.isConnected())
+    {
+      try
+      {
+        Message nopMsg = new Message(MessageType.NO_OP, UUID.randomUUID().toString());
+        nopMsg.setSrcName(_manager.getInstanceName());
+
+        HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+        Builder keyBuilder = accessor.keyBuilder();
+
+        if (_manager.getInstanceType() == InstanceType.CONTROLLER
+            || _manager.getInstanceType() == InstanceType.CONTROLLER_PARTICIPANT)
+        {
+          nopMsg.setTgtName("Controller");
+          accessor.setProperty(keyBuilder.controllerMessage(nopMsg.getId()), nopMsg);
+        }
+
+        if (_manager.getInstanceType() == InstanceType.PARTICIPANT
+            || _manager.getInstanceType() == InstanceType.CONTROLLER_PARTICIPANT)
+        {
+          nopMsg.setTgtName(_manager.getInstanceName());
+          accessor.setProperty(keyBuilder.message(nopMsg.getTgtName(), nopMsg.getId()),
+                               nopMsg);
+        }
+        logger.info("Send NO_OP message to " + nopMsg.getTgtName() + ", msgId: "
+            + nopMsg.getId());
+      }
+      catch (Exception e)
+      {
+        logger.error(e);
+      }
+    }
+  }
+
+  @Override
+  public void reset()
+  {
+    for (Map<String, StateModelFactory<? extends StateModel>> ftyMap : _stateModelFactoryMap.values())
+    {
+      for (StateModelFactory<? extends StateModel> stateModelFactory : ftyMap.values())
+      {
+        Map<String, ? extends StateModel> modelMap = stateModelFactory.getStateModelMap();
+        if (modelMap == null || modelMap.isEmpty())
+        {
+          continue;
+        }
+
+        for (String resourceKey : modelMap.keySet())
+        {
+          StateModel stateModel = modelMap.get(resourceKey);
+          stateModel.reset();
+          String initialState = _stateModelParser.getInitialState(stateModel.getClass());
+          stateModel.updateState(initialState);
+          // TODO probably should update the state on ZK. Shi confirm what needs
+          // to be done here.
+        }
+      }
+    }
+  }
+
+  @Override
+  public MessageHandler createHandler(Message message, NotificationContext context)
+  {
+    String type = message.getMsgType();
+
+    if (!type.equals(MessageType.STATE_TRANSITION.toString()))
+    {
+      throw new HelixException("Unexpected msg type for message " + message.getMsgId()
+          + " type:" + message.getMsgType());
+    }
+
+    String partitionKey = message.getPartitionName();
+    String stateModelName = message.getStateModelDef();
+    String resourceName = message.getResourceName();
+    String sessionId = message.getTgtSessionId();
+    int bucketSize = message.getBucketSize();
+
+    if (stateModelName == null)
+    {
+      logger.error("message does not contain stateModelDef");
+      return null;
+    }
+
+    String factoryName = message.getStateModelFactoryName();
+    if (factoryName == null)
+    {
+      factoryName = HelixConstants.DEFAULT_STATE_MODEL_FACTORY;
+    }
+
+    StateModelFactory stateModelFactory =
+        getStateModelFactory(stateModelName, factoryName);
+    if (stateModelFactory == null)
+    {
+      logger.warn("Cannot find stateModelFactory for model:" + stateModelName
+          + " using factoryName:" + factoryName + " for resourceGroup:" + resourceName);
+      return null;
+    }
+
+    // check if the state model definition exists and cache it
+    if (!_stateModelDefs.containsKey(stateModelName))
+    {
+      HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+      Builder keyBuilder = accessor.keyBuilder();
+      StateModelDefinition stateModelDef =
+          accessor.getProperty(keyBuilder.stateModelDef(stateModelName));
+      if (stateModelDef == null)
+      {
+        throw new HelixException("stateModelDef for " + stateModelName
+            + " does NOT exists");
+      }
+      _stateModelDefs.put(stateModelName, stateModelDef);
+    }
+
+    // create currentStateDelta for this partition
+    String initState = _stateModelDefs.get(message.getStateModelDef()).getInitialState();
+    StateModel stateModel = stateModelFactory.getStateModel(partitionKey);
+    if (stateModel == null)
+    {
+      stateModelFactory.createAndAddStateModel(partitionKey);
+      stateModel = stateModelFactory.getStateModel(partitionKey);
+      stateModel.updateState(initState);
+    }
+
+    CurrentState currentStateDelta = new CurrentState(resourceName);
+    currentStateDelta.setSessionId(sessionId);
+    currentStateDelta.setStateModelDefRef(stateModelName);
+    currentStateDelta.setStateModelFactoryName(factoryName);
+    currentStateDelta.setBucketSize(bucketSize);
+
+    currentStateDelta.setState(partitionKey, (stateModel.getCurrentState() == null)
+        ? initState : stateModel.getCurrentState());
+
+    HelixTaskExecutor executor = (HelixTaskExecutor) context.get(NotificationContext.TASK_EXECUTOR_KEY);
+    
+    return new HelixStateTransitionHandler(stateModel,
+                                           message,
+                                           context,
+                                           currentStateDelta,
+                                           executor);
+  }
+
+  @Override
+  public String getMessageType()
+  {
+    return MessageType.STATE_TRANSITION.toString();
+  }
+
+  @Override
+  public boolean removeStateModelFactory(String stateModelDef,
+                                         StateModelFactory<? extends StateModel> factory)
+  {
+    throw new UnsupportedOperationException("Remove not yet supported");
+  }
+
+  @Override
+  public boolean removeStateModelFactory(String stateModelDef,
+                                         StateModelFactory<? extends StateModel> factory,
+                                         String factoryName)
+  {
+    throw new UnsupportedOperationException("Remove not yet supported");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/participant/StateMachineEngine.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/StateMachineEngine.java b/helix-core/src/main/java/org/apache/helix/participant/StateMachineEngine.java
new file mode 100644
index 0000000..bf6227b
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/participant/StateMachineEngine.java
@@ -0,0 +1,67 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.participant;
+
+import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+
+/**
+ * Helix participant manager uses this class to register/remove state model factory
+ * State model factory creates state model that handles state transition messages
+ */
+public interface StateMachineEngine extends MessageHandlerFactory
+{
+  /**
+   * Register a default state model factory for a state model definition
+   * A state model definition could be, for example: 
+   * "MasterSlave", "OnlineOffline", "LeaderStandby", etc.
+   * @param stateModelDef
+   * @param factory
+   * @return
+   */
+  public boolean registerStateModelFactory(String stateModelDef,
+      StateModelFactory<? extends StateModel> factory);
+
+  /**
+   * Register a state model factory with a name for a state model definition
+   * @param stateModelDef
+   * @param factory
+   * @param factoryName
+   * @return
+   */
+  public boolean registerStateModelFactory(String stateModelDef,
+      StateModelFactory<? extends StateModel> factory, String factoryName);
+
+  /**
+   * Remove the default state model factory for a state model definition
+   * @param stateModelDef
+   * @param factory
+   * @return
+   */
+  public boolean removeStateModelFactory(String stateModelDef,
+      StateModelFactory<? extends StateModel> factory);
+
+  /**
+   * Remove the state model factory with a name for a state model definition
+   * @param stateModelDef
+   * @param factory
+   * @param factoryName
+   * @return
+   */
+  public boolean removeStateModelFactory(String stateModelDef,
+      StateModelFactory<? extends StateModel> factory, String factoryName);
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/participant/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/package-info.java b/helix-core/src/main/java/org/apache/helix/participant/package-info.java
new file mode 100644
index 0000000..66a0a56
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/participant/package-info.java
@@ -0,0 +1,5 @@
+/**
+ * Helix implementation of participant classes 
+ * 
+ */
+package org.apache.helix.participant;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModel.java b/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModel.java
new file mode 100644
index 0000000..ea50fd5
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModel.java
@@ -0,0 +1,75 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.participant.statemachine;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.log4j.Logger;
+
+
+public abstract class StateModel
+{
+	static final String DEFAULT_INITIAL_STATE = "OFFLINE";
+	Logger logger = Logger.getLogger(StateModel.class);
+	
+	// TODO Get default state from implementation or from state model annotation
+	// StateModel with initial state other than OFFLINE should override this field
+	protected String _currentState = DEFAULT_INITIAL_STATE;
+
+	public String getCurrentState()
+	{
+		return _currentState;
+	}
+
+	// @transition(from='from', to='to')
+	public void defaultTransitionHandler()
+	{
+		logger
+		    .error("Default default handler. The idea is to invoke this if no transition method is found. Yet to be implemented");
+	}
+
+	public boolean updateState(String newState)
+	{
+		_currentState = newState;
+		return true;
+	}
+
+	/**
+	 * Called when error occurs in state transition
+	 * 
+	 * TODO:enforce subclass to write this
+	 * @param message
+	 * @param context
+	 * @param error
+	 */
+	public void rollbackOnError(Message message, NotificationContext context,
+	    StateTransitionError error)
+	{
+
+		logger.error("Default rollback method invoked on error. Error Code:"
+		    + error.getCode());
+
+	}
+
+	/**
+	 * Called when the state model is reset
+	 */
+	public void reset()
+	{
+    logger.warn("Default reset method invoked. Either because the process longer own this resource or session timedout");
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java b/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java
new file mode 100644
index 0000000..0e08dda
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java
@@ -0,0 +1,75 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.participant.statemachine;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+public abstract class StateModelFactory<T extends StateModel>
+{
+  private ConcurrentMap<String, T> _stateModelMap = new ConcurrentHashMap<String, T>();
+
+  /**
+   * This method will be invoked only once per partitionName per session
+   * 
+   * @param partitionName
+   * @return
+   */
+  public abstract T createNewStateModel(String partitionName);
+
+  /**
+   * Add a state model for a partition
+   * 
+   * @param partitionName
+   * @return
+   */
+  public void addStateModel(String partitionName, T stateModel)
+  {
+    _stateModelMap.put(partitionName, stateModel);
+  }
+  
+  /**
+   * Create a state model for a partition
+   * 
+   * @param partitionName
+   */
+  public void createAndAddStateModel(String partitionName)
+  {
+    _stateModelMap.put(partitionName, createNewStateModel(partitionName));
+  }
+
+  /**
+   * Get the state model for a partition
+   * 
+   * @param partitionName
+   * @return
+   */
+  public T getStateModel(String partitionName)
+  {
+    return _stateModelMap.get(partitionName);
+  }
+
+  /**
+   * Get the state model map
+   * 
+   * @return
+   */
+  public Map<String, T> getStateModelMap()
+  {
+    return _stateModelMap;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelInfo.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelInfo.java b/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelInfo.java
new file mode 100644
index 0000000..0850325
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelInfo.java
@@ -0,0 +1,28 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.participant.statemachine;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+
+@Retention(RetentionPolicy.RUNTIME)
+public @interface StateModelInfo
+{
+  String[] states();
+
+  String initialState();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelParser.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelParser.java b/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelParser.java
new file mode 100644
index 0000000..0413983
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelParser.java
@@ -0,0 +1,149 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.participant.statemachine;
+
+import java.lang.reflect.Method;
+import java.util.Arrays;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+
+
+/**
+ * Finds the method in stateModel to generate
+ * 
+ * @author kgopalak
+ * 
+ */
+public class StateModelParser
+{
+
+	public Method getMethodForTransition(Class<? extends StateModel> clazz,
+	    String fromState, String toState, Class<?>[] paramTypes)
+	{
+		Method method = getMethodForTransitionUsingAnnotation(clazz, fromState,
+		    toState, paramTypes);
+		if (method == null)
+		{
+			method = getMethodForTransitionByConvention(clazz, fromState, toState,
+			    paramTypes);
+		}
+		return method;
+	}
+
+	/**
+	 * This class uses the method naming convention "onBecome" + toState + "From"
+	 * + fromState;
+	 * 
+	 * @param clazz
+	 * @param fromState
+	 * @param toState
+	 * @param paramTypes
+	 * @return Method if found else null
+	 */
+	public Method getMethodForTransitionByConvention(
+	    Class<? extends StateModel> clazz, String fromState, String toState,
+	    Class<?>[] paramTypes)
+	{
+		Method methodToInvoke = null;
+		String methodName = "onBecome" + toState + "From" + fromState;
+		if (fromState.equals("*"))
+		{
+			methodName = "onBecome" + toState;
+		}
+
+		Method[] methods = clazz.getMethods();
+		for (Method method : methods)
+		{
+			if (method.getName().equalsIgnoreCase(methodName))
+			{
+				Class<?>[] parameterTypes = method.getParameterTypes();
+				if (parameterTypes.length == 2
+				    && parameterTypes[0].equals(Message.class)
+				    && parameterTypes[1].equals(NotificationContext.class))
+				{
+					methodToInvoke = method;
+					break;
+				}
+			}
+		}
+		return methodToInvoke;
+
+	}
+
+	/**
+	 * This method uses annotations on the StateModel class. Use StateModelInfo
+	 * annotation to specify valid states and initial value use Transition to
+	 * specify "to" and "from" state
+	 * 
+	 * @param clazz
+	 *          , class which extends StateModel
+	 * @param fromState
+	 * @param toState
+	 * @param paramTypes
+	 * @return
+	 */
+	public Method getMethodForTransitionUsingAnnotation(
+	    Class<? extends StateModel> clazz, String fromState, String toState,
+	    Class<?>[] paramTypes)
+	{
+		StateModelInfo stateModelInfo = clazz.getAnnotation(StateModelInfo.class);
+		Method methodToInvoke = null;
+		if (stateModelInfo != null)
+		{
+			Method[] methods = clazz.getMethods();
+			if (methods != null)
+			{
+				for (Method method : methods)
+				{
+					Transition annotation = method.getAnnotation(Transition.class);
+					if (annotation != null)
+					{
+						boolean matchesFrom = annotation.from().equalsIgnoreCase(fromState);
+						boolean matchesTo = annotation.to().equalsIgnoreCase(toState);
+						boolean matchesParamTypes = Arrays.equals(paramTypes,
+						    method.getParameterTypes());
+						if (matchesFrom && matchesTo && matchesParamTypes)
+						{
+							methodToInvoke = method;
+							break;
+						}
+					}
+				}
+			}
+		}
+
+		return methodToInvoke;
+	}
+
+	/**
+	 * Get the intial state for the state model
+	 * 
+	 * @param clazz
+	 * @return
+	 */
+	public String getInitialState(Class<? extends StateModel> clazz)
+	{
+		StateModelInfo stateModelInfo = clazz.getAnnotation(StateModelInfo.class);
+		if (stateModelInfo != null)
+		{
+			return stateModelInfo.initialState();
+		}else{
+			return StateModel.DEFAULT_INITIAL_STATE;
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateTransitionError.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateTransitionError.java b/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateTransitionError.java
new file mode 100644
index 0000000..e51b21d
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateTransitionError.java
@@ -0,0 +1,43 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.participant.statemachine;
+
+import org.apache.helix.messaging.handling.MessageHandler.ErrorCode;
+import org.apache.helix.messaging.handling.MessageHandler.ErrorType;
+
+public class StateTransitionError
+{
+  private final Exception _exception;
+  private final ErrorCode _code;
+  private final ErrorType _type;
+
+  public StateTransitionError(ErrorType type, ErrorCode code, Exception e)
+  {
+    _type = type;
+    _code = code;
+    _exception = e;
+  }
+
+  public Exception getException()
+  {
+    return _exception;
+  }
+
+  public ErrorCode getCode()
+  {
+    return _code;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/participant/statemachine/Transition.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/statemachine/Transition.java b/helix-core/src/main/java/org/apache/helix/participant/statemachine/Transition.java
new file mode 100644
index 0000000..5c4831a
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/participant/statemachine/Transition.java
@@ -0,0 +1,28 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.participant.statemachine;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+
+@Retention(RetentionPolicy.RUNTIME)
+public @interface Transition
+{
+  String from();
+
+  String to();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/participant/statemachine/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/statemachine/package-info.java b/helix-core/src/main/java/org/apache/helix/participant/statemachine/package-info.java
new file mode 100644
index 0000000..69a51c7
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/participant/statemachine/package-info.java
@@ -0,0 +1,5 @@
+/**
+ * Helix state model definitions for participant
+ * 
+ */
+package org.apache.helix.participant.statemachine;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
new file mode 100644
index 0000000..b6a9eff
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
@@ -0,0 +1,301 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.spectator;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.helix.ConfigChangeListener;
+import org.apache.helix.ExternalViewChangeListener;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.log4j.Logger;
+
+
+public class RoutingTableProvider implements ExternalViewChangeListener, ConfigChangeListener
+{
+  private static final Logger logger = Logger.getLogger(RoutingTableProvider.class);
+  private final AtomicReference<RoutingTable> _routingTableRef;
+
+  public RoutingTableProvider()
+  {
+    _routingTableRef = new AtomicReference<RoutingTableProvider.RoutingTable>(new RoutingTable());
+
+  }
+
+  /**
+   * returns the instances for {resource,partition} pair that are in a specific
+   * {state}
+   * 
+   * @param resourceName
+   *          -
+   * @param partitionName
+   * @param state
+   * @return empty list if there is no instance in a given state
+   */
+  public List<InstanceConfig> getInstances(String resourceName, String partitionName, String state)
+  {
+    List<InstanceConfig> instanceList = null;
+    RoutingTable _routingTable = _routingTableRef.get();
+    ResourceInfo resourceInfo = _routingTable.get(resourceName);
+    if (resourceInfo != null)
+    {
+      PartitionInfo keyInfo = resourceInfo.get(partitionName);
+      if (keyInfo != null)
+      {
+        instanceList = keyInfo.get(state);
+      }
+    }
+    if (instanceList == null)
+    {
+      instanceList = Collections.emptyList();
+    }
+    return instanceList;
+  }
+
+  /**
+   * returns all instances for {resource} that are in a specific {state}
+   * 
+   * @param resource
+   * @param state
+   * @return empty list if there is no instance in a given state
+   */
+  public Set<InstanceConfig> getInstances(String resource, String state)
+  {
+    Set<InstanceConfig> instanceSet = null;
+    RoutingTable routingTable = _routingTableRef.get();
+    ResourceInfo resourceInfo = routingTable.get(resource);
+    if (resourceInfo != null)
+    {
+      instanceSet = resourceInfo.getInstances(state);
+    }
+    if (instanceSet == null)
+    {
+      instanceSet = Collections.emptySet();
+    }
+    return instanceSet;
+  }
+
+  @Override
+  public void onExternalViewChange(List<ExternalView> externalViewList,
+      NotificationContext changeContext)
+  {
+    // session has expired clean up the routing table
+    if (changeContext.getType() == NotificationContext.Type.FINALIZE)
+    {
+      logger.info("Resetting the routing table. ");
+      RoutingTable newRoutingTable = new RoutingTable();
+      _routingTableRef.set(newRoutingTable);
+      return;
+    }
+    refresh(externalViewList, changeContext);
+  }
+
+  @Override
+  public void onConfigChange(List<InstanceConfig> configs,
+                             NotificationContext changeContext)
+  {
+    // session has expired clean up the routing table
+    if (changeContext.getType() == NotificationContext.Type.FINALIZE)
+    {
+      logger.info("Resetting the routing table. ");
+      RoutingTable newRoutingTable = new RoutingTable();
+      _routingTableRef.set(newRoutingTable);
+      return;
+    }
+    
+    HelixDataAccessor accessor = changeContext.getManager().getHelixDataAccessor();
+    Builder keyBuilder = accessor.keyBuilder();
+    List<ExternalView> externalViewList = accessor.getChildValues(keyBuilder.externalViews());
+    refresh(externalViewList, changeContext);    
+  }
+  
+  private void refresh(List<ExternalView> externalViewList, NotificationContext changeContext)
+  {
+    HelixDataAccessor accessor = changeContext.getManager().getHelixDataAccessor();
+    Builder keyBuilder = accessor.keyBuilder();
+    
+    List<InstanceConfig> configList = accessor.getChildValues(keyBuilder.instanceConfigs());
+    Map<String, InstanceConfig> instanceConfigMap = new HashMap<String, InstanceConfig>();
+    for (InstanceConfig config : configList)
+    {
+      instanceConfigMap.put(config.getId(), config);
+    }
+    RoutingTable newRoutingTable = new RoutingTable();
+    if (externalViewList != null)
+    {
+      for (ExternalView extView : externalViewList)
+      {
+        String resourceName = extView.getId();
+        for (String partitionName : extView.getPartitionSet())
+        {
+          Map<String, String> stateMap = extView.getStateMap(partitionName);
+          for (String instanceName : stateMap.keySet())
+          {
+            String currentState = stateMap.get(instanceName);
+            if (instanceConfigMap.containsKey(instanceName))
+            {
+              InstanceConfig instanceConfig = instanceConfigMap.get(instanceName);
+              newRoutingTable.addEntry(resourceName, partitionName, currentState, instanceConfig);
+            } else
+            {
+              logger.error("Invalid instance name." + instanceName
+                  + " .Not found in /cluster/configs/. instanceName: ");
+            }
+
+          }
+        }
+      }
+    }
+    _routingTableRef.set(newRoutingTable);
+  }
+
+  class RoutingTable
+  {
+    private final HashMap<String, ResourceInfo> resourceInfoMap;
+
+    public RoutingTable()
+    {
+      resourceInfoMap = new HashMap<String, RoutingTableProvider.ResourceInfo>();
+    }
+
+    public void addEntry(String resourceName, String partitionName, String state,
+        InstanceConfig config)
+    {
+      if (!resourceInfoMap.containsKey(resourceName))
+      {
+        resourceInfoMap.put(resourceName, new ResourceInfo());
+      }
+      ResourceInfo resourceInfo = resourceInfoMap.get(resourceName);
+      resourceInfo.addEntry(partitionName, state, config);
+
+    }
+
+    ResourceInfo get(String resourceName)
+    {
+      return resourceInfoMap.get(resourceName);
+    }
+
+  }
+
+  class ResourceInfo
+  {
+    // store PartitionInfo for each partition
+    HashMap<String, PartitionInfo> partitionInfoMap;
+    // stores the Set of Instances in a given state
+    HashMap<String, Set<InstanceConfig>> stateInfoMap;
+
+    public ResourceInfo()
+    {
+      partitionInfoMap = new HashMap<String, RoutingTableProvider.PartitionInfo>();
+      stateInfoMap = new HashMap<String, Set<InstanceConfig>>();
+    }
+
+    public void addEntry(String stateUnitKey, String state, InstanceConfig config)
+    {
+      // add
+      if (!stateInfoMap.containsKey(state))
+      {
+        Comparator<InstanceConfig> comparator = new Comparator<InstanceConfig>() {
+
+          @Override
+          public int compare(InstanceConfig o1, InstanceConfig o2)
+          {
+            if (o1 == o2)
+            {
+              return 0;
+            }
+            if (o1 == null)
+            {
+              return -1;
+            }
+            if (o2 == null)
+            {
+              return 1;
+            }
+
+            int compareTo = o1.getHostName().compareTo(o2.getHostName());
+            if (compareTo == 0)
+            {
+              return o1.getPort().compareTo(o2.getPort());
+            } else
+            {
+              return compareTo;
+            }
+
+          }
+        };
+        stateInfoMap.put(state, new TreeSet<InstanceConfig>(comparator));
+      }
+      Set<InstanceConfig> set = stateInfoMap.get(state);
+      set.add(config);
+
+      if (!partitionInfoMap.containsKey(stateUnitKey))
+      {
+        partitionInfoMap.put(stateUnitKey, new PartitionInfo());
+      }
+      PartitionInfo stateUnitKeyInfo = partitionInfoMap.get(stateUnitKey);
+      stateUnitKeyInfo.addEntry(state, config);
+
+    }
+
+    public Set<InstanceConfig> getInstances(String state)
+    {
+      Set<InstanceConfig> instanceSet = stateInfoMap.get(state);
+      return instanceSet;
+    }
+
+    PartitionInfo get(String stateUnitKey)
+    {
+      return partitionInfoMap.get(stateUnitKey);
+    }
+  }
+
+  class PartitionInfo
+  {
+    HashMap<String, List<InstanceConfig>> stateInfoMap;
+
+    public PartitionInfo()
+    {
+      stateInfoMap = new HashMap<String, List<InstanceConfig>>();
+    }
+
+    public void addEntry(String state, InstanceConfig config)
+    {
+      if (!stateInfoMap.containsKey(state))
+      {
+        stateInfoMap.put(state, new ArrayList<InstanceConfig>());
+      }
+      List<InstanceConfig> list = stateInfoMap.get(state);
+      list.add(config);
+    }
+
+    List<InstanceConfig> get(String state)
+    {
+      return stateInfoMap.get(state);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/spectator/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/spectator/package-info.java b/helix-core/src/main/java/org/apache/helix/spectator/package-info.java
new file mode 100644
index 0000000..b64c1ce
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/spectator/package-info.java
@@ -0,0 +1,5 @@
+/**
+ * Helix default implementation of a cluster spectator
+ * 
+ */
+package org.apache.helix.spectator;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/store/HelixPropertyListener.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/store/HelixPropertyListener.java b/helix-core/src/main/java/org/apache/helix/store/HelixPropertyListener.java
new file mode 100644
index 0000000..9c15ce4
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/store/HelixPropertyListener.java
@@ -0,0 +1,25 @@
+package org.apache.helix.store;
+
+public interface HelixPropertyListener
+{
+  /**
+   * Invoked on data change
+   * 
+   * @param path
+   */
+  void onDataChange(String path);
+
+  /**
+   * Invoked on data creation
+   * 
+   * @param path
+   */
+  void onDataCreate(String path);
+
+  /**
+   * Invoked on data deletion
+   * 
+   * @param path
+   */
+  void onDataDelete(String path);
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/store/HelixPropertyStore.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/store/HelixPropertyStore.java b/helix-core/src/main/java/org/apache/helix/store/HelixPropertyStore.java
new file mode 100644
index 0000000..a5c17f5
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/store/HelixPropertyStore.java
@@ -0,0 +1,44 @@
+package org.apache.helix.store;
+
+import org.apache.helix.BaseDataAccessor;
+
+public interface HelixPropertyStore<T> extends BaseDataAccessor<T>
+{
+  /**
+   * Perform resource allocation when property store starts
+   * 
+   * Resource allocation includes: - start an internal thread for fire callbacks
+   * 
+   */
+  public void start();
+
+  /**
+   * Perform clean up when property store stops
+   * 
+   * Cleanup includes: - stop the internal thread for fire callbacks
+   * 
+   */
+  public void stop();
+
+  /**
+   * Register a listener to a parent path.
+   * 
+   * Subscribing to a parent path means any changes happening under the parent path will
+   * notify the listener
+   * 
+   * @param parentPath
+   * @param listener
+   */
+  public void subscribe(String parentPath, HelixPropertyListener listener);
+
+  /**
+   * Remove a listener from a parent path.
+   * 
+   * This will remove the listener from receiving any notifications happening under the
+   * parent path
+   * 
+   * @param parentPath
+   * @param listener
+   */
+  public void unsubscribe(String parentPath, HelixPropertyListener listener);
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/store/PropertyChangeListener.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/store/PropertyChangeListener.java b/helix-core/src/main/java/org/apache/helix/store/PropertyChangeListener.java
new file mode 100644
index 0000000..2529b11
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/store/PropertyChangeListener.java
@@ -0,0 +1,39 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.store;
+
+/**
+ * Callback interface on property changes
+ * @param <T>
+ */
+public interface PropertyChangeListener<T>
+{
+  /**
+   * Callback function when there is a change in any property that starts with key
+   * It's upto the implementation to handle the following different cases 1) key
+   * is a simple key and does not have any children. PropertyStore.getProperty(key) must
+   * be used to retrieve the value; 2) key is a prefix and has children.
+   * PropertyStore.getPropertyNames(key) must be used to retrieve all the children keys.
+   * Its important to know that PropertyStore will not be able to provide the
+   * delta[old value,new value] or which child was added/deleted. The
+   * implementation must take care of the fact that there might be callback for
+   * every child thats added/deleted. General way applications handle this is
+   * keep a local cache of keys and compare against the latest keys.
+   * 
+   * @param key
+   */
+  void onPropertyChange(String key);
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/store/PropertyJsonComparator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/store/PropertyJsonComparator.java b/helix-core/src/main/java/org/apache/helix/store/PropertyJsonComparator.java
new file mode 100644
index 0000000..9a8a98b
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/store/PropertyJsonComparator.java
@@ -0,0 +1,65 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.store;
+
+import java.util.Comparator;
+
+import org.apache.log4j.Logger;
+
+public class PropertyJsonComparator<T> implements Comparator<T>
+{
+  static private Logger LOG = Logger.getLogger(PropertyJsonComparator.class);
+  private final PropertyJsonSerializer<T> _serializer;
+  
+  public PropertyJsonComparator(Class<T> clazz)
+  {
+    _serializer = new PropertyJsonSerializer<T>(clazz);
+  }
+
+  @Override
+  public int compare(T arg0, T arg1)
+  {
+    if (arg0 == null && arg1 == null)
+    {
+      return 0;
+    }
+    else if (arg0 == null && arg1 != null)
+    {
+      return -1;
+    }
+    else if (arg0 != null && arg1 == null)
+    {
+      return 1;
+    }
+    else
+    {
+      try
+      {
+        String s0 = new String(_serializer.serialize(arg0));
+        String s1 = new String(_serializer.serialize(arg1));
+
+        return s0.compareTo(s1);
+      }
+      catch (PropertyStoreException e)
+      {
+        // e.printStackTrace();
+        LOG.warn(e.getMessage());
+        return -1;
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/store/PropertyJsonSerializer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/store/PropertyJsonSerializer.java b/helix-core/src/main/java/org/apache/helix/store/PropertyJsonSerializer.java
new file mode 100644
index 0000000..40bd876
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/store/PropertyJsonSerializer.java
@@ -0,0 +1,95 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.store;
+
+import java.io.ByteArrayInputStream;
+import java.io.StringWriter;
+
+import org.apache.helix.HelixException;
+import org.apache.helix.ZNRecord;
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.map.DeserializationConfig;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig;
+
+
+public class PropertyJsonSerializer<T> implements PropertySerializer<T>
+{
+  static private Logger LOG = Logger.getLogger(PropertyJsonSerializer.class);
+  private final Class<T> _clazz;
+
+  public PropertyJsonSerializer(Class<T> clazz)
+  {
+    _clazz = clazz;
+  }
+
+  @Override
+  public byte[] serialize(T data) throws PropertyStoreException
+  {
+    ObjectMapper mapper = new ObjectMapper();
+
+    SerializationConfig serializationConfig = mapper.getSerializationConfig();
+    serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
+    serializationConfig.set(SerializationConfig.Feature.AUTO_DETECT_FIELDS, true);
+    serializationConfig.set(SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS,
+                            true);
+    StringWriter sw = new StringWriter();
+
+    try
+    {
+      mapper.writeValue(sw, data);
+
+      if (sw.toString().getBytes().length > ZNRecord.SIZE_LIMIT)
+      {
+        throw new HelixException("Data size larger than 1M. Write empty string to zk.");
+      }
+      return sw.toString().getBytes();
+
+    }
+    catch (Exception e)
+    {
+      LOG.error("Error during serialization of data (first 1k): "
+          + sw.toString().substring(0, 1024), e);
+    }
+
+    return new byte[] {};
+  }
+
+  @Override
+  public T deserialize(byte[] bytes) throws PropertyStoreException
+  {
+    ObjectMapper mapper = new ObjectMapper();
+    ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+
+    DeserializationConfig deserializationConfig = mapper.getDeserializationConfig();
+    deserializationConfig.set(DeserializationConfig.Feature.AUTO_DETECT_FIELDS, true);
+    deserializationConfig.set(DeserializationConfig.Feature.AUTO_DETECT_SETTERS, true);
+    deserializationConfig.set(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES,
+                              true);
+    try
+    {
+      T value = mapper.readValue(bais, _clazz);
+      return value;
+    }
+    catch (Exception e)
+    {
+      LOG.error("Error during deserialization of bytes: " + new String(bytes), e);
+    }
+
+    return null;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/store/PropertySerializer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/store/PropertySerializer.java b/helix-core/src/main/java/org/apache/helix/store/PropertySerializer.java
new file mode 100644
index 0000000..450be27
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/store/PropertySerializer.java
@@ -0,0 +1,40 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.store;
+
+/**
+ * Serializer interface for property store 
+ * @param <T>
+ */
+    
+public interface PropertySerializer<T>
+{
+  /**
+   * Serialize data object of type T to byte array
+   * @param data
+   * @return
+   * @throws PropertyStoreException
+   */
+  public byte[] serialize(T data) throws PropertyStoreException;
+
+  /**
+   * Deserialize byte array to data object of type T
+   * @param bytes
+   * @return
+   * @throws PropertyStoreException
+   */
+  public T deserialize(byte[] bytes) throws PropertyStoreException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/store/PropertyStat.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/store/PropertyStat.java b/helix-core/src/main/java/org/apache/helix/store/PropertyStat.java
new file mode 100644
index 0000000..2119b4f
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/store/PropertyStat.java
@@ -0,0 +1,55 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.store;
+
+public class PropertyStat
+{
+  private long _lastModifiedTime; // time in milliseconds from epoch when this property
+                                  // was last modified
+  private int _version; // latest version number
+
+  public PropertyStat()
+  {
+    this(0, 0);
+  }
+
+  public PropertyStat(long lastModifiedTime, int version)
+  {
+    _lastModifiedTime = lastModifiedTime;
+    _version = version;
+  }
+
+  public long getLastModifiedTime()
+  {
+    return _lastModifiedTime;
+  }
+
+  public int getVersion()
+  {
+    return _version;
+  }
+
+  public void setLastModifiedTime(long lastModifiedTime)
+  {
+
+    _lastModifiedTime = lastModifiedTime;
+  }
+
+  public void setVersion(int version)
+  {
+    _version = version;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/store/PropertyStore.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/store/PropertyStore.java b/helix-core/src/main/java/org/apache/helix/store/PropertyStore.java
new file mode 100644
index 0000000..98a15f8
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/store/PropertyStore.java
@@ -0,0 +1,212 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.store;
+
+import java.util.Comparator;
+import java.util.List;
+
+import org.I0Itec.zkclient.DataUpdater;
+
+/**
+ * This property store is similar to a key value store but supports hierarchical
+ * structure. It also provides notifications when there is a change in child or
+ * parent. Data can be stored child only. Property name cannot end with a "/". 
+ * Only the root "/" is an exception. 
+ * Property key name is split based on delimiter "/".
+ * Suppose we do setProperty("/foo/bar1",val1) and setProperty("/foo/bar2",val2)
+ * getProperty("/foo) will return null since no data is stored at /foo
+ * getPropertyNames("/foo") will return "bar1" and "bar2" 
+ * removeProperty("/foo/bar1") will simply remove the property stored at /foo/bar1
+ * 
+ * @author kgopalak
+ * @param <T>
+ */
+public interface PropertyStore<T>
+{
+  /**
+   * Set property on key. Override if the property already exists
+   * @param key
+   * @param value
+   * @throws PropertyStoreException
+   */
+  void setProperty(String key, T value) throws PropertyStoreException;
+
+  /**
+   * Get the property on key
+   * @param key
+   * @return value of the property
+   * @throws PropertyStoreException
+   */
+  T getProperty(String key) throws PropertyStoreException;
+  
+  /**
+   * Get the property and its statistics information
+   * @param key
+   * @param stat
+   * @return value of the property
+   * @throws PropertyStoreException
+   */
+  T getProperty(String key, PropertyStat propertyStat) throws PropertyStoreException;
+
+  /**
+   * Removes the property on key
+   * @param key
+   * @throws PropertyStoreException
+   */
+  void removeProperty(String key) throws PropertyStoreException;
+
+  /**
+   * Get all the child property keys under prefix
+   * @param prefix
+   * @return
+   * @throws PropertyStoreException
+   */
+  List<String> getPropertyNames(String prefix) throws PropertyStoreException;
+
+  /**
+   * Delimiter to split the propertyName
+   * @param delimiter
+   * @throws PropertyStoreException
+   */
+  void setPropertyDelimiter(String delimiter) throws PropertyStoreException;
+
+  /**
+   * Subscribe for changes in the property property. Key can be a prefix,
+   * Multiple callbacks can be invoked. One callback per change is not guaranteed.
+   * Some changes might be missed. That's why one has to read the data on every
+   * callback.
+   * @param prefix
+   * @param listener
+   * @throws PropertyStoreException
+   */
+  void subscribeForPropertyChange(String prefix,
+      PropertyChangeListener<T> listener) throws PropertyStoreException;
+
+  /**
+   * Removes the subscription for the prefix
+   * @param prefix
+   * @param listener
+   * @throws PropertyStoreException
+   */
+  void unsubscribeForPropertyChange(String prefix,
+      PropertyChangeListener<T> listener) throws PropertyStoreException;
+
+  /**
+   * Indicates if the implementation supports the feature of storing data in
+   * parent
+   * @return
+   */
+  boolean canParentStoreData();
+
+  /**
+   * Set property serializer
+   * @param serializer
+   */
+  void setPropertySerializer(PropertySerializer<T> serializer);
+
+  /**
+   * create a sub namespace in the property store
+   * @return root property path
+   */ 
+  public void createPropertyNamespace(String prefix) throws PropertyStoreException;
+    
+  /**
+   * Get the root namespace of the property store
+   * @return root property path
+   */ 
+  public String getPropertyRootNamespace();
+  
+  /**
+   * Atomically update property until succeed, updater updates old value to new value
+   * Will create key if doesn't exist
+   * @param key
+   * @param updater
+   */
+  public void updatePropertyUntilSucceed(String key, DataUpdater<T> updater) 
+    throws PropertyStoreException;
+  
+  /**
+   * Atomically update property until succeed, updater updates old value to new value
+   * If createIfAbsent is true, will create key if doesn't exist
+   * If createIfAbsent is false, will not create key and throw exception
+   * @param key
+   * @param updater
+   * @param createIfAbsent
+   * @throws PropertyStoreException
+   */
+  public void updatePropertyUntilSucceed(String key, DataUpdater<T> updater, boolean createIfAbsent) 
+    throws PropertyStoreException;
+  
+  /**
+   * Check if a property exists
+   * @param key
+   * @return
+   */
+  public boolean exists(String key);
+  
+  /**
+   * Remove a parent and all its descendants
+   * @param prefix
+   * @throws PropertyStoreException
+   */
+  public void removeNamespace(String prefix) throws PropertyStoreException;
+  
+  /**
+   * Update property return true if succeed, false otherwise
+   * @param key
+   * @param updater
+   */
+  // public boolean updateProperty(String key, DataUpdater<T> updater);
+  
+  /**
+   * Atomically compare and set property
+   * Return true if succeed, false otherwise
+   * Create if doesn't exist
+   * @param key
+   * @param expected value
+   * @param updated value
+   * @param comparator
+   * @param create if absent
+   */
+  public boolean compareAndSet(String key, T expected, T update, Comparator<T> comparator);
+  
+  /**
+   * Atomically compare and set property
+   * Return true if succeed, false otherwise
+   * If createIfAbsent is true, create key if doesn't exist
+   * If createIfAbsent is false, will not create key and throw exception
+   * @param key
+   * @param expected
+   * @param update
+   * @param comparator
+   * @param createIfAbsent
+   * @return
+   */
+  public boolean compareAndSet(String key, T expected, T update, Comparator<T> comparator, boolean createIfAbsent);
+
+  /**
+   * Start property store
+   * @return
+   */
+  public boolean start();
+
+  /**
+   * Stop property store and do clean up
+   * @return true
+   */
+  public boolean stop();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/store/PropertyStoreException.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/store/PropertyStoreException.java b/helix-core/src/main/java/org/apache/helix/store/PropertyStoreException.java
new file mode 100644
index 0000000..e2deae1
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/store/PropertyStoreException.java
@@ -0,0 +1,36 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.store;
+
+/**
+ * This exception class can be used to indicate any exception during operation
+ * on the propertystore
+ * 
+ * @author kgopalak
+ * 
+ */
+public class PropertyStoreException extends Exception
+{
+  public PropertyStoreException(String msg)
+  {
+    super(msg);
+  }
+
+  public PropertyStoreException()
+  {
+    super();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/store/PropertyStoreFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/store/PropertyStoreFactory.java b/helix-core/src/main/java/org/apache/helix/store/PropertyStoreFactory.java
new file mode 100644
index 0000000..1747748
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/store/PropertyStoreFactory.java
@@ -0,0 +1,63 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.store;
+
+import org.apache.helix.manager.zk.ByteArraySerializer;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.store.file.FilePropertyStore;
+import org.apache.helix.store.zk.ZKPropertyStore;
+import org.apache.log4j.Logger;
+
+
+public class PropertyStoreFactory
+{
+  private static Logger LOG = Logger.getLogger(PropertyStoreFactory.class);
+
+  public static <T extends Object> PropertyStore<T> getZKPropertyStore(String zkAddress,
+                                                                       PropertySerializer<T> serializer,
+                                                                       String rootNamespace)
+  {
+    if (zkAddress == null || serializer == null || rootNamespace == null)
+    {
+      throw new IllegalArgumentException("zkAddress|serializer|rootNamespace can't be null");
+    }
+
+    LOG.info("Get a zk property store. zkAddr: " + zkAddress + ", root: " + rootNamespace);
+    ZkClient zkClient =
+        new ZkClient(zkAddress,
+                     ZkClient.DEFAULT_SESSION_TIMEOUT,
+                     ZkClient.DEFAULT_CONNECTION_TIMEOUT,
+                     new ByteArraySerializer());
+    return new ZKPropertyStore<T>(zkClient, serializer, rootNamespace);
+  }
+
+  public static <T extends Object> PropertyStore<T> getFilePropertyStore(PropertySerializer<T> serializer,
+                                                                         String rootNamespace,
+                                                                         PropertyJsonComparator<T> comparator)
+  {
+    if (comparator == null || serializer == null || rootNamespace == null)
+    {
+      throw new IllegalArgumentException("arguments can't be null");
+    }
+
+    LOG.info("Get a file property store. root: " + rootNamespace);
+    FilePropertyStore<T> store =
+        new FilePropertyStore<T>(serializer, rootNamespace, comparator);
+    return store;
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/store/ZNRecordJsonSerializer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/store/ZNRecordJsonSerializer.java b/helix-core/src/main/java/org/apache/helix/store/ZNRecordJsonSerializer.java
new file mode 100644
index 0000000..22146c6
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/store/ZNRecordJsonSerializer.java
@@ -0,0 +1,40 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.store;
+
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.log4j.Logger;
+
+
+public class ZNRecordJsonSerializer implements PropertySerializer<ZNRecord>
+{
+  static private Logger LOG = Logger.getLogger(ZNRecordJsonSerializer.class);
+  private final ZNRecordSerializer _serializer = new ZNRecordSerializer();
+  
+  @Override
+  public byte[] serialize(ZNRecord data) throws PropertyStoreException
+  {
+    return _serializer.serialize(data);
+  }
+
+  @Override
+  public ZNRecord deserialize(byte[] bytes) throws PropertyStoreException
+  {
+    return (ZNRecord) _serializer.deserialize(bytes);
+  }
+
+}