You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2013/11/07 02:19:57 UTC

[49/53] [abbrv] [HELIX-259] add HelixConnection, rb=14728

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c589fb8d/helix-core/src/main/java/org/apache/helix/monitoring/StatusDumpTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/StatusDumpTask.java b/helix-core/src/main/java/org/apache/helix/monitoring/StatusDumpTask.java
new file mode 100644
index 0000000..751aefa
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/StatusDumpTask.java
@@ -0,0 +1,166 @@
+package org.apache.helix.monitoring;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixTimerTask;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.store.ZNRecordJsonSerializer;
+import org.apache.helix.util.HelixUtil;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.data.Stat;
+
+public class StatusDumpTask extends HelixTimerTask {
+  final static Logger LOG = Logger.getLogger(StatusDumpTask.class);
+
+  Timer _timer = null;
+  final HelixDataAccessor _accessor;
+  final ClusterId _clusterId;
+
+  class StatusDumpTimerTask extends TimerTask {
+    final HelixDataAccessor _accessor;
+    final PropertyKey.Builder _keyBuilder;
+    final BaseDataAccessor<ZNRecord> _baseAccessor;
+    final ZNRecordJsonSerializer _serializer;
+    final long _thresholdNoChangeInMs;
+    final ClusterId _clusterId;
+
+    public StatusDumpTimerTask(ClusterId clusterId, HelixDataAccessor accessor,
+        long thresholdNoChangeInMs) {
+      _accessor = accessor;
+      _keyBuilder = accessor.keyBuilder();
+      _baseAccessor = accessor.getBaseDataAccessor();
+      _serializer = new ZNRecordJsonSerializer();
+      _thresholdNoChangeInMs = thresholdNoChangeInMs;
+      _clusterId = clusterId;
+    }
+
+    @Override
+    public void run() {
+      /**
+       * For each record in status-update and error znode
+       * TODO: for now the status updates are dumped to cluster controller's log.
+       * We need to think if we should create per-instance log files that contains
+       * per-instance status-updates and errors
+       */
+      LOG.info("Scannning status updates ...");
+      try {
+        List<String> instanceNames = _accessor.getChildNames(_keyBuilder.instanceConfigs());
+        for (String instanceName : instanceNames) {
+
+          scanPath(_keyBuilder.statusUpdates(instanceName).getPath());
+          scanPath(HelixUtil.getInstancePropertyPath(_clusterId.stringify(), instanceName,
+              PropertyType.ERRORS));
+        }
+
+        scanPath(HelixUtil.getControllerPropertyPath(_clusterId.stringify(),
+            PropertyType.STATUSUPDATES_CONTROLLER));
+        scanPath(HelixUtil.getControllerPropertyPath(_clusterId.stringify(),
+            PropertyType.ERRORS_CONTROLLER));
+      } catch (Exception e) {
+        LOG.error("Exception dumping status/errors, clusterId: " + _clusterId, e);
+      }
+    }
+
+    // TODO: refactor this
+    void scanPath(String path) {
+      LOG.info("Scannning path: " + path);
+      List<String> childs = _baseAccessor.getChildNames(path, 0);
+      if (childs == null || childs.isEmpty()) {
+        return;
+      }
+
+      for (String child : childs) {
+        String childPath = path + "/" + child;
+
+        try {
+          List<String> grandChilds = _baseAccessor.getChildNames(childPath, 0);
+          if (grandChilds == null || grandChilds.isEmpty()) {
+            continue;
+          }
+
+          for (String grandChild : grandChilds) {
+            String grandChildPath = childPath + "/" + grandChild;
+            try {
+              checkAndDump(grandChildPath);
+            } catch (Exception e) {
+              LOG.error("Exception in dumping status, path: " + grandChildPath, e);
+            }
+          }
+        } catch (Exception e) {
+          LOG.error("Exception in dumping status, path: " + childPath, e);
+        }
+      }
+    }
+
+    void checkAndDump(String path) {
+      List<String> paths = new ArrayList<String>();
+      paths.add(path);
+
+      List<String> childs = _baseAccessor.getChildNames(path, 0);
+      if (childs != null && !childs.isEmpty()) {
+        for (String child : childs) {
+          String childPath = path + "/" + child;
+          paths.add(childPath);
+        }
+      }
+
+      long nowInMs = System.currentTimeMillis();
+
+      List<Stat> stats = new ArrayList<Stat>();
+      List<ZNRecord> records = _baseAccessor.get(paths, stats, 0);
+      for (int i = 0; i < paths.size(); i++) {
+        String dumpPath = paths.get(i);
+        Stat stat = stats.get(i);
+        ZNRecord record = records.get(i);
+        long timePassedInMs = nowInMs - stat.getMtime();
+        if (timePassedInMs > _thresholdNoChangeInMs) {
+          LOG.info("Dumping status update path: " + dumpPath + ", " + timePassedInMs
+              + "MS has passed");
+          try {
+            LOG.info(new String(_serializer.serialize(record)));
+          } catch (Exception e) {
+            LOG.warn("Ignorable exception serializing path: " + dumpPath + ", record: " + record, e);
+          }
+          _baseAccessor.remove(dumpPath, 0);
+        }
+      }
+    }
+  }
+
+  public StatusDumpTask(ClusterId clusterId, HelixDataAccessor accessor) {
+    _accessor = accessor;
+    _clusterId = clusterId;
+  }
+
+  @Override
+  public void start() {
+    final long initialDelay = 30 * 60 * 1000;
+    final long period = 120 * 60 * 1000;
+    final long thresholdNoChangeInMs = 180 * 60 * 1000;
+
+    if (_timer == null) {
+      LOG.info("Start StatusDumpTask");
+      _timer = new Timer("StatusDumpTimerTask", true);
+      _timer.scheduleAtFixedRate(new StatusDumpTimerTask(_clusterId, _accessor,
+          thresholdNoChangeInMs), initialDelay, period);
+    }
+
+  }
+
+  @Override
+  public void stop() {
+    if (_timer != null) {
+      LOG.info("Stop StatusDumpTask");
+      _timer.cancel();
+      _timer = null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c589fb8d/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java b/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
index 4e4fdf6..95afb70 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
@@ -46,15 +46,20 @@ import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.MessageType;
 import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.participant.statemachine.HelixStateModelFactory;
+import org.apache.helix.participant.statemachine.HelixStateModelFactoryAdaptor;
 import org.apache.helix.participant.statemachine.StateModel;
 import org.apache.helix.participant.statemachine.StateModelFactory;
 import org.apache.helix.participant.statemachine.StateModelParser;
 import org.apache.log4j.Logger;
 
 public class HelixStateMachineEngine implements StateMachineEngine {
-  private static Logger logger = Logger.getLogger(HelixStateMachineEngine.class);
+  private static Logger LOG = Logger.getLogger(HelixStateMachineEngine.class);
 
-  // StateModelName->FactoryName->StateModelFactory
+  /**
+   * Map of StateModelDefId to map of FactoryName to StateModelFactory
+   * TODO change to use StateModelDefId and HelixStateModelFactory
+   */
   private final Map<String, Map<String, StateModelFactory<? extends StateModel>>> _stateModelFactoryMap;
   private final StateModelParser _stateModelParser;
   private final HelixManager _manager;
@@ -95,7 +100,7 @@ public class HelixStateMachineEngine implements StateMachineEngine {
       throw new HelixException("stateModelDef|stateModelFactory|factoryName cannot be null");
     }
 
-    logger.info("Register state model factory for state model " + stateModelName
+    LOG.info("Register state model factory for state model " + stateModelName
         + " using factory name " + factoryName + " with " + factory);
 
     if (!_stateModelFactoryMap.containsKey(stateModelName)) {
@@ -104,7 +109,7 @@ public class HelixStateMachineEngine implements StateMachineEngine {
     }
 
     if (_stateModelFactoryMap.get(stateModelName).containsKey(factoryName)) {
-      logger.warn("stateModelFactory for " + stateModelName + " using factoryName " + factoryName
+      LOG.warn("stateModelFactory for " + stateModelName + " using factoryName " + factoryName
           + " has already been registered.");
       return false;
     }
@@ -136,9 +141,9 @@ public class HelixStateMachineEngine implements StateMachineEngine {
           nopMsg.setTgtName(_manager.getInstanceName());
           accessor.setProperty(keyBuilder.message(nopMsg.getTgtName(), nopMsg.getId()), nopMsg);
         }
-        logger.info("Send NO_OP message to " + nopMsg.getTgtName() + ", msgId: " + nopMsg.getId());
+        LOG.info("Send NO_OP message to " + nopMsg.getTgtName() + ", msgId: " + nopMsg.getId());
       } catch (Exception e) {
-        logger.error(e);
+        LOG.error(e);
       }
     }
   }
@@ -176,9 +181,8 @@ public class HelixStateMachineEngine implements StateMachineEngine {
     int bucketSize = message.getBucketSize();
 
     if (stateModelId == null) {
-      logger
-          .error("Fail to create msg-handler because message does not contain stateModelDef. msgId: "
-              + message.getId());
+      LOG.error("Fail to create msg-handler because message does not contain stateModelDef. msgId: "
+          + message.getId());
       return null;
     }
 
@@ -190,7 +194,7 @@ public class HelixStateMachineEngine implements StateMachineEngine {
     StateModelFactory<? extends StateModel> stateModelFactory =
         getStateModelFactory(stateModelId.stringify(), factoryName);
     if (stateModelFactory == null) {
-      logger.warn("Fail to create msg-handler because cannot find stateModelFactory for model: "
+      LOG.warn("Fail to create msg-handler because cannot find stateModelFactory for model: "
           + stateModelId + " using factoryName: " + factoryName + " for resource: " + resourceId);
       return null;
     }
@@ -241,7 +245,7 @@ public class HelixStateMachineEngine implements StateMachineEngine {
       // get executor-service for the message
       TaskExecutor executor = (TaskExecutor) context.get(MapKey.TASK_EXECUTOR.toString());
       if (executor == null) {
-        logger.error("fail to get executor-service for batch message: " + message.getId()
+        LOG.error("fail to get executor-service for batch message: " + message.getId()
             + ". msgType: " + message.getMsgType() + ", resource: " + message.getResourceId());
         return null;
       }
@@ -265,4 +269,86 @@ public class HelixStateMachineEngine implements StateMachineEngine {
       StateModelFactory<? extends StateModel> factory, String factoryName) {
     throw new UnsupportedOperationException("Remove not yet supported");
   }
+
+  @Override
+  public boolean registerStateModelFactory(StateModelDefId stateModelDefId,
+      HelixStateModelFactory<? extends StateModel> factory) {
+    return registerStateModelFactory(stateModelDefId, HelixConstants.DEFAULT_STATE_MODEL_FACTORY,
+        factory);
+  }
+
+  @Override
+  public boolean registerStateModelFactory(StateModelDefId stateModelDefId, String factoryName,
+      HelixStateModelFactory<? extends StateModel> factory) {
+    if (stateModelDefId == null || factoryName == null || factory == null) {
+      LOG.info("stateModelDefId|factoryName|stateModelFactory is null");
+      return false;
+    }
+
+    LOG.info("Registering state model factory for state-model-definition: " + stateModelDefId
+        + " using factory-name: " + factoryName + " with: " + factory);
+
+    StateModelFactory<? extends StateModel> factoryAdaptor =
+        new HelixStateModelFactoryAdaptor(factory);
+
+    String stateModelDefName = stateModelDefId.stringify();
+    if (!_stateModelFactoryMap.containsKey(stateModelDefName)) {
+      _stateModelFactoryMap.put(stateModelDefName,
+          new ConcurrentHashMap<String, StateModelFactory<? extends StateModel>>());
+    }
+
+    if (_stateModelFactoryMap.get(stateModelDefName).containsKey(factoryName)) {
+      LOG.info("Skip register state model factory for " + stateModelDefId + " using factory-name "
+          + factoryName + ", since it has already been registered.");
+      return false;
+    }
+
+    _stateModelFactoryMap.get(stateModelDefName).put(factoryName, factoryAdaptor);
+
+    sendNopMessage();
+    return true;
+  }
+
+  @Override
+  public boolean removeStateModelFactory(StateModelDefId stateModelDefId) {
+    return removeStateModelFactory(stateModelDefId, HelixConstants.DEFAULT_STATE_MODEL_FACTORY);
+  }
+
+  @Override
+  public boolean removeStateModelFactory(StateModelDefId stateModelDefId, String factoryName) {
+    if (stateModelDefId == null || factoryName == null) {
+      LOG.info("stateModelDefId|factoryName is null");
+      return false;
+    }
+
+    LOG.info("Removing state model factory for state-model-definition: " + stateModelDefId
+        + " using factory-name: " + factoryName);
+
+    String stateModelDefName = stateModelDefId.stringify();
+    Map<String, StateModelFactory<? extends StateModel>> ftyMap =
+        _stateModelFactoryMap.get(stateModelDefName);
+    if (ftyMap == null) {
+      LOG.info("Skip remove state model factory " + stateModelDefId + ", since it does NOT exist");
+      return false;
+    }
+
+    StateModelFactory<? extends StateModel> fty = ftyMap.remove(factoryName);
+    if (fty == null) {
+      LOG.info("Skip remove state model factory " + stateModelDefId + " using factory-name "
+          + factoryName + ", since it does NOT exist");
+      return false;
+    }
+
+    if (ftyMap.isEmpty()) {
+      _stateModelFactoryMap.remove(stateModelDefName);
+    }
+
+    for (String partition : fty.getPartitionSet()) {
+      StateModel stateModel = fty.getStateModel(partition);
+      stateModel.reset();
+      // TODO probably should remove the state from zookeeper
+    }
+
+    return true;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c589fb8d/helix-core/src/main/java/org/apache/helix/participant/StateMachineEngine.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/StateMachineEngine.java b/helix-core/src/main/java/org/apache/helix/participant/StateMachineEngine.java
index d11b3cc..80c9545 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/StateMachineEngine.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/StateMachineEngine.java
@@ -19,52 +19,84 @@ package org.apache.helix.participant;
  * under the License.
  */
 
+import org.apache.helix.api.id.StateModelDefId;
 import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.participant.statemachine.HelixStateModelFactory;
 import org.apache.helix.participant.statemachine.StateModel;
 import org.apache.helix.participant.statemachine.StateModelFactory;
 
 /**
- * Helix participant manager uses this class to register/remove state model factory
+ * Helix participant uses this class to register/remove state model factory
  * State model factory creates state model that handles state transition messages
  */
 public interface StateMachineEngine extends MessageHandlerFactory {
+
+  /**
+   * Replaced by {@link #registerStateModelFactory(StateModelDefId, HelixStateModelFactory)}
+   */
+  @Deprecated
+  public boolean registerStateModelFactory(String stateModelDef,
+      StateModelFactory<? extends StateModel> factory);
+
+  /**
+   * Replaced by {@link #registerStateModelFactory(StateModelDefId, String, HelixStateModelFactory)}
+   */
+  @Deprecated
+  public boolean registerStateModelFactory(String stateModelDef,
+      StateModelFactory<? extends StateModel> factory, String factoryName);
+
+  /**
+   * Replaced by {@link #removeStateModelFactory(StateModelDefId)}
+   */
+  @Deprecated
+  public boolean removeStateModelFactory(String stateModelDef,
+      StateModelFactory<? extends StateModel> factory);
+
+  /**
+   * Replaced by {@link #removeStateModelFactory(StateModelDefId, String)}
+   */
+  @Deprecated
+  public boolean removeStateModelFactory(String stateModelDef,
+      StateModelFactory<? extends StateModel> factory, String factoryName);
+
   /**
    * Register a default state model factory for a state model definition
    * A state model definition could be, for example:
    * "MasterSlave", "OnlineOffline", "LeaderStandby", etc.
-   * @param stateModelDef
+   * Replacing {@link #registerStateModelFactory(String, StateModelFactory)}
+   * @param stateModelDefId
    * @param factory
    * @return
    */
-  public boolean registerStateModelFactory(String stateModelDef,
-      StateModelFactory<? extends StateModel> factory);
+  public boolean registerStateModelFactory(StateModelDefId stateModelDefId,
+      HelixStateModelFactory<? extends StateModel> factory);
 
   /**
-   * Register a state model factory with a name for a state model definition
-   * @param stateModelDef
-   * @param factory
+   * Register a state model factory with a factory name for a state model definition
+   * Replacing {@link #registerStateModelFactory(String, StateModelFactory, String)}
+   * @param stateModelDefId
    * @param factoryName
+   * @param factory
    * @return
    */
-  public boolean registerStateModelFactory(String stateModelDef,
-      StateModelFactory<? extends StateModel> factory, String factoryName);
+  public boolean registerStateModelFactory(StateModelDefId stateModelDefId, String factoryName,
+      HelixStateModelFactory<? extends StateModel> factory);
 
   /**
    * Remove the default state model factory for a state model definition
-   * @param stateModelDef
-   * @param factory
+   * Replacing {@link #removeStateModelFactory(String, StateModelFactory)}
+   * @param stateModelDefId
    * @return
    */
-  public boolean removeStateModelFactory(String stateModelDef,
-      StateModelFactory<? extends StateModel> factory);
+  public boolean removeStateModelFactory(StateModelDefId stateModelDefId);
 
   /**
    * Remove the state model factory with a name for a state model definition
-   * @param stateModelDef
-   * @param factory
+   * Replacing {@link #removeStateModelFactory(String, StateModelFactory, String)}
+   * @param stateModelDefId
    * @param factoryName
    * @return
    */
-  public boolean removeStateModelFactory(String stateModelDef,
-      StateModelFactory<? extends StateModel> factory, String factoryName);
+  public boolean removeStateModelFactory(StateModelDefId stateModelDefId, String factoryName);
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c589fb8d/helix-core/src/main/java/org/apache/helix/participant/statemachine/HelixStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/statemachine/HelixStateModelFactory.java b/helix-core/src/main/java/org/apache/helix/participant/statemachine/HelixStateModelFactory.java
new file mode 100644
index 0000000..770bba4
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/participant/statemachine/HelixStateModelFactory.java
@@ -0,0 +1,99 @@
+package org.apache.helix.participant.statemachine;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.messaging.handling.BatchMessageWrapper;
+
+/**
+ * State model factory that uses concrete id classes instead of strings.
+ * Replacing {@link org.apache.helix.participant.statemachine.StateModelFactory}
+ */
+public abstract class HelixStateModelFactory<T extends StateModel> {
+  /**
+   * map from partitionId to stateModel
+   */
+  private final ConcurrentMap<PartitionId, T> _stateModelMap =
+      new ConcurrentHashMap<PartitionId, T>();
+
+  /**
+   * map from resourceId to BatchMessageWrapper
+   */
+  private final ConcurrentMap<ResourceId, BatchMessageWrapper> _batchMsgWrapperMap =
+      new ConcurrentHashMap<ResourceId, BatchMessageWrapper>();
+
+  /**
+   * This method will be invoked only once per partition per session
+   * @param partitionId
+   * @return
+   */
+  public abstract T createNewStateModel(PartitionId partitionId);
+
+  /**
+   * Create a state model for a partition
+   * @param partitionId
+   */
+  public T createAndAddStateModel(PartitionId partitionId) {
+    T stateModel = createNewStateModel(partitionId);
+    _stateModelMap.put(partitionId, stateModel);
+    return stateModel;
+  }
+
+  /**
+   * Get the state model for a partition
+   * @param partitionId
+   * @return state model if exists, null otherwise
+   */
+  public T getStateModel(PartitionId partitionId) {
+    return _stateModelMap.get(partitionId);
+  }
+
+  /**
+   * remove state model for a partition
+   * @param partitionId
+   * @return state model removed or null if not exist
+   */
+  public T removeStateModel(PartitionId partitionId) {
+    return _stateModelMap.remove(partitionId);
+  }
+
+  /**
+   * get partition set
+   * @return partitionId set
+   */
+  public Set<PartitionId> getPartitionSet() {
+    return _stateModelMap.keySet();
+  }
+
+  /**
+   * create a default batch-message-wrapper for a resource
+   * @param resourceId
+   * @return
+   */
+  public BatchMessageWrapper createBatchMessageWrapper(ResourceId resourceId) {
+    return new BatchMessageWrapper();
+  }
+
+  /**
+   * create a batch-message-wrapper for a resource and put it into map
+   * @param resourceId
+   * @return
+   */
+  public BatchMessageWrapper createAndAddBatchMessageWrapper(ResourceId resourceId) {
+    BatchMessageWrapper wrapper = createBatchMessageWrapper(resourceId);
+    _batchMsgWrapperMap.put(resourceId, wrapper);
+    return wrapper;
+  }
+
+  /**
+   * get batch-message-wrapper for a resource
+   * @param resourceId
+   * @return
+   */
+  public BatchMessageWrapper getBatchMessageWrapper(ResourceId resourceId) {
+    return _batchMsgWrapperMap.get(resourceId);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c589fb8d/helix-core/src/main/java/org/apache/helix/participant/statemachine/HelixStateModelFactoryAdaptor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/statemachine/HelixStateModelFactoryAdaptor.java b/helix-core/src/main/java/org/apache/helix/participant/statemachine/HelixStateModelFactoryAdaptor.java
new file mode 100644
index 0000000..a17d9f3
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/participant/statemachine/HelixStateModelFactoryAdaptor.java
@@ -0,0 +1,17 @@
+package org.apache.helix.participant.statemachine;
+
+import org.apache.helix.api.id.PartitionId;
+
+public class HelixStateModelFactoryAdaptor<T extends StateModel> extends StateModelFactory<T> {
+  final HelixStateModelFactory<T> _factory;
+
+  public HelixStateModelFactoryAdaptor(HelixStateModelFactory<T> factory) {
+    _factory = factory;
+  }
+
+  @Override
+  public T createNewStateModel(String partitionName) {
+    return _factory.createNewStateModel(PartitionId.from(partitionName));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c589fb8d/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java b/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java
index 4d8e598..a74f67b 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java
@@ -25,6 +25,10 @@ import java.util.concurrent.ConcurrentMap;
 
 import org.apache.helix.messaging.handling.BatchMessageWrapper;
 
+/**
+ * Replaced by {@link org.apache.helix.participant.statemachine.HelixStateModelFactory}
+ */
+@Deprecated
 public abstract class StateModelFactory<T extends StateModel> {
   /**
    * mapping from partitionName to StateModel

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c589fb8d/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java b/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java
new file mode 100644
index 0000000..6f148cc
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java
@@ -0,0 +1,151 @@
+package org.apache.helix.integration;
+
+import java.util.Arrays;
+import java.util.Date;
+import java.util.Map;
+
+import org.apache.helix.HelixConnection;
+import org.apache.helix.HelixController;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixParticipant;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.api.State;
+import org.apache.helix.api.accessor.ClusterAccessor;
+import org.apache.helix.api.config.ClusterConfig;
+import org.apache.helix.api.config.ParticipantConfig;
+import org.apache.helix.api.config.ResourceConfig;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ControllerId;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+import org.apache.helix.controller.rebalancer.context.SemiAutoRebalancerContext;
+import org.apache.helix.manager.zk.ZkHelixConnection;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.participant.statemachine.HelixStateModelFactory;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestHelixConnection extends ZkUnitTestBase {
+  private static final Logger LOG = Logger.getLogger(TestHelixConnection.class.getName());
+
+  @StateModelInfo(initialState = "OFFLINE", states = {
+      "MASTER", "SLAVE", "OFFLINE", "ERROR"
+  })
+  public static class MockStateModel extends StateModel {
+    public MockStateModel() {
+
+    }
+
+    @Transition(to = "*", from = "*")
+    public void onBecomeAnyFromAny(Message message, NotificationContext context) {
+      String from = message.getFromState();
+      String to = message.getToState();
+      LOG.info("Become " + to + " from " + from);
+    }
+  }
+
+  public static class MockStateModelFactory extends HelixStateModelFactory<MockStateModel> {
+
+    public MockStateModelFactory() {
+    }
+
+    @Override
+    public MockStateModel createNewStateModel(PartitionId partitionId) {
+      MockStateModel model = new MockStateModel();
+
+      return model;
+    }
+  }
+
+  @Test
+  public void test() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    String zkAddr = ZK_ADDR;
+    ClusterId clusterId = ClusterId.from(clusterName);
+    ControllerId controllerId = ControllerId.from("controller");
+    final ParticipantId participantId = ParticipantId.from("participant1");
+
+    ResourceId resourceId = ResourceId.from("testDB");
+    State master = State.from("MASTER");
+    State slave = State.from("SLAVE");
+    State offline = State.from("OFFLINE");
+    StateModelDefId stateModelDefId = StateModelDefId.from("MasterSlave");
+
+    // create connection
+    HelixConnection connection = new ZkHelixConnection(zkAddr);
+    connection.connect();
+
+    // setup cluster
+    ClusterAccessor clusterAccessor = connection.createClusterAccessor(clusterId);
+    clusterAccessor.dropCluster();
+
+    StateModelDefinition stateModelDef =
+        new StateModelDefinition.Builder(stateModelDefId).addState(master, 1).addState(slave, 2)
+            .addState(offline, 3).addTransition(offline, slave, 3).addTransition(slave, offline, 4)
+            .addTransition(slave, master, 2).addTransition(master, slave, 1).initialState(offline)
+            .upperBound(master, 1).dynamicUpperBound(slave, "R").build();
+    RebalancerContext rebalancerCtx =
+        new SemiAutoRebalancerContext.Builder(resourceId).addPartitions(1).replicaCount(1)
+            .stateModelDefId(stateModelDefId)
+            .preferenceList(PartitionId.from("testDB_0"), Arrays.asList(participantId)).build();
+    clusterAccessor.createCluster(new ClusterConfig.Builder(clusterId).addStateModelDefinition(
+        stateModelDef).build());
+    clusterAccessor.addResourceToCluster(new ResourceConfig.Builder(resourceId).rebalancerContext(
+        rebalancerCtx).build());
+    clusterAccessor.addParticipantToCluster(new ParticipantConfig.Builder(participantId).build());
+
+    // start controller
+    HelixController controller = connection.createController(clusterId, controllerId);
+    controller.startAsync();
+
+    // start participant
+    HelixParticipant participant = connection.createParticipant(clusterId, participantId);
+    participant.getStateMachineEngine().registerStateModelFactory(
+        StateModelDefId.from("MasterSlave"), new MockStateModelFactory());
+
+    participant.startAsync();
+
+    // verify
+    final HelixDataAccessor accessor = connection.createDataAccessor(clusterId);
+    final PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    boolean success = TestHelper.verify(new TestHelper.Verifier() {
+
+      @Override
+      public boolean verify() throws Exception {
+        ExternalView externalView = accessor.getProperty(keyBuilder.externalView("testDB"));
+        Map<ParticipantId, State> stateMap = externalView.getStateMap(PartitionId.from("testDB_0"));
+
+        if (stateMap == null || !stateMap.containsKey(participantId)) {
+          return false;
+        }
+
+        return stateMap.get(participantId).equals(State.from("MASTER"));
+      }
+    }, 10 * 1000);
+
+    Assert.assertTrue(success);
+
+    // clean up
+    controller.stopAsync();
+    participant.stopAsync();
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+}