You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2013/11/07 02:19:57 UTC
[49/53] [abbrv] [HELIX-259] add HelixConnection, rb=14728
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c589fb8d/helix-core/src/main/java/org/apache/helix/monitoring/StatusDumpTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/StatusDumpTask.java b/helix-core/src/main/java/org/apache/helix/monitoring/StatusDumpTask.java
new file mode 100644
index 0000000..751aefa
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/StatusDumpTask.java
@@ -0,0 +1,166 @@
+package org.apache.helix.monitoring;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixTimerTask;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.store.ZNRecordJsonSerializer;
+import org.apache.helix.util.HelixUtil;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * Timer-driven task that scans the status-update and error paths of a cluster, writes every
+ * record that has not been modified for a threshold amount of time to the log, and then removes
+ * that record through the base data accessor.
+ */
+public class StatusDumpTask extends HelixTimerTask {
+  final static Logger LOG = Logger.getLogger(StatusDumpTask.class);
+
+  // non-null only while the task is started
+  Timer _timer = null;
+  final HelixDataAccessor _accessor;
+  final ClusterId _clusterId;
+
+  /**
+   * The actual periodic job: walks the status-update/error trees and dumps stale records.
+   */
+  class StatusDumpTimerTask extends TimerTask {
+    final HelixDataAccessor _accessor;
+    final PropertyKey.Builder _keyBuilder;
+    final BaseDataAccessor<ZNRecord> _baseAccessor;
+    final ZNRecordJsonSerializer _serializer;
+    final long _thresholdNoChangeInMs;
+    final ClusterId _clusterId;
+
+    /**
+     * @param clusterId cluster whose status updates and errors are scanned
+     * @param accessor accessor used to list instances and to obtain the base accessor
+     * @param thresholdNoChangeInMs a record is dumped and removed once more than this many
+     *          milliseconds have passed since its last modification
+     */
+    public StatusDumpTimerTask(ClusterId clusterId, HelixDataAccessor accessor,
+        long thresholdNoChangeInMs) {
+      _accessor = accessor;
+      _keyBuilder = accessor.keyBuilder();
+      _baseAccessor = accessor.getBaseDataAccessor();
+      _serializer = new ZNRecordJsonSerializer();
+      _thresholdNoChangeInMs = thresholdNoChangeInMs;
+      _clusterId = clusterId;
+    }
+
+    /**
+     * Scans the per-instance and controller status-update/error paths. Any exception is logged
+     * and swallowed so that it does not propagate out of the timer task.
+     */
+    @Override
+    public void run() {
+      /**
+       * For each record in status-update and error znode
+       * TODO: for now the status updates are dumped to cluster controller's log.
+       * We need to think if we should create per-instance log files that contains
+       * per-instance status-updates and errors
+       */
+      LOG.info("Scannning status updates ...");
+      try {
+        List<String> instanceNames = _accessor.getChildNames(_keyBuilder.instanceConfigs());
+        for (String instanceName : instanceNames) {
+
+          scanPath(_keyBuilder.statusUpdates(instanceName).getPath());
+          scanPath(HelixUtil.getInstancePropertyPath(_clusterId.stringify(), instanceName,
+              PropertyType.ERRORS));
+        }
+
+        scanPath(HelixUtil.getControllerPropertyPath(_clusterId.stringify(),
+            PropertyType.STATUSUPDATES_CONTROLLER));
+        scanPath(HelixUtil.getControllerPropertyPath(_clusterId.stringify(),
+            PropertyType.ERRORS_CONTROLLER));
+      } catch (Exception e) {
+        LOG.error("Exception dumping status/errors, clusterId: " + _clusterId, e);
+      }
+    }
+
+    /**
+     * Visits every grandchild of the given path and hands it to {@link #checkAndDump(String)}.
+     * A failure on one child or grandchild is logged and does not stop the scan of the others.
+     * @param path root path whose grandchildren are checked
+     */
+    // TODO: refactor this
+    void scanPath(String path) {
+      LOG.info("Scannning path: " + path);
+      List<String> childs = _baseAccessor.getChildNames(path, 0);
+      if (childs == null || childs.isEmpty()) {
+        return;
+      }
+
+      for (String child : childs) {
+        String childPath = path + "/" + child;
+
+        try {
+          List<String> grandChilds = _baseAccessor.getChildNames(childPath, 0);
+          if (grandChilds == null || grandChilds.isEmpty()) {
+            continue;
+          }
+
+          for (String grandChild : grandChilds) {
+            String grandChildPath = childPath + "/" + grandChild;
+            try {
+              checkAndDump(grandChildPath);
+            } catch (Exception e) {
+              LOG.error("Exception in dumping status, path: " + grandChildPath, e);
+            }
+          }
+        } catch (Exception e) {
+          LOG.error("Exception in dumping status, path: " + childPath, e);
+        }
+      }
+    }
+
+    /**
+     * Reads the given path together with its direct children and, for each of them whose last
+     * modification is older than the threshold, logs the serialized record and removes the path.
+     * Paths for which no {@link Stat} is available are skipped instead of failing the whole batch.
+     * @param path path to check, along with its direct children
+     */
+    void checkAndDump(String path) {
+      List<String> paths = new ArrayList<String>();
+      paths.add(path);
+
+      List<String> childs = _baseAccessor.getChildNames(path, 0);
+      if (childs != null && !childs.isEmpty()) {
+        for (String child : childs) {
+          String childPath = path + "/" + child;
+          paths.add(childPath);
+        }
+      }
+
+      long nowInMs = System.currentTimeMillis();
+
+      List<Stat> stats = new ArrayList<Stat>();
+      List<ZNRecord> records = _baseAccessor.get(paths, stats, 0);
+      for (int i = 0; i < paths.size(); i++) {
+        String dumpPath = paths.get(i);
+
+        // NOTE(review): presumably the accessor leaves a null Stat for a path that no longer
+        // exists (e.g. removed between the listing above and this read) - confirm against the
+        // BaseDataAccessor implementation. Either way, never dereference a missing Stat.
+        Stat stat = i < stats.size() ? stats.get(i) : null;
+        if (stat == null) {
+          LOG.warn("Skip dumping path: " + dumpPath + ", since its stat is not available");
+          continue;
+        }
+
+        ZNRecord record = (records != null && i < records.size()) ? records.get(i) : null;
+        long timePassedInMs = nowInMs - stat.getMtime();
+        if (timePassedInMs > _thresholdNoChangeInMs) {
+          LOG.info("Dumping status update path: " + dumpPath + ", " + timePassedInMs
+              + "MS has passed");
+          try {
+            LOG.info(new String(_serializer.serialize(record)));
+          } catch (Exception e) {
+            LOG.warn("Ignorable exception serializing path: " + dumpPath + ", record: " + record, e);
+          }
+          // the path is removed even when serialization failed, as before
+          _baseAccessor.remove(dumpPath, 0);
+        }
+      }
+    }
+  }
+
+  /**
+   * @param clusterId cluster whose status updates and errors will be dumped
+   * @param accessor data accessor handed to the periodic job
+   */
+  public StatusDumpTask(ClusterId clusterId, HelixDataAccessor accessor) {
+    _accessor = accessor;
+    _clusterId = clusterId;
+  }
+
+  /**
+   * Starts the periodic dump on a daemon timer: first run after 30 minutes, then every 120
+   * minutes, dumping records unchanged for more than 180 minutes. Does nothing if already
+   * started.
+   */
+  @Override
+  public void start() {
+    final long initialDelay = 30 * 60 * 1000;
+    final long period = 120 * 60 * 1000;
+    final long thresholdNoChangeInMs = 180 * 60 * 1000;
+
+    if (_timer == null) {
+      LOG.info("Start StatusDumpTask");
+      _timer = new Timer("StatusDumpTimerTask", true);
+      _timer.scheduleAtFixedRate(new StatusDumpTimerTask(_clusterId, _accessor,
+          thresholdNoChangeInMs), initialDelay, period);
+    }
+
+  }
+
+  /**
+   * Cancels the timer if the task is running; a later {@link #start()} creates a new timer.
+   */
+  @Override
+  public void stop() {
+    if (_timer != null) {
+      LOG.info("Stop StatusDumpTask");
+      _timer.cancel();
+      _timer = null;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c589fb8d/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java b/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
index 4e4fdf6..95afb70 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
@@ -46,15 +46,20 @@ import org.apache.helix.model.CurrentState;
import org.apache.helix.model.Message;
import org.apache.helix.model.Message.MessageType;
import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.participant.statemachine.HelixStateModelFactory;
+import org.apache.helix.participant.statemachine.HelixStateModelFactoryAdaptor;
import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.helix.participant.statemachine.StateModelParser;
import org.apache.log4j.Logger;
public class HelixStateMachineEngine implements StateMachineEngine {
- private static Logger logger = Logger.getLogger(HelixStateMachineEngine.class);
+ private static Logger LOG = Logger.getLogger(HelixStateMachineEngine.class);
- // StateModelName->FactoryName->StateModelFactory
+ /**
+ * Map of StateModelDefId to map of FactoryName to StateModelFactory
+ * TODO change to use StateModelDefId and HelixStateModelFactory
+ */
private final Map<String, Map<String, StateModelFactory<? extends StateModel>>> _stateModelFactoryMap;
private final StateModelParser _stateModelParser;
private final HelixManager _manager;
@@ -95,7 +100,7 @@ public class HelixStateMachineEngine implements StateMachineEngine {
throw new HelixException("stateModelDef|stateModelFactory|factoryName cannot be null");
}
- logger.info("Register state model factory for state model " + stateModelName
+ LOG.info("Register state model factory for state model " + stateModelName
+ " using factory name " + factoryName + " with " + factory);
if (!_stateModelFactoryMap.containsKey(stateModelName)) {
@@ -104,7 +109,7 @@ public class HelixStateMachineEngine implements StateMachineEngine {
}
if (_stateModelFactoryMap.get(stateModelName).containsKey(factoryName)) {
- logger.warn("stateModelFactory for " + stateModelName + " using factoryName " + factoryName
+ LOG.warn("stateModelFactory for " + stateModelName + " using factoryName " + factoryName
+ " has already been registered.");
return false;
}
@@ -136,9 +141,9 @@ public class HelixStateMachineEngine implements StateMachineEngine {
nopMsg.setTgtName(_manager.getInstanceName());
accessor.setProperty(keyBuilder.message(nopMsg.getTgtName(), nopMsg.getId()), nopMsg);
}
- logger.info("Send NO_OP message to " + nopMsg.getTgtName() + ", msgId: " + nopMsg.getId());
+ LOG.info("Send NO_OP message to " + nopMsg.getTgtName() + ", msgId: " + nopMsg.getId());
} catch (Exception e) {
- logger.error(e);
+ LOG.error(e);
}
}
}
@@ -176,9 +181,8 @@ public class HelixStateMachineEngine implements StateMachineEngine {
int bucketSize = message.getBucketSize();
if (stateModelId == null) {
- logger
- .error("Fail to create msg-handler because message does not contain stateModelDef. msgId: "
- + message.getId());
+ LOG.error("Fail to create msg-handler because message does not contain stateModelDef. msgId: "
+ + message.getId());
return null;
}
@@ -190,7 +194,7 @@ public class HelixStateMachineEngine implements StateMachineEngine {
StateModelFactory<? extends StateModel> stateModelFactory =
getStateModelFactory(stateModelId.stringify(), factoryName);
if (stateModelFactory == null) {
- logger.warn("Fail to create msg-handler because cannot find stateModelFactory for model: "
+ LOG.warn("Fail to create msg-handler because cannot find stateModelFactory for model: "
+ stateModelId + " using factoryName: " + factoryName + " for resource: " + resourceId);
return null;
}
@@ -241,7 +245,7 @@ public class HelixStateMachineEngine implements StateMachineEngine {
// get executor-service for the message
TaskExecutor executor = (TaskExecutor) context.get(MapKey.TASK_EXECUTOR.toString());
if (executor == null) {
- logger.error("fail to get executor-service for batch message: " + message.getId()
+ LOG.error("fail to get executor-service for batch message: " + message.getId()
+ ". msgType: " + message.getMsgType() + ", resource: " + message.getResourceId());
return null;
}
@@ -265,4 +269,86 @@ public class HelixStateMachineEngine implements StateMachineEngine {
StateModelFactory<? extends StateModel> factory, String factoryName) {
throw new UnsupportedOperationException("Remove not yet supported");
}
+
+ /**
+  * Registers the factory under the default factory name by delegating to the three-argument
+  * overload.
+  */
+ @Override
+ public boolean registerStateModelFactory(StateModelDefId stateModelDefId,
+     HelixStateModelFactory<? extends StateModel> factory) {
+   final String defaultFactoryName = HelixConstants.DEFAULT_STATE_MODEL_FACTORY;
+   return registerStateModelFactory(stateModelDefId, defaultFactoryName, factory);
+ }
+
+ /**
+  * Registers a state model factory for a state model definition under the given factory name.
+  * The id-based factory is wrapped in an adaptor so that it can be stored in the string-keyed
+  * factory map, and a NO_OP message is sent after a successful registration.
+  * @param stateModelDefId state model definition the factory serves
+  * @param factoryName name to register the factory under
+  * @param factory factory to register
+  * @return true if the factory was registered; false if any argument is null or a factory is
+  *         already registered for this definition and factory name
+  */
+ @Override
+ public boolean registerStateModelFactory(StateModelDefId stateModelDefId, String factoryName,
+     HelixStateModelFactory<? extends StateModel> factory) {
+   if (stateModelDefId == null || factoryName == null || factory == null) {
+     LOG.info("stateModelDefId|factoryName|stateModelFactory is null");
+     return false;
+   }
+
+   LOG.info("Registering state model factory for state-model-definition: " + stateModelDefId
+       + " using factory-name: " + factoryName + " with: " + factory);
+
+   // look the per-definition map up once instead of on every access
+   String stateModelDefName = stateModelDefId.stringify();
+   Map<String, StateModelFactory<? extends StateModel>> factoriesByName =
+       _stateModelFactoryMap.get(stateModelDefName);
+   if (factoriesByName == null) {
+     factoriesByName = new ConcurrentHashMap<String, StateModelFactory<? extends StateModel>>();
+     _stateModelFactoryMap.put(stateModelDefName, factoriesByName);
+   }
+
+   if (factoriesByName.containsKey(factoryName)) {
+     LOG.info("Skip register state model factory for " + stateModelDefId + " using factory-name "
+         + factoryName + ", since it has already been registered.");
+     return false;
+   }
+
+   // the adaptor is only created once we know it will actually be stored
+   factoriesByName.put(factoryName, toLegacyFactory(factory));
+
+   sendNopMessage();
+   return true;
+ }
+
+ /**
+  * Wraps an id-based factory in a string-based adaptor. Being a generic method, it captures the
+  * wildcard type of the argument so that the adaptor is not instantiated as a raw type.
+  */
+ private static <T extends StateModel> StateModelFactory<T> toLegacyFactory(
+     HelixStateModelFactory<T> factory) {
+   return new HelixStateModelFactoryAdaptor<T>(factory);
+ }
+
+ /**
+  * Removes the factory registered under the default factory name by delegating to the
+  * two-argument overload.
+  */
+ @Override
+ public boolean removeStateModelFactory(StateModelDefId stateModelDefId) {
+   final String defaultFactoryName = HelixConstants.DEFAULT_STATE_MODEL_FACTORY;
+   return removeStateModelFactory(stateModelDefId, defaultFactoryName);
+ }
+
+ /**
+  * Removes the state model factory registered for a state model definition under the given
+  * factory name, and resets every state model that the removed factory still tracks.
+  * @param stateModelDefId state model definition the factory was registered for
+  * @param factoryName name the factory was registered under
+  * @return true if a factory was removed; false if an argument is null or no such factory is
+  *         registered
+  */
+ @Override
+ public boolean removeStateModelFactory(StateModelDefId stateModelDefId, String factoryName) {
+   if (stateModelDefId == null || factoryName == null) {
+     LOG.info("stateModelDefId|factoryName is null");
+     return false;
+   }
+
+   LOG.info("Removing state model factory for state-model-definition: " + stateModelDefId
+       + " using factory-name: " + factoryName);
+
+   String stateModelDefName = stateModelDefId.stringify();
+   Map<String, StateModelFactory<? extends StateModel>> ftyMap =
+       _stateModelFactoryMap.get(stateModelDefName);
+   if (ftyMap == null) {
+     LOG.info("Skip remove state model factory " + stateModelDefId + ", since it does NOT exist");
+     return false;
+   }
+
+   StateModelFactory<? extends StateModel> fty = ftyMap.remove(factoryName);
+   if (fty == null) {
+     LOG.info("Skip remove state model factory " + stateModelDefId + " using factory-name "
+         + factoryName + ", since it does NOT exist");
+     return false;
+   }
+
+   // drop the per-definition map once its last factory is gone
+   if (ftyMap.isEmpty()) {
+     _stateModelFactoryMap.remove(stateModelDefName);
+   }
+
+   for (String partition : fty.getPartitionSet()) {
+     StateModel stateModel = fty.getStateModel(partition);
+     if (stateModel == null) {
+       // NOTE(review): presumably the lookup can return null if the entry is removed between
+       // iterating the partition set and this call - confirm against StateModelFactory.
+       // Skipping keeps one missing entry from aborting the reset of the remaining ones.
+       continue;
+     }
+     stateModel.reset();
+     // TODO probably should remove the state from zookeeper
+   }
+
+   return true;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c589fb8d/helix-core/src/main/java/org/apache/helix/participant/StateMachineEngine.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/StateMachineEngine.java b/helix-core/src/main/java/org/apache/helix/participant/StateMachineEngine.java
index d11b3cc..80c9545 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/StateMachineEngine.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/StateMachineEngine.java
@@ -19,52 +19,84 @@ package org.apache.helix.participant;
* under the License.
*/
+import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.participant.statemachine.HelixStateModelFactory;
import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelFactory;
/**
- * Helix participant manager uses this class to register/remove state model factory
+ * Helix participant uses this class to register/remove state model factory
* State model factory creates state model that handles state transition messages
*/
public interface StateMachineEngine extends MessageHandlerFactory {
+
+ /**
+ * Replaced by {@link #registerStateModelFactory(StateModelDefId, HelixStateModelFactory)}
+ */
+ @Deprecated
+ public boolean registerStateModelFactory(String stateModelDef,
+ StateModelFactory<? extends StateModel> factory);
+
+ /**
+ * Replaced by {@link #registerStateModelFactory(StateModelDefId, String, HelixStateModelFactory)}
+ */
+ @Deprecated
+ public boolean registerStateModelFactory(String stateModelDef,
+ StateModelFactory<? extends StateModel> factory, String factoryName);
+
+ /**
+ * Replaced by {@link #removeStateModelFactory(StateModelDefId)}
+ */
+ @Deprecated
+ public boolean removeStateModelFactory(String stateModelDef,
+ StateModelFactory<? extends StateModel> factory);
+
+ /**
+ * Replaced by {@link #removeStateModelFactory(StateModelDefId, String)}
+ */
+ @Deprecated
+ public boolean removeStateModelFactory(String stateModelDef,
+ StateModelFactory<? extends StateModel> factory, String factoryName);
+
/**
* Register a default state model factory for a state model definition
* A state model definition could be, for example:
* "MasterSlave", "OnlineOffline", "LeaderStandby", etc.
- * @param stateModelDef
+ * Replacing {@link #registerStateModelFactory(String, StateModelFactory)}
+ * @param stateModelDefId
* @param factory
* @return
*/
- public boolean registerStateModelFactory(String stateModelDef,
- StateModelFactory<? extends StateModel> factory);
+ public boolean registerStateModelFactory(StateModelDefId stateModelDefId,
+ HelixStateModelFactory<? extends StateModel> factory);
/**
- * Register a state model factory with a name for a state model definition
- * @param stateModelDef
- * @param factory
+ * Register a state model factory with a factory name for a state model definition
+ * Replacing {@link #registerStateModelFactory(String, StateModelFactory, String)}
+ * @param stateModelDefId
* @param factoryName
+ * @param factory
* @return
*/
- public boolean registerStateModelFactory(String stateModelDef,
- StateModelFactory<? extends StateModel> factory, String factoryName);
+ public boolean registerStateModelFactory(StateModelDefId stateModelDefId, String factoryName,
+ HelixStateModelFactory<? extends StateModel> factory);
/**
* Remove the default state model factory for a state model definition
- * @param stateModelDef
- * @param factory
+ * Replacing {@link #removeStateModelFactory(String, StateModelFactory)}
+ * @param stateModelDefId
* @return
*/
- public boolean removeStateModelFactory(String stateModelDef,
- StateModelFactory<? extends StateModel> factory);
+ public boolean removeStateModelFactory(StateModelDefId stateModelDefId);
/**
* Remove the state model factory with a name for a state model definition
- * @param stateModelDef
- * @param factory
+ * Replacing {@link #removeStateModelFactory(String, StateModelFactory, String)}
+ * @param stateModelDefId
* @param factoryName
* @return
*/
- public boolean removeStateModelFactory(String stateModelDef,
- StateModelFactory<? extends StateModel> factory, String factoryName);
+ public boolean removeStateModelFactory(StateModelDefId stateModelDefId, String factoryName);
+
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c589fb8d/helix-core/src/main/java/org/apache/helix/participant/statemachine/HelixStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/statemachine/HelixStateModelFactory.java b/helix-core/src/main/java/org/apache/helix/participant/statemachine/HelixStateModelFactory.java
new file mode 100644
index 0000000..770bba4
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/participant/statemachine/HelixStateModelFactory.java
@@ -0,0 +1,99 @@
+package org.apache.helix.participant.statemachine;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.messaging.handling.BatchMessageWrapper;
+
+/**
+ * State model factory that uses concrete id classes instead of strings.
+ * Replacing {@link org.apache.helix.participant.statemachine.StateModelFactory}
+ */
+public abstract class HelixStateModelFactory<T extends StateModel> {
+  /**
+   * state models created by this factory, keyed by partition id
+   */
+  private final ConcurrentMap<PartitionId, T> _stateModelMap =
+      new ConcurrentHashMap<PartitionId, T>();
+
+  /**
+   * batch-message-wrappers created by this factory, keyed by resource id
+   */
+  private final ConcurrentMap<ResourceId, BatchMessageWrapper> _batchMsgWrapperMap =
+      new ConcurrentHashMap<ResourceId, BatchMessageWrapper>();
+
+  /**
+   * Build a brand-new state model for a partition.
+   * This method will be invoked only once per partition per session
+   * @param partitionId partition the state model is for
+   * @return the newly created state model
+   */
+  public abstract T createNewStateModel(PartitionId partitionId);
+
+  /**
+   * Create a state model for a partition and remember it, replacing any state model that was
+   * previously stored for the same partition
+   * @param partitionId partition the state model is for
+   * @return the newly created state model
+   */
+  public T createAndAddStateModel(PartitionId partitionId) {
+    final T created = createNewStateModel(partitionId);
+    _stateModelMap.put(partitionId, created);
+    return created;
+  }
+
+  /**
+   * Look up the state model stored for a partition
+   * @param partitionId partition to look up
+   * @return state model if exists, null otherwise
+   */
+  public T getStateModel(PartitionId partitionId) {
+    final T existing = _stateModelMap.get(partitionId);
+    return existing;
+  }
+
+  /**
+   * Forget the state model stored for a partition
+   * @param partitionId partition whose state model is dropped
+   * @return state model removed or null if not exist
+   */
+  public T removeStateModel(PartitionId partitionId) {
+    final T removed = _stateModelMap.remove(partitionId);
+    return removed;
+  }
+
+  /**
+   * Get the ids of all partitions that currently have a state model
+   * @return partitionId set, which is the key-set view of the underlying map
+   */
+  public Set<PartitionId> getPartitionSet() {
+    final Set<PartitionId> partitionIds = _stateModelMap.keySet();
+    return partitionIds;
+  }
+
+  /**
+   * Create a default batch-message-wrapper for a resource; the default implementation ignores
+   * the resource id
+   * @param resourceId resource the wrapper is for
+   * @return a new batch-message-wrapper
+   */
+  public BatchMessageWrapper createBatchMessageWrapper(ResourceId resourceId) {
+    return new BatchMessageWrapper();
+  }
+
+  /**
+   * Create a batch-message-wrapper for a resource and remember it, replacing any wrapper that
+   * was previously stored for the same resource
+   * @param resourceId resource the wrapper is for
+   * @return the newly created batch-message-wrapper
+   */
+  public BatchMessageWrapper createAndAddBatchMessageWrapper(ResourceId resourceId) {
+    final BatchMessageWrapper created = createBatchMessageWrapper(resourceId);
+    _batchMsgWrapperMap.put(resourceId, created);
+    return created;
+  }
+
+  /**
+   * Look up the batch-message-wrapper stored for a resource
+   * @param resourceId resource to look up
+   * @return the stored wrapper, or null if none has been stored
+   */
+  public BatchMessageWrapper getBatchMessageWrapper(ResourceId resourceId) {
+    final BatchMessageWrapper existing = _batchMsgWrapperMap.get(resourceId);
+    return existing;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c589fb8d/helix-core/src/main/java/org/apache/helix/participant/statemachine/HelixStateModelFactoryAdaptor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/statemachine/HelixStateModelFactoryAdaptor.java b/helix-core/src/main/java/org/apache/helix/participant/statemachine/HelixStateModelFactoryAdaptor.java
new file mode 100644
index 0000000..a17d9f3
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/participant/statemachine/HelixStateModelFactoryAdaptor.java
@@ -0,0 +1,17 @@
+package org.apache.helix.participant.statemachine;
+
+import org.apache.helix.api.id.PartitionId;
+
+/**
+ * Presents an id-based {@link HelixStateModelFactory} as a string-based
+ * {@link StateModelFactory} by converting partition names to partition ids.
+ */
+public class HelixStateModelFactoryAdaptor<T extends StateModel> extends StateModelFactory<T> {
+  // the wrapped id-based factory that actually creates the state models
+  final HelixStateModelFactory<T> _factory;
+
+  /**
+   * @param factory id-based factory to delegate state model creation to
+   */
+  public HelixStateModelFactoryAdaptor(HelixStateModelFactory<T> factory) {
+    this._factory = factory;
+  }
+
+  /**
+   * Converts the partition name to a partition id and delegates to the wrapped factory.
+   * @param partitionName name of the partition the state model is for
+   * @return the state model created by the wrapped factory
+   */
+  @Override
+  public T createNewStateModel(String partitionName) {
+    final PartitionId partitionId = PartitionId.from(partitionName);
+    return _factory.createNewStateModel(partitionId);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c589fb8d/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java b/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java
index 4d8e598..a74f67b 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java
@@ -25,6 +25,10 @@ import java.util.concurrent.ConcurrentMap;
import org.apache.helix.messaging.handling.BatchMessageWrapper;
+/**
+ * Replaced by {@link org.apache.helix.participant.statemachine.HelixStateModelFactory}
+ */
+@Deprecated
public abstract class StateModelFactory<T extends StateModel> {
/**
* mapping from partitionName to StateModel
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c589fb8d/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java b/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java
new file mode 100644
index 0000000..6f148cc
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java
@@ -0,0 +1,151 @@
+package org.apache.helix.integration;
+
+import java.util.Arrays;
+import java.util.Date;
+import java.util.Map;
+
+import org.apache.helix.HelixConnection;
+import org.apache.helix.HelixController;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixParticipant;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.api.State;
+import org.apache.helix.api.accessor.ClusterAccessor;
+import org.apache.helix.api.config.ClusterConfig;
+import org.apache.helix.api.config.ParticipantConfig;
+import org.apache.helix.api.config.ResourceConfig;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ControllerId;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+import org.apache.helix.controller.rebalancer.context.SemiAutoRebalancerContext;
+import org.apache.helix.manager.zk.ZkHelixConnection;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.participant.statemachine.HelixStateModelFactory;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Integration test that builds a single-partition MasterSlave cluster through a
+ * {@link HelixConnection}, starts one controller and one participant on it, and checks that the
+ * participant becomes MASTER for the partition.
+ */
+public class TestHelixConnection extends ZkUnitTestBase {
+  private static final Logger LOG = Logger.getLogger(TestHelixConnection.class.getName());
+
+  /**
+   * State model that accepts every transition and only logs it.
+   */
+  @StateModelInfo(initialState = "OFFLINE", states = {
+      "MASTER", "SLAVE", "OFFLINE", "ERROR"
+  })
+  public static class MockStateModel extends StateModel {
+    public MockStateModel() {
+
+    }
+
+    @Transition(to = "*", from = "*")
+    public void onBecomeAnyFromAny(Message message, NotificationContext context) {
+      String from = message.getFromState();
+      String to = message.getToState();
+      LOG.info("Become " + to + " from " + from);
+    }
+  }
+
+  /**
+   * Factory that hands out a fresh {@link MockStateModel} for every partition.
+   */
+  public static class MockStateModelFactory extends HelixStateModelFactory<MockStateModel> {
+
+    public MockStateModelFactory() {
+    }
+
+    @Override
+    public MockStateModel createNewStateModel(PartitionId partitionId) {
+      MockStateModel model = new MockStateModel();
+
+      return model;
+    }
+  }
+
+  @Test
+  public void test() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    String zkAddr = ZK_ADDR;
+    ClusterId clusterId = ClusterId.from(clusterName);
+    ControllerId controllerId = ControllerId.from("controller");
+    final ParticipantId participantId = ParticipantId.from("participant1");
+
+    ResourceId resourceId = ResourceId.from("testDB");
+    State master = State.from("MASTER");
+    State slave = State.from("SLAVE");
+    State offline = State.from("OFFLINE");
+    StateModelDefId stateModelDefId = StateModelDefId.from("MasterSlave");
+
+    // create connection
+    HelixConnection connection = new ZkHelixConnection(zkAddr);
+    connection.connect();
+
+    // setup cluster
+    ClusterAccessor clusterAccessor = connection.createClusterAccessor(clusterId);
+    clusterAccessor.dropCluster();
+
+    StateModelDefinition stateModelDef =
+        new StateModelDefinition.Builder(stateModelDefId).addState(master, 1).addState(slave, 2)
+            .addState(offline, 3).addTransition(offline, slave, 3).addTransition(slave, offline, 4)
+            .addTransition(slave, master, 2).addTransition(master, slave, 1).initialState(offline)
+            .upperBound(master, 1).dynamicUpperBound(slave, "R").build();
+    RebalancerContext rebalancerCtx =
+        new SemiAutoRebalancerContext.Builder(resourceId).addPartitions(1).replicaCount(1)
+            .stateModelDefId(stateModelDefId)
+            .preferenceList(PartitionId.from("testDB_0"), Arrays.asList(participantId)).build();
+    clusterAccessor.createCluster(new ClusterConfig.Builder(clusterId).addStateModelDefinition(
+        stateModelDef).build());
+    clusterAccessor.addResourceToCluster(new ResourceConfig.Builder(resourceId).rebalancerContext(
+        rebalancerCtx).build());
+    clusterAccessor.addParticipantToCluster(new ParticipantConfig.Builder(participantId).build());
+
+    // start controller
+    HelixController controller = connection.createController(clusterId, controllerId);
+    controller.startAsync();
+
+    // start participant
+    HelixParticipant participant = connection.createParticipant(clusterId, participantId);
+    participant.getStateMachineEngine().registerStateModelFactory(stateModelDefId,
+        new MockStateModelFactory());
+
+    participant.startAsync();
+
+    try {
+      // verify
+      final HelixDataAccessor accessor = connection.createDataAccessor(clusterId);
+      final PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+      boolean success = TestHelper.verify(new TestHelper.Verifier() {
+
+        @Override
+        public boolean verify() throws Exception {
+          ExternalView externalView = accessor.getProperty(keyBuilder.externalView("testDB"));
+          if (externalView == null) {
+            // the external view may not have been written yet; report "not yet" so that the
+            // caller can poll again instead of failing with a NullPointerException
+            return false;
+          }
+
+          Map<ParticipantId, State> stateMap =
+              externalView.getStateMap(PartitionId.from("testDB_0"));
+          if (stateMap == null || !stateMap.containsKey(participantId)) {
+            return false;
+          }
+
+          return stateMap.get(participantId).equals(State.from("MASTER"));
+        }
+      }, 10 * 1000);
+
+      Assert.assertTrue(success);
+    } finally {
+      // clean up even when verification fails, so the roles do not outlive the test
+      controller.stopAsync();
+      participant.stopAsync();
+    }
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+}