You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by zz...@apache.org on 2014/11/19 20:31:21 UTC
helix git commit: [HELIX-552] StateModelFactory#_stateModelMap should
use both resourceName and partitionKey to map a state model, rb=28211
Repository: helix
Updated Branches:
refs/heads/helix-0.6.x 45a9fd3a9 -> 9ddd0af34
[HELIX-552] StateModelFactory#_stateModelMap should use both resourceName and partitionKey to map a state model, rb=28211
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/9ddd0af3
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/9ddd0af3
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/9ddd0af3
Branch: refs/heads/helix-0.6.x
Commit: 9ddd0af347037f585a52b83bbc3a6b11a3934c82
Parents: 45a9fd3
Author: zzhang <zz...@apache.org>
Authored: Wed Nov 19 11:31:01 2014 -0800
Committer: zzhang <zz...@apache.org>
Committed: Wed Nov 19 11:31:01 2014 -0800
----------------------------------------------------------------------
.../helix/agent/AgentStateModelFactory.java | 2 +-
.../main/java/org/apache/helix/PropertyKey.java | 9 ++
.../apache/helix/examples/BootstrapHandler.java | 2 +-
.../apache/helix/examples/DummyParticipant.java | 2 +-
.../LeaderStandbyStateModelFactory.java | 2 +-
.../examples/MasterSlaveStateModelFactory.java | 2 +-
.../OnlineOfflineStateModelFactory.java | 2 +-
.../handling/HelixStateTransitionHandler.java | 2 +-
.../DistClusterControllerStateModelFactory.java | 2 +-
.../GenericLeaderStandbyStateModelFactory.java | 2 +-
.../participant/HelixStateMachineEngine.java | 20 +--
.../statemachine/ScheduledTaskStateModel.java | 33 ++---
.../ScheduledTaskStateModelFactory.java | 6 +-
.../statemachine/StateModelFactory.java | 57 ++++++---
.../helix/task/TaskStateModelFactory.java | 2 +-
.../org/apache/helix/TestHelixTaskExecutor.java | 2 +-
.../org/apache/helix/TestHelixTaskHandler.java | 2 +-
.../TestCorrectnessOnConnectivityLoss.java | 2 +-
.../integration/TestEntropyFreeNodeBounce.java | 2 +-
.../helix/integration/TestMessageThrottle2.java | 2 +-
.../TestPartitionLevelTransitionConstraint.java | 2 +-
.../integration/TestPreferenceListAsQueue.java | 2 +-
.../TestResourceWithSamePartitionKey.java | 126 +++++++++++++++++++
.../integration/TestStateTransitionTimeout.java | 5 +-
.../helix/integration/TestZkReconnect.java | 2 +-
.../integration/manager/TestStateModelLeak.java | 14 +--
.../helix/mock/participant/DummyProcess.java | 6 +-
.../participant/MockBootstrapModelFactory.java | 2 +-
.../mock/participant/MockMSModelFactory.java | 10 +-
.../participant/MockSchemataModelFactory.java | 2 +-
.../TestDistControllerStateModelFactory.java | 7 +-
.../apache/helix/lockmanager/LockFactory.java | 2 +-
.../rabbitmq/ConsumerStateModelFactory.java | 2 +-
.../filestore/FileStoreStateModelFactory.java | 2 +-
.../taskexecution/TaskStateModelFactory.java | 2 +-
35 files changed, 250 insertions(+), 91 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/helix-agent/src/main/java/org/apache/helix/agent/AgentStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-agent/src/main/java/org/apache/helix/agent/AgentStateModelFactory.java b/helix-agent/src/main/java/org/apache/helix/agent/AgentStateModelFactory.java
index a0e00a3..f13dd05 100644
--- a/helix-agent/src/main/java/org/apache/helix/agent/AgentStateModelFactory.java
+++ b/helix-agent/src/main/java/org/apache/helix/agent/AgentStateModelFactory.java
@@ -24,7 +24,7 @@ import org.apache.helix.participant.statemachine.StateModelFactory;
public class AgentStateModelFactory extends StateModelFactory<AgentStateModel> {
@Override
- public AgentStateModel createNewStateModel(String partitionKey) {
+ public AgentStateModel createNewStateModel(String resourceName, String partitionKey) {
AgentStateModel model = new AgentStateModel();
return model;
}
http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/helix-core/src/main/java/org/apache/helix/PropertyKey.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/PropertyKey.java b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
index a394b50..a5a3561 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyKey.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
@@ -306,6 +306,15 @@ public class PropertyKey {
}
/**
+ * Get a property key associated with {@link Error} for an instance
+ * @param instanceName
+ * @return {@link PropertyKey}
+ */
+ public PropertyKey errors(String instanceName) {
+ return new PropertyKey(ERRORS, Error.class, _clusterName, instanceName);
+ }
+
+ /**
* Get a property key associated with a specific {@link Message} on an instance
* @param instanceName
* @param messageId
http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/helix-core/src/main/java/org/apache/helix/examples/BootstrapHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/examples/BootstrapHandler.java b/helix-core/src/main/java/org/apache/helix/examples/BootstrapHandler.java
index f1b37fc..65e68a4 100644
--- a/helix-core/src/main/java/org/apache/helix/examples/BootstrapHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/examples/BootstrapHandler.java
@@ -37,7 +37,7 @@ import org.apache.helix.participant.statemachine.Transition;
public class BootstrapHandler extends StateModelFactory<StateModel> {
@Override
- public StateModel createNewStateModel(String stateUnitKey) {
+ public StateModel createNewStateModel(String resourceName, String stateUnitKey) {
return new BootstrapStateModel(stateUnitKey);
}
http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/helix-core/src/main/java/org/apache/helix/examples/DummyParticipant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/examples/DummyParticipant.java b/helix-core/src/main/java/org/apache/helix/examples/DummyParticipant.java
index 52a3696..ea13951 100644
--- a/helix-core/src/main/java/org/apache/helix/examples/DummyParticipant.java
+++ b/helix-core/src/main/java/org/apache/helix/examples/DummyParticipant.java
@@ -87,7 +87,7 @@ public class DummyParticipant {
// dummy master slave state model factory
public static class DummyMSModelFactory extends StateModelFactory<DummyMSStateModel> {
@Override
- public DummyMSStateModel createNewStateModel(String partitionName) {
+ public DummyMSStateModel createNewStateModel(String resourceName, String partitionName) {
DummyMSStateModel model = new DummyMSStateModel();
return model;
}
http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/helix-core/src/main/java/org/apache/helix/examples/LeaderStandbyStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/examples/LeaderStandbyStateModelFactory.java b/helix-core/src/main/java/org/apache/helix/examples/LeaderStandbyStateModelFactory.java
index 43ac5de..c0fff7c 100644
--- a/helix-core/src/main/java/org/apache/helix/examples/LeaderStandbyStateModelFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/examples/LeaderStandbyStateModelFactory.java
@@ -32,7 +32,7 @@ public class LeaderStandbyStateModelFactory extends StateModelFactory<StateModel
}
@Override
- public StateModel createNewStateModel(String stateUnitKey) {
+ public StateModel createNewStateModel(String resourceName, String stateUnitKey) {
LeaderStandbyStateModel stateModel = new LeaderStandbyStateModel();
stateModel.setDelay(_delay);
return stateModel;
http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/helix-core/src/main/java/org/apache/helix/examples/MasterSlaveStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/examples/MasterSlaveStateModelFactory.java b/helix-core/src/main/java/org/apache/helix/examples/MasterSlaveStateModelFactory.java
index affbea8..85d90b2 100644
--- a/helix-core/src/main/java/org/apache/helix/examples/MasterSlaveStateModelFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/examples/MasterSlaveStateModelFactory.java
@@ -44,7 +44,7 @@ public class MasterSlaveStateModelFactory extends StateModelFactory<StateModel>
}
@Override
- public StateModel createNewStateModel(String partitionName) {
+ public StateModel createNewStateModel(String resourceName, String partitionName) {
MasterSlaveStateModel stateModel = new MasterSlaveStateModel();
stateModel.setInstanceName(_instanceName);
stateModel.setDelay(_delay);
http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/helix-core/src/main/java/org/apache/helix/examples/OnlineOfflineStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/examples/OnlineOfflineStateModelFactory.java b/helix-core/src/main/java/org/apache/helix/examples/OnlineOfflineStateModelFactory.java
index daf03a9..e11c6d1 100644
--- a/helix-core/src/main/java/org/apache/helix/examples/OnlineOfflineStateModelFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/examples/OnlineOfflineStateModelFactory.java
@@ -32,7 +32,7 @@ public class OnlineOfflineStateModelFactory extends StateModelFactory<StateModel
}
@Override
- public StateModel createNewStateModel(String stateUnitKey) {
+ public StateModel createNewStateModel(String resourceName, String stateUnitKey) {
OnlineOfflineStateModel stateModel = new OnlineOfflineStateModel();
stateModel.setDelay(_delay);
return stateModel;
http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
index 8da7ec9..e9c4f48 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
@@ -195,7 +195,7 @@ public class HelixStateTransitionHandler extends MessageHandler {
List<ZNRecordDelta> deltaList = new ArrayList<ZNRecordDelta>();
deltaList.add(delta);
_currentStateDelta.setDeltaList(deltaList);
- _stateModelFactory.removeStateModel(partitionKey);
+ _stateModelFactory.removeStateModel(resource, partitionKey);
} else {
// if the partition is not to be dropped, update _stateModel to the TO_STATE
_stateModel.updateState(toState);
http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModelFactory.java b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModelFactory.java
index a367c81..c172168 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModelFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModelFactory.java
@@ -30,7 +30,7 @@ public class DistClusterControllerStateModelFactory extends
}
@Override
- public DistClusterControllerStateModel createNewStateModel(String stateUnitKey) {
+ public DistClusterControllerStateModel createNewStateModel(String resourceName, String partitionKey) {
return new DistClusterControllerStateModel(_zkAddr);
}
http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/helix-core/src/main/java/org/apache/helix/participant/GenericLeaderStandbyStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/GenericLeaderStandbyStateModelFactory.java b/helix-core/src/main/java/org/apache/helix/participant/GenericLeaderStandbyStateModelFactory.java
index 51c91cc..f679cd9 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/GenericLeaderStandbyStateModelFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/GenericLeaderStandbyStateModelFactory.java
@@ -39,7 +39,7 @@ public class GenericLeaderStandbyStateModelFactory extends
}
@Override
- public GenericLeaderStandbyModel createNewStateModel(String partitionKey) {
+ public GenericLeaderStandbyModel createNewStateModel(String resourceName, String partitionKey) {
return new GenericLeaderStandbyModel(_callback, _notificationTypes, partitionKey);
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java b/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
index 31fcecf..039d076 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
@@ -142,13 +142,15 @@ public class HelixStateMachineEngine implements StateMachineEngine {
for (Map<String, StateModelFactory<? extends StateModel>> ftyMap : _stateModelFactoryMap
.values()) {
for (StateModelFactory<? extends StateModel> stateModelFactory : ftyMap.values()) {
- for (String resourceKey : stateModelFactory.getPartitionSet()) {
- StateModel stateModel = stateModelFactory.getStateModel(resourceKey);
- stateModel.reset();
- String initialState = _stateModelParser.getInitialState(stateModel.getClass());
- stateModel.updateState(initialState);
- // TODO probably should update the state on ZK. Shi confirm what needs
- // to be done here.
+ for (String resourceName : stateModelFactory.getResourceSet()) {
+ for (String partitionKey : stateModelFactory.getPartitionSet(resourceName)) {
+ StateModel stateModel = stateModelFactory.getStateModel(resourceName, partitionKey);
+ stateModel.reset();
+ String initialState = _stateModelParser.getInitialState(stateModel.getClass());
+ stateModel.updateState(initialState);
+ // TODO probably should update the state on ZK. Shi confirm what needs
+ // to be done here.
+ }
}
}
}
@@ -206,9 +208,9 @@ public class HelixStateMachineEngine implements StateMachineEngine {
if (message.getBatchMessageMode() == false) {
// create currentStateDelta for this partition
String initState = _stateModelDefs.get(message.getStateModelDef()).getInitialState();
- StateModel stateModel = stateModelFactory.getStateModel(partitionKey);
+ StateModel stateModel = stateModelFactory.getStateModel(resourceName, partitionKey);
if (stateModel == null) {
- stateModel = stateModelFactory.createAndAddStateModel(partitionKey);
+ stateModel = stateModelFactory.createAndAddStateModel(resourceName, partitionKey);
stateModel.updateState(initState);
}
http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModel.java b/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModel.java
index 8b6a02c..6c7013c 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModel.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModel.java
@@ -37,26 +37,28 @@ public class ScheduledTaskStateModel extends StateModel {
// StateModel with initial state other than OFFLINE should override this field
protected String _currentState = DEFAULT_INITIAL_STATE;
final ScheduledTaskStateModelFactory _factory;
- final String _partitionName;
+ final String _resourceName;
+ final String _partitionKey;
final HelixTaskExecutor _executor;
public ScheduledTaskStateModel(ScheduledTaskStateModelFactory factory,
- HelixTaskExecutor executor, String partitionName) {
+ HelixTaskExecutor executor, String resourceName, String partitionKey) {
_factory = factory;
- _partitionName = partitionName;
+ _resourceName = resourceName;
+ _partitionKey = partitionKey;
_executor = executor;
}
@Transition(to = "COMPLETED", from = "OFFLINE")
public void onBecomeCompletedFromOffline(Message message, NotificationContext context)
throws InterruptedException {
- logger.info(_partitionName + " onBecomeCompletedFromOffline");
+ logger.info(_partitionKey + " onBecomeCompletedFromOffline");
// Construct the inner task message from the mapfields of scheduledTaskQueue resource group
Map<String, String> messageInfo =
message.getRecord().getMapField(Message.Attributes.INNER_MESSAGE.toString());
- ZNRecord record = new ZNRecord(_partitionName);
+ ZNRecord record = new ZNRecord(_partitionKey);
record.getSimpleFields().putAll(messageInfo);
Message taskMessage = new Message(record);
if (logger.isDebugEnabled()) {
@@ -66,49 +68,50 @@ public class ScheduledTaskStateModel extends StateModel {
_executor.createMessageHandler(taskMessage, new NotificationContext(null));
if (handler == null) {
throw new HelixException("Task message " + taskMessage.getMsgType()
- + " handler not found, task id " + _partitionName);
+ + " handler not found, task id " + _partitionKey);
}
// Invoke the internal handler to complete the task
handler.handleMessage();
- logger.info(_partitionName + " onBecomeCompletedFromOffline completed");
+ logger.info(_partitionKey + " onBecomeCompletedFromOffline completed");
}
@Transition(to = "OFFLINE", from = "COMPLETED")
public void onBecomeOfflineFromCompleted(Message message, NotificationContext context) {
- logger.info(_partitionName + " onBecomeOfflineFromCompleted");
+ logger.info(_partitionKey + " onBecomeOfflineFromCompleted");
}
@Transition(to = "DROPPED", from = "COMPLETED")
public void onBecomeDroppedFromCompleted(Message message, NotificationContext context) {
- logger.info(_partitionName + " onBecomeDroppedFromCompleted");
+ logger.info(_partitionKey + " onBecomeDroppedFromCompleted");
removeFromStatemodelFactory();
}
@Transition(to = "DROPPED", from = "OFFLINE")
public void onBecomeDroppedFromOffline(Message message, NotificationContext context)
throws InterruptedException {
- logger.info(_partitionName + " onBecomeDroppedFromScheduled");
+ logger.info(_partitionKey + " onBecomeDroppedFromScheduled");
removeFromStatemodelFactory();
}
@Transition(to = "OFFLINE", from = "ERROR")
public void onBecomeOfflineFromError(Message message, NotificationContext context)
throws InterruptedException {
- logger.info(_partitionName + " onBecomeOfflineFromError");
+ logger.info(_partitionKey + " onBecomeOfflineFromError");
}
@Override
public void reset() {
- logger.info(_partitionName + " ScheduledTask reset");
+ logger.info(_partitionKey + " ScheduledTask reset");
removeFromStatemodelFactory();
}
// We need this to prevent state model leak
private void removeFromStatemodelFactory() {
- if (_factory.getStateModel(_partitionName) != null) {
- _factory.removeStateModel(_partitionName);
+ if (_factory.getStateModel(_resourceName, _partitionKey) != null) {
+ _factory.removeStateModel(_resourceName, _partitionKey);
} else {
- logger.warn(_partitionName + " not found in ScheduledTaskStateModelFactory");
+ logger.warn(_resourceName + "_ " + _partitionKey
+ + " not found in ScheduledTaskStateModelFactory");
}
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModelFactory.java b/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModelFactory.java
index a205910..dce5898 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModelFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModelFactory.java
@@ -32,8 +32,8 @@ public class ScheduledTaskStateModelFactory extends StateModelFactory<ScheduledT
}
@Override
- public ScheduledTaskStateModel createNewStateModel(String partitionName) {
- logger.info("Create state model for ScheduledTask " + partitionName);
- return new ScheduledTaskStateModel(this, _executor, partitionName);
+ public ScheduledTaskStateModel createNewStateModel(String resourceName, String partitionKey) {
+ logger.info("Create state model for ScheduledTask " + resourceName + "_" + partitionKey);
+ return new ScheduledTaskStateModel(this, _executor, resourceName, partitionKey);
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java b/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java
index bd1a668..3a6c6d5 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java
@@ -19,6 +19,7 @@ package org.apache.helix.participant.statemachine;
* under the License.
*/
+import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -28,9 +29,10 @@ import org.apache.helix.messaging.handling.BatchMessageWrapper;
public abstract class StateModelFactory<T extends StateModel> {
/**
- * mapping from partitionName to StateModel
+ * mapping resourceName to map of partitionName to StateModel
*/
- private final ConcurrentMap<String, T> _stateModelMap = new ConcurrentHashMap<String, T>();
+ private final ConcurrentMap<String, ConcurrentMap<String, T>> _stateModelMap =
+ new ConcurrentHashMap<String, ConcurrentMap<String, T>>();
/**
* mapping from resourceName to BatchMessageWrapper
@@ -39,49 +41,68 @@ public abstract class StateModelFactory<T extends StateModel> {
new ConcurrentHashMap<String, BatchMessageWrapper>();
/**
- * This method will be invoked only once per partitionName per session
+ * This method will be invoked only once per resource per partition per session
+ * Replacing old StateModelFactory#createNewStateModel(String partitionName)
+ * Add "resourceName" to signature @see HELIX-552
+ * @param resourceName
* @param partitionName
- * @return
+ * @return state model
*/
- public abstract T createNewStateModel(String partitionName);
+ public abstract T createNewStateModel(String resourceName, String partitionName);
/**
* Create a state model for a partition
- * @param partitionName
+ * @param partitionKey
*/
- public T createAndAddStateModel(String partitionName) {
- T stateModel = createNewStateModel(partitionName);
- _stateModelMap.put(partitionName, stateModel);
+ public T createAndAddStateModel(String resourceName, String partitionKey) {
+ T stateModel = createNewStateModel(resourceName, partitionKey);
+ _stateModelMap.putIfAbsent(resourceName, new ConcurrentHashMap<String, T>());
+ _stateModelMap.get(resourceName).put(partitionKey, stateModel);
return stateModel;
}
/**
* Get the state model for a partition
- * @param partitionName
+ * @param resourceName
+ * @param partitionKey
* @return state model if exists, null otherwise
*/
- public T getStateModel(String partitionName) {
- return _stateModelMap.get(partitionName);
+ public T getStateModel(String resourceName, String partitionKey) {
+ Map<String, T> map = _stateModelMap.get(resourceName);
+ return map == null? null : map.get(partitionKey);
}
/**
* remove state model for a partition
- * @param partitionName
+ * @param resourceName
+ * @param partitionKey
* @return state model removed or null if not exist
*/
- public T removeStateModel(String partitionName) {
- return _stateModelMap.remove(partitionName);
+ public T removeStateModel(String resourceName, String partitionKey) {
+ Map<String, T> map = _stateModelMap.get(resourceName);
+ return map == null? null : map.remove(partitionKey);
}
/**
- * get partition set
- * @return partition key set
+ * get resource set
+ * @param resourceName
+ * @return resource name set
*/
- public Set<String> getPartitionSet() {
+ public Set<String> getResourceSet() {
return _stateModelMap.keySet();
}
/**
+ * get partition set for a resource
+ * @param resourceName
+ * @return partition key set
+ */
+ public Set<String> getPartitionSet(String resourceName) {
+ Map<String, T> map = _stateModelMap.get(resourceName);
+ return (map == null? Collections.<String>emptySet() : map.keySet());
+ }
+
+ /**
* create a default batch-message-wrapper for a resource
* @param resourceName
* @return
http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java b/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
index 51e8c95..b8e91f5 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
@@ -37,7 +37,7 @@ public class TaskStateModelFactory extends StateModelFactory<TaskStateModel> {
}
@Override
- public TaskStateModel createNewStateModel(String partitionName) {
+ public TaskStateModel createNewStateModel(String resourceName, String partitionKey) {
return new TaskStateModel(_manager, _taskFactoryRegistry);
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/helix-core/src/test/java/org/apache/helix/TestHelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestHelixTaskExecutor.java b/helix-core/src/test/java/org/apache/helix/TestHelixTaskExecutor.java
index 4aa16eb..c62cefd 100644
--- a/helix-core/src/test/java/org/apache/helix/TestHelixTaskExecutor.java
+++ b/helix-core/src/test/java/org/apache/helix/TestHelixTaskExecutor.java
@@ -72,7 +72,7 @@ public class TestHelixTaskExecutor {
StateModelFactory<MockStateModel> stateModelFactory = new StateModelFactory<MockStateModel>() {
@Override
- public MockStateModel createNewStateModel(String partitionName) {
+ public MockStateModel createNewStateModel(String resource, String partitionName) {
// TODO Auto-generated method stub
return new MockStateModel();
}
http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/helix-core/src/test/java/org/apache/helix/TestHelixTaskHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestHelixTaskHandler.java b/helix-core/src/test/java/org/apache/helix/TestHelixTaskHandler.java
index 3c7f2af..69f24d7 100644
--- a/helix-core/src/test/java/org/apache/helix/TestHelixTaskHandler.java
+++ b/helix-core/src/test/java/org/apache/helix/TestHelixTaskHandler.java
@@ -116,7 +116,7 @@ public class TestHelixTaskHandler {
new StateModelFactory<MockStateModelAnnotated>() {
@Override
- public MockStateModelAnnotated createNewStateModel(String partitionName) {
+ public MockStateModelAnnotated createNewStateModel(String resource, String partitionName) {
// TODO Auto-generated method stub
return new MockStateModelAnnotated();
}
http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/helix-core/src/test/java/org/apache/helix/integration/TestCorrectnessOnConnectivityLoss.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestCorrectnessOnConnectivityLoss.java b/helix-core/src/test/java/org/apache/helix/integration/TestCorrectnessOnConnectivityLoss.java
index 3b44f2c..cbe231d 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestCorrectnessOnConnectivityLoss.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestCorrectnessOnConnectivityLoss.java
@@ -196,7 +196,7 @@ public class TestCorrectnessOnConnectivityLoss {
}
@Override
- public MyStateModel createNewStateModel(String partitionId) {
+ public MyStateModel createNewStateModel(String resource, String partitionId) {
return new MyStateModel(_counts);
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/helix-core/src/test/java/org/apache/helix/integration/TestEntropyFreeNodeBounce.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestEntropyFreeNodeBounce.java b/helix-core/src/test/java/org/apache/helix/integration/TestEntropyFreeNodeBounce.java
index 8a3d727..dc38369 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestEntropyFreeNodeBounce.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestEntropyFreeNodeBounce.java
@@ -152,7 +152,7 @@ public class TestEntropyFreeNodeBounce extends ZkUnitTestBase {
private static class MockStateModelFactory extends StateModelFactory<MockStateModel> {
@Override
- public MockStateModel createNewStateModel(String partitionName) {
+ public MockStateModel createNewStateModel(String resource, String partitionName) {
return new MockStateModel();
}
http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle2.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle2.java b/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle2.java
index 7d66780..49222e6 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle2.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle2.java
@@ -328,7 +328,7 @@ public class TestMessageThrottle2 extends ZkIntegrationTestBase {
}
@Override
- public MyStateModel createNewStateModel(String partitionName) {
+ public MyStateModel createNewStateModel(String resource, String partitionName) {
return new MyStateModel(helixManager);
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/helix-core/src/test/java/org/apache/helix/integration/TestPartitionLevelTransitionConstraint.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestPartitionLevelTransitionConstraint.java b/helix-core/src/test/java/org/apache/helix/integration/TestPartitionLevelTransitionConstraint.java
index 3571a7b..9510e62 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestPartitionLevelTransitionConstraint.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestPartitionLevelTransitionConstraint.java
@@ -90,7 +90,7 @@ public class TestPartitionLevelTransitionConstraint extends ZkIntegrationTestBas
public class BootstrapStateModelFactory extends StateModelFactory<BootstrapStateModel> {
@Override
- public BootstrapStateModel createNewStateModel(String stateUnitKey) {
+ public BootstrapStateModel createNewStateModel(String resource, String stateUnitKey) {
BootstrapStateModel model = new BootstrapStateModel();
return model;
}
http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/helix-core/src/test/java/org/apache/helix/integration/TestPreferenceListAsQueue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestPreferenceListAsQueue.java b/helix-core/src/test/java/org/apache/helix/integration/TestPreferenceListAsQueue.java
index 456baca..d3aa5ad 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestPreferenceListAsQueue.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestPreferenceListAsQueue.java
@@ -502,7 +502,7 @@ public class TestPreferenceListAsQueue extends ZkUnitTestBase {
public class PrefListTaskOnlineOfflineStateModelFactory extends
StateModelFactory<PrefListTaskOnlineOfflineStateModel> {
@Override
- public PrefListTaskOnlineOfflineStateModel createNewStateModel(String partitionName) {
+ public PrefListTaskOnlineOfflineStateModel createNewStateModel(String resource, String partitionName) {
return new PrefListTaskOnlineOfflineStateModel();
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/helix-core/src/test/java/org/apache/helix/integration/TestResourceWithSamePartitionKey.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestResourceWithSamePartitionKey.java b/helix-core/src/test/java/org/apache/helix/integration/TestResourceWithSamePartitionKey.java
new file mode 100644
index 0000000..bbb46eb
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestResourceWithSamePartitionKey.java
@@ -0,0 +1,126 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+import java.util.List;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * @see HELIX-552
+ * StateModelFactory#_stateModelMap should use both resourceName and partitionKey to map a
+ * state model
+ */
+public class TestResourceWithSamePartitionKey extends ZkUnitTestBase {
+
+ @Test
+ public void test() throws Exception {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ int n = 2;
+
+ System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ 1, // resources
+ 2, // partitions per resource
+ n, // number of nodes
+ 2, // replicas
+ "OnlineOffline", RebalanceMode.CUSTOMIZED, false); // do rebalance
+
+ HelixDataAccessor accessor =
+ new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+ IdealState idealState = accessor.getProperty(keyBuilder.idealStates("TestDB0"));
+ idealState.setReplicas("2");
+ idealState.setPartitionState("0", "localhost_12918", "ONLINE");
+ idealState.setPartitionState("0", "localhost_12919", "ONLINE");
+ idealState.setPartitionState("1", "localhost_12918", "ONLINE");
+ idealState.setPartitionState("1", "localhost_12919", "ONLINE");
+ accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
+
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+ controller.syncStart();
+
+ // start participants
+ MockParticipantManager[] participants = new MockParticipantManager[n];
+ for (int i = 0; i < n; i++) {
+ String instanceName = "localhost_" + (12918 + i);
+
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+ participants[i].syncStart();
+ }
+
+ boolean result =
+ ClusterStateVerifier
+ .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // add a second resource with the same partition-key
+ IdealState newIdealState = new IdealState("TestDB1");
+ newIdealState.getRecord().setSimpleFields(idealState.getRecord().getSimpleFields());
+ newIdealState.setPartitionState("0", "localhost_12918", "ONLINE");
+ newIdealState.setPartitionState("0", "localhost_12919", "ONLINE");
+ newIdealState.setPartitionState("1", "localhost_12918", "ONLINE");
+ newIdealState.setPartitionState("1", "localhost_12919", "ONLINE");
+ accessor.setProperty(keyBuilder.idealStates("TestDB1"), newIdealState);
+
+ result =
+ ClusterStateVerifier
+ .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // assert no ERROR
+ for (int i = 0; i < n; i++) {
+ String instanceName = "localhost_" + (12918 + i);
+ List<String> errs = accessor.getChildNames(keyBuilder.errors(instanceName));
+ Assert.assertTrue(errs.isEmpty());
+ }
+
+ // clean up
+ controller.syncStop();
+ for (int i = 0; i < n; i++) {
+ participants[i].syncStop();
+ }
+
+ System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java b/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java
index a297752..443d484 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java
@@ -161,7 +161,7 @@ public class TestStateTransitionTimeout extends ZkStandAloneCMTestBase {
}
@Override
- public TimeOutStateModel createNewStateModel(String stateUnitKey) {
+ public TimeOutStateModel createNewStateModel(String resource, String stateUnitKey) {
return new TimeOutStateModel(new SleepTransition(_sleepTime),
partitionsToSleep.contains(stateUnitKey));
}
@@ -170,7 +170,6 @@ public class TestStateTransitionTimeout extends ZkStandAloneCMTestBase {
@Test
public void testStateTransitionTimeOut() throws Exception {
Map<String, SleepStateModelFactory> factories = new HashMap<String, SleepStateModelFactory>();
- // MockParticipantManager[] participants = new MockParticipantManager[NODE_NR];
IdealState idealState =
_setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, TEST_DB);
for (int i = 0; i < NODE_NR; i++) {
@@ -204,7 +203,7 @@ public class TestStateTransitionTimeout extends ZkStandAloneCMTestBase {
String idealMaster = idealState.getPreferenceList(p).get(0);
Assert.assertTrue(ev.getStateMap(p).get(idealMaster).equals("ERROR"));
- TimeOutStateModel model = factories.get(idealMaster).getStateModel(p);
+ TimeOutStateModel model = factories.get(idealMaster).getStateModel(TEST_DB, p);
Assert.assertEquals(model._errorCallcount, 1);
Assert.assertEquals(model._error.getCode(), ErrorCode.TIMEOUT);
}
http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/helix-core/src/test/java/org/apache/helix/integration/TestZkReconnect.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestZkReconnect.java b/helix-core/src/test/java/org/apache/helix/integration/TestZkReconnect.java
index cc6c0b5..9da5bb1 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestZkReconnect.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestZkReconnect.java
@@ -83,7 +83,7 @@ public class TestZkReconnect {
participant.getStateMachineEngine().registerStateModelFactory("OnlineOffline",
new StateModelFactory<StateModel>() {
@Override
- public StateModel createNewStateModel(String stateUnitKey) {
+ public StateModel createNewStateModel(String resource, String stateUnitKey) {
return new SimpleStateModel(latch);
}
}, "test");
http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/helix-core/src/test/java/org/apache/helix/integration/manager/TestStateModelLeak.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestStateModelLeak.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestStateModelLeak.java
index 2aff5b6..c24eeef 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/TestStateModelLeak.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestStateModelLeak.java
@@ -108,8 +108,8 @@ public class TestStateModelLeak extends ZkUnitTestBase {
Assert.assertTrue(result);
// check state models have been dropped also
- Assert.assertTrue(fty.getPartitionSet().isEmpty(),
- "All state-models should be dropped, but was " + fty.getPartitionSet());
+ Assert.assertTrue(fty.getPartitionSet("TestDB0").isEmpty(),
+ "All state-models should be dropped, but was " + fty.getPartitionSet("TestDB0"));
// cleanup
controller.syncStop();
@@ -193,8 +193,8 @@ public class TestStateModelLeak extends ZkUnitTestBase {
Assert.assertTrue(result);
// check state models have been dropped also
- Assert.assertTrue(fty.getPartitionSet().isEmpty(),
- "All state-models should be dropped, but was " + fty.getPartitionSet());
+ Assert.assertTrue(fty.getPartitionSet("TestDB0").isEmpty(),
+ "All state-models should be dropped, but was " + fty.getPartitionSet("TestDB0"));
// cleanup
controller.syncStop();
@@ -212,9 +212,9 @@ public class TestStateModelLeak extends ZkUnitTestBase {
*/
static void checkStateModelMap(StateModelFactory<? extends StateModel> fty,
Map<String, String> expectStateModelMap) {
- Assert.assertEquals(fty.getPartitionSet().size(), expectStateModelMap.size());
- for (String partition : fty.getPartitionSet()) {
- StateModel stateModel = fty.getStateModel(partition);
+ Assert.assertEquals(fty.getPartitionSet("TestDB0").size(), expectStateModelMap.size());
+ for (String partition : fty.getPartitionSet("TestDB0")) {
+ StateModel stateModel = fty.getStateModel("TestDB0", partition);
String actualState = stateModel.getCurrentState();
String expectState = expectStateModelMap.get(partition);
LOG.debug(partition + " actual state: " + actualState + ", expect state: " + expectState);
http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/helix-core/src/test/java/org/apache/helix/mock/participant/DummyProcess.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/participant/DummyProcess.java b/helix-core/src/test/java/org/apache/helix/mock/participant/DummyProcess.java
index 2111a65..085f822 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/participant/DummyProcess.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/participant/DummyProcess.java
@@ -115,7 +115,7 @@ public class DummyProcess {
}
@Override
- public DummyStateModel createNewStateModel(String stateUnitKey) {
+ public DummyStateModel createNewStateModel(String resourceName, String stateUnitKey) {
DummyStateModel model = new DummyStateModel();
model.setDelay(_delay);
return model;
@@ -131,7 +131,7 @@ public class DummyProcess {
}
@Override
- public DummyLeaderStandbyStateModel createNewStateModel(String stateUnitKey) {
+ public DummyLeaderStandbyStateModel createNewStateModel(String resourceName, String stateUnitKey) {
DummyLeaderStandbyStateModel model = new DummyLeaderStandbyStateModel();
model.setDelay(_delay);
return model;
@@ -147,7 +147,7 @@ public class DummyProcess {
}
@Override
- public DummyOnlineOfflineStateModel createNewStateModel(String stateUnitKey) {
+ public DummyOnlineOfflineStateModel createNewStateModel(String resourceName, String stateUnitKey) {
DummyOnlineOfflineStateModel model = new DummyOnlineOfflineStateModel();
model.setDelay(_delay);
return model;
http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/helix-core/src/test/java/org/apache/helix/mock/participant/MockBootstrapModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/participant/MockBootstrapModelFactory.java b/helix-core/src/test/java/org/apache/helix/mock/participant/MockBootstrapModelFactory.java
index 177e7c4..b7a80c6 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/participant/MockBootstrapModelFactory.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/participant/MockBootstrapModelFactory.java
@@ -24,7 +24,7 @@ import org.apache.helix.participant.statemachine.StateModelFactory;
// mock Bootstrap state model factory
public class MockBootstrapModelFactory extends StateModelFactory<MockBootstrapStateModel> {
@Override
- public MockBootstrapStateModel createNewStateModel(String partitionKey) {
+ public MockBootstrapStateModel createNewStateModel(String resourceName, String partitionKey) {
MockBootstrapStateModel model = new MockBootstrapStateModel();
return model;
}
http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/helix-core/src/test/java/org/apache/helix/mock/participant/MockMSModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/participant/MockMSModelFactory.java b/helix-core/src/test/java/org/apache/helix/mock/participant/MockMSModelFactory.java
index 9325934..1849fc2 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/participant/MockMSModelFactory.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/participant/MockMSModelFactory.java
@@ -37,14 +37,16 @@ public class MockMSModelFactory extends StateModelFactory<MockMSStateModel> {
_transition = transition;
// set existing transition
- for (String partition : getPartitionSet()) {
- MockMSStateModel stateModel = getStateModel(partition);
- stateModel.setTransition(transition);
+ for (String resource : getResourceSet()) {
+ for (String partition : getPartitionSet(resource)) {
+ MockMSStateModel stateModel = getStateModel(resource, partition);
+ stateModel.setTransition(transition);
+ }
}
}
@Override
- public MockMSStateModel createNewStateModel(String partitionKey) {
+ public MockMSStateModel createNewStateModel(String resourceName, String partitionKey) {
MockMSStateModel model = new MockMSStateModel(_transition);
return model;
http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/helix-core/src/test/java/org/apache/helix/mock/participant/MockSchemataModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/participant/MockSchemataModelFactory.java b/helix-core/src/test/java/org/apache/helix/mock/participant/MockSchemataModelFactory.java
index 525e764..cbeebe2 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/participant/MockSchemataModelFactory.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/participant/MockSchemataModelFactory.java
@@ -24,7 +24,7 @@ import org.apache.helix.participant.statemachine.StateModelFactory;
// mock STORAGE_DEFAULT_SM_SCHEMATA state model factory
public class MockSchemataModelFactory extends StateModelFactory<MockSchemataStateModel> {
@Override
- public MockSchemataStateModel createNewStateModel(String partitionKey) {
+ public MockSchemataStateModel createNewStateModel(String resourceName, String partitionKey) {
MockSchemataStateModel model = new MockSchemataStateModel();
return model;
}
http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/helix-core/src/test/java/org/apache/helix/participant/TestDistControllerStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/participant/TestDistControllerStateModelFactory.java b/helix-core/src/test/java/org/apache/helix/participant/TestDistControllerStateModelFactory.java
index dae58b3..509ac5b 100644
--- a/helix-core/src/test/java/org/apache/helix/participant/TestDistControllerStateModelFactory.java
+++ b/helix-core/src/test/java/org/apache/helix/participant/TestDistControllerStateModelFactory.java
@@ -23,18 +23,15 @@ import org.apache.helix.ZkUnitTestBase;
import org.apache.helix.participant.DistClusterControllerStateModel;
import org.apache.helix.participant.DistClusterControllerStateModelFactory;
import org.testng.annotations.Test;
-import org.testng.annotations.Test;
public class TestDistControllerStateModelFactory {
final String zkAddr = ZkUnitTestBase.ZK_ADDR;
- @Test(groups = {
- "unitTest"
- })
+ @Test()
public void testDistControllerStateModelFactory() {
DistClusterControllerStateModelFactory factory =
new DistClusterControllerStateModelFactory(zkAddr);
- DistClusterControllerStateModel stateModel = factory.createNewStateModel("key");
+ DistClusterControllerStateModel stateModel = factory.createNewStateModel("name", "key");
stateModel.onBecomeStandbyFromOffline(null, null);
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/recipes/distributed-lock-manager/src/main/java/org/apache/helix/lockmanager/LockFactory.java
----------------------------------------------------------------------
diff --git a/recipes/distributed-lock-manager/src/main/java/org/apache/helix/lockmanager/LockFactory.java b/recipes/distributed-lock-manager/src/main/java/org/apache/helix/lockmanager/LockFactory.java
index cede270..3001836 100644
--- a/recipes/distributed-lock-manager/src/main/java/org/apache/helix/lockmanager/LockFactory.java
+++ b/recipes/distributed-lock-manager/src/main/java/org/apache/helix/lockmanager/LockFactory.java
@@ -23,7 +23,7 @@ import org.apache.helix.participant.statemachine.StateModelFactory;
public class LockFactory extends StateModelFactory<Lock> {
@Override
- public Lock createNewStateModel(String lockName) {
+ public Lock createNewStateModel(String resourceName, String lockName) {
return new Lock(lockName);
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerStateModelFactory.java
----------------------------------------------------------------------
diff --git a/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerStateModelFactory.java b/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerStateModelFactory.java
index c59e9c4..fffb95f 100644
--- a/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerStateModelFactory.java
+++ b/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerStateModelFactory.java
@@ -31,7 +31,7 @@ public class ConsumerStateModelFactory extends StateModelFactory<ConsumerStateMo
}
@Override
- public ConsumerStateModel createNewStateModel(String partition) {
+ public ConsumerStateModel createNewStateModel(String resource, String partition) {
ConsumerStateModel model = new ConsumerStateModel(_consumerId, partition, _mqServer);
return model;
}
http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/FileStoreStateModelFactory.java
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/FileStoreStateModelFactory.java b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/FileStoreStateModelFactory.java
index 4df8e3d..089a402 100644
--- a/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/FileStoreStateModelFactory.java
+++ b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/FileStoreStateModelFactory.java
@@ -30,7 +30,7 @@ public class FileStoreStateModelFactory extends StateModelFactory<FileStoreState
}
@Override
- public FileStoreStateModel createNewStateModel(String partition) {
+ public FileStoreStateModel createNewStateModel(String resource, String partition) {
FileStoreStateModel model;
model = new FileStoreStateModel(manager, partition.split("_")[0], partition);
return model;
http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/TaskStateModelFactory.java
----------------------------------------------------------------------
diff --git a/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/TaskStateModelFactory.java b/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/TaskStateModelFactory.java
index 0864ced..6fc825c 100644
--- a/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/TaskStateModelFactory.java
+++ b/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/TaskStateModelFactory.java
@@ -34,7 +34,7 @@ public class TaskStateModelFactory extends StateModelFactory<TaskStateModel> {
}
@Override
- public TaskStateModel createNewStateModel(String partition) {
+ public TaskStateModel createNewStateModel(String resource, String partition) {
TaskStateModel model = new TaskStateModel(_workerId, partition, _taskFactory, _taskResultStore);
return model;
}