You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by zz...@apache.org on 2014/11/19 20:31:21 UTC

helix git commit: [HELIX-552] StateModelFactory#_stateModelMap should use both resourceName and partitionKey to map a state model, rb=28211

Repository: helix
Updated Branches:
  refs/heads/helix-0.6.x 45a9fd3a9 -> 9ddd0af34


[HELIX-552] StateModelFactory#_stateModelMap should use both resourceName and partitionKey to map a state model, rb=28211


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/9ddd0af3
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/9ddd0af3
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/9ddd0af3

Branch: refs/heads/helix-0.6.x
Commit: 9ddd0af347037f585a52b83bbc3a6b11a3934c82
Parents: 45a9fd3
Author: zzhang <zz...@apache.org>
Authored: Wed Nov 19 11:31:01 2014 -0800
Committer: zzhang <zz...@apache.org>
Committed: Wed Nov 19 11:31:01 2014 -0800

----------------------------------------------------------------------
 .../helix/agent/AgentStateModelFactory.java     |   2 +-
 .../main/java/org/apache/helix/PropertyKey.java |   9 ++
 .../apache/helix/examples/BootstrapHandler.java |   2 +-
 .../apache/helix/examples/DummyParticipant.java |   2 +-
 .../LeaderStandbyStateModelFactory.java         |   2 +-
 .../examples/MasterSlaveStateModelFactory.java  |   2 +-
 .../OnlineOfflineStateModelFactory.java         |   2 +-
 .../handling/HelixStateTransitionHandler.java   |   2 +-
 .../DistClusterControllerStateModelFactory.java |   2 +-
 .../GenericLeaderStandbyStateModelFactory.java  |   2 +-
 .../participant/HelixStateMachineEngine.java    |  20 +--
 .../statemachine/ScheduledTaskStateModel.java   |  33 ++---
 .../ScheduledTaskStateModelFactory.java         |   6 +-
 .../statemachine/StateModelFactory.java         |  57 ++++++---
 .../helix/task/TaskStateModelFactory.java       |   2 +-
 .../org/apache/helix/TestHelixTaskExecutor.java |   2 +-
 .../org/apache/helix/TestHelixTaskHandler.java  |   2 +-
 .../TestCorrectnessOnConnectivityLoss.java      |   2 +-
 .../integration/TestEntropyFreeNodeBounce.java  |   2 +-
 .../helix/integration/TestMessageThrottle2.java |   2 +-
 .../TestPartitionLevelTransitionConstraint.java |   2 +-
 .../integration/TestPreferenceListAsQueue.java  |   2 +-
 .../TestResourceWithSamePartitionKey.java       | 126 +++++++++++++++++++
 .../integration/TestStateTransitionTimeout.java |   5 +-
 .../helix/integration/TestZkReconnect.java      |   2 +-
 .../integration/manager/TestStateModelLeak.java |  14 +--
 .../helix/mock/participant/DummyProcess.java    |   6 +-
 .../participant/MockBootstrapModelFactory.java  |   2 +-
 .../mock/participant/MockMSModelFactory.java    |  10 +-
 .../participant/MockSchemataModelFactory.java   |   2 +-
 .../TestDistControllerStateModelFactory.java    |   7 +-
 .../apache/helix/lockmanager/LockFactory.java   |   2 +-
 .../rabbitmq/ConsumerStateModelFactory.java     |   2 +-
 .../filestore/FileStoreStateModelFactory.java   |   2 +-
 .../taskexecution/TaskStateModelFactory.java    |   2 +-
 35 files changed, 250 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/helix-agent/src/main/java/org/apache/helix/agent/AgentStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-agent/src/main/java/org/apache/helix/agent/AgentStateModelFactory.java b/helix-agent/src/main/java/org/apache/helix/agent/AgentStateModelFactory.java
index a0e00a3..f13dd05 100644
--- a/helix-agent/src/main/java/org/apache/helix/agent/AgentStateModelFactory.java
+++ b/helix-agent/src/main/java/org/apache/helix/agent/AgentStateModelFactory.java
@@ -24,7 +24,7 @@ import org.apache.helix.participant.statemachine.StateModelFactory;
 public class AgentStateModelFactory extends StateModelFactory<AgentStateModel> {
 
   @Override
-  public AgentStateModel createNewStateModel(String partitionKey) {
+  public AgentStateModel createNewStateModel(String resourceName, String partitionKey) {
     AgentStateModel model = new AgentStateModel();
     return model;
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/helix-core/src/main/java/org/apache/helix/PropertyKey.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/PropertyKey.java b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
index a394b50..a5a3561 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyKey.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
@@ -306,6 +306,15 @@ public class PropertyKey {
     }
 
     /**
+     * Get a property key associated with {@link Error} for an instance
+     * @param instanceName
+     * @return {@link PropertyKey}
+     */
+    public PropertyKey errors(String instanceName) {
+      return new PropertyKey(ERRORS, Error.class, _clusterName, instanceName);
+    }
+
+    /**
      * Get a property key associated with a specific {@link Message} on an instance
      * @param instanceName
      * @param messageId

http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/helix-core/src/main/java/org/apache/helix/examples/BootstrapHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/examples/BootstrapHandler.java b/helix-core/src/main/java/org/apache/helix/examples/BootstrapHandler.java
index f1b37fc..65e68a4 100644
--- a/helix-core/src/main/java/org/apache/helix/examples/BootstrapHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/examples/BootstrapHandler.java
@@ -37,7 +37,7 @@ import org.apache.helix.participant.statemachine.Transition;
 public class BootstrapHandler extends StateModelFactory<StateModel> {
 
   @Override
-  public StateModel createNewStateModel(String stateUnitKey) {
+  public StateModel createNewStateModel(String resourceName, String stateUnitKey) {
     return new BootstrapStateModel(stateUnitKey);
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/helix-core/src/main/java/org/apache/helix/examples/DummyParticipant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/examples/DummyParticipant.java b/helix-core/src/main/java/org/apache/helix/examples/DummyParticipant.java
index 52a3696..ea13951 100644
--- a/helix-core/src/main/java/org/apache/helix/examples/DummyParticipant.java
+++ b/helix-core/src/main/java/org/apache/helix/examples/DummyParticipant.java
@@ -87,7 +87,7 @@ public class DummyParticipant {
   // dummy master slave state model factory
   public static class DummyMSModelFactory extends StateModelFactory<DummyMSStateModel> {
     @Override
-    public DummyMSStateModel createNewStateModel(String partitionName) {
+    public DummyMSStateModel createNewStateModel(String resourceName, String partitionName) {
       DummyMSStateModel model = new DummyMSStateModel();
       return model;
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/helix-core/src/main/java/org/apache/helix/examples/LeaderStandbyStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/examples/LeaderStandbyStateModelFactory.java b/helix-core/src/main/java/org/apache/helix/examples/LeaderStandbyStateModelFactory.java
index 43ac5de..c0fff7c 100644
--- a/helix-core/src/main/java/org/apache/helix/examples/LeaderStandbyStateModelFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/examples/LeaderStandbyStateModelFactory.java
@@ -32,7 +32,7 @@ public class LeaderStandbyStateModelFactory extends StateModelFactory<StateModel
   }
 
   @Override
-  public StateModel createNewStateModel(String stateUnitKey) {
+  public StateModel createNewStateModel(String resourceName, String stateUnitKey) {
     LeaderStandbyStateModel stateModel = new LeaderStandbyStateModel();
     stateModel.setDelay(_delay);
     return stateModel;

http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/helix-core/src/main/java/org/apache/helix/examples/MasterSlaveStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/examples/MasterSlaveStateModelFactory.java b/helix-core/src/main/java/org/apache/helix/examples/MasterSlaveStateModelFactory.java
index affbea8..85d90b2 100644
--- a/helix-core/src/main/java/org/apache/helix/examples/MasterSlaveStateModelFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/examples/MasterSlaveStateModelFactory.java
@@ -44,7 +44,7 @@ public class MasterSlaveStateModelFactory extends StateModelFactory<StateModel>
   }
 
   @Override
-  public StateModel createNewStateModel(String partitionName) {
+  public StateModel createNewStateModel(String resourceName, String partitionName) {
     MasterSlaveStateModel stateModel = new MasterSlaveStateModel();
     stateModel.setInstanceName(_instanceName);
     stateModel.setDelay(_delay);

http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/helix-core/src/main/java/org/apache/helix/examples/OnlineOfflineStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/examples/OnlineOfflineStateModelFactory.java b/helix-core/src/main/java/org/apache/helix/examples/OnlineOfflineStateModelFactory.java
index daf03a9..e11c6d1 100644
--- a/helix-core/src/main/java/org/apache/helix/examples/OnlineOfflineStateModelFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/examples/OnlineOfflineStateModelFactory.java
@@ -32,7 +32,7 @@ public class OnlineOfflineStateModelFactory extends StateModelFactory<StateModel
   }
 
   @Override
-  public StateModel createNewStateModel(String stateUnitKey) {
+  public StateModel createNewStateModel(String resourceName, String stateUnitKey) {
     OnlineOfflineStateModel stateModel = new OnlineOfflineStateModel();
     stateModel.setDelay(_delay);
     return stateModel;

http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
index 8da7ec9..e9c4f48 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
@@ -195,7 +195,7 @@ public class HelixStateTransitionHandler extends MessageHandler {
         List<ZNRecordDelta> deltaList = new ArrayList<ZNRecordDelta>();
         deltaList.add(delta);
         _currentStateDelta.setDeltaList(deltaList);
-        _stateModelFactory.removeStateModel(partitionKey);
+        _stateModelFactory.removeStateModel(resource, partitionKey);
       } else {
         // if the partition is not to be dropped, update _stateModel to the TO_STATE
         _stateModel.updateState(toState);

http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModelFactory.java b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModelFactory.java
index a367c81..c172168 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModelFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModelFactory.java
@@ -30,7 +30,7 @@ public class DistClusterControllerStateModelFactory extends
   }
 
   @Override
-  public DistClusterControllerStateModel createNewStateModel(String stateUnitKey) {
+  public DistClusterControllerStateModel createNewStateModel(String resourceName, String partitionKey) {
     return new DistClusterControllerStateModel(_zkAddr);
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/helix-core/src/main/java/org/apache/helix/participant/GenericLeaderStandbyStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/GenericLeaderStandbyStateModelFactory.java b/helix-core/src/main/java/org/apache/helix/participant/GenericLeaderStandbyStateModelFactory.java
index 51c91cc..f679cd9 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/GenericLeaderStandbyStateModelFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/GenericLeaderStandbyStateModelFactory.java
@@ -39,7 +39,7 @@ public class GenericLeaderStandbyStateModelFactory extends
   }
 
   @Override
-  public GenericLeaderStandbyModel createNewStateModel(String partitionKey) {
+  public GenericLeaderStandbyModel createNewStateModel(String resourceName, String partitionKey) {
     return new GenericLeaderStandbyModel(_callback, _notificationTypes, partitionKey);
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java b/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
index 31fcecf..039d076 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
@@ -142,13 +142,15 @@ public class HelixStateMachineEngine implements StateMachineEngine {
     for (Map<String, StateModelFactory<? extends StateModel>> ftyMap : _stateModelFactoryMap
         .values()) {
       for (StateModelFactory<? extends StateModel> stateModelFactory : ftyMap.values()) {
-        for (String resourceKey : stateModelFactory.getPartitionSet()) {
-          StateModel stateModel = stateModelFactory.getStateModel(resourceKey);
-          stateModel.reset();
-          String initialState = _stateModelParser.getInitialState(stateModel.getClass());
-          stateModel.updateState(initialState);
-          // TODO probably should update the state on ZK. Shi confirm what needs
-          // to be done here.
+        for (String resourceName : stateModelFactory.getResourceSet()) {
+          for (String partitionKey : stateModelFactory.getPartitionSet(resourceName)) {
+            StateModel stateModel = stateModelFactory.getStateModel(resourceName, partitionKey);
+            stateModel.reset();
+            String initialState = _stateModelParser.getInitialState(stateModel.getClass());
+            stateModel.updateState(initialState);
+            // TODO probably should update the state on ZK. Shi confirm what needs
+            // to be done here.
+          }
         }
       }
     }
@@ -206,9 +208,9 @@ public class HelixStateMachineEngine implements StateMachineEngine {
     if (message.getBatchMessageMode() == false) {
       // create currentStateDelta for this partition
       String initState = _stateModelDefs.get(message.getStateModelDef()).getInitialState();
-      StateModel stateModel = stateModelFactory.getStateModel(partitionKey);
+      StateModel stateModel = stateModelFactory.getStateModel(resourceName, partitionKey);
       if (stateModel == null) {
-        stateModel = stateModelFactory.createAndAddStateModel(partitionKey);
+        stateModel = stateModelFactory.createAndAddStateModel(resourceName, partitionKey);
         stateModel.updateState(initState);
       }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModel.java b/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModel.java
index 8b6a02c..6c7013c 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModel.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModel.java
@@ -37,26 +37,28 @@ public class ScheduledTaskStateModel extends StateModel {
   // StateModel with initial state other than OFFLINE should override this field
   protected String _currentState = DEFAULT_INITIAL_STATE;
   final ScheduledTaskStateModelFactory _factory;
-  final String _partitionName;
+  final String _resourceName;
+  final String _partitionKey;
 
   final HelixTaskExecutor _executor;
 
   public ScheduledTaskStateModel(ScheduledTaskStateModelFactory factory,
-      HelixTaskExecutor executor, String partitionName) {
+      HelixTaskExecutor executor, String resourceName, String partitionKey) {
     _factory = factory;
-    _partitionName = partitionName;
+    _resourceName = resourceName;
+    _partitionKey = partitionKey;
     _executor = executor;
   }
 
   @Transition(to = "COMPLETED", from = "OFFLINE")
   public void onBecomeCompletedFromOffline(Message message, NotificationContext context)
       throws InterruptedException {
-    logger.info(_partitionName + " onBecomeCompletedFromOffline");
+    logger.info(_partitionKey + " onBecomeCompletedFromOffline");
 
     // Construct the inner task message from the mapfields of scheduledTaskQueue resource group
     Map<String, String> messageInfo =
         message.getRecord().getMapField(Message.Attributes.INNER_MESSAGE.toString());
-    ZNRecord record = new ZNRecord(_partitionName);
+    ZNRecord record = new ZNRecord(_partitionKey);
     record.getSimpleFields().putAll(messageInfo);
     Message taskMessage = new Message(record);
     if (logger.isDebugEnabled()) {
@@ -66,49 +68,50 @@ public class ScheduledTaskStateModel extends StateModel {
         _executor.createMessageHandler(taskMessage, new NotificationContext(null));
     if (handler == null) {
       throw new HelixException("Task message " + taskMessage.getMsgType()
-          + " handler not found, task id " + _partitionName);
+          + " handler not found, task id " + _partitionKey);
     }
     // Invoke the internal handler to complete the task
     handler.handleMessage();
-    logger.info(_partitionName + " onBecomeCompletedFromOffline completed");
+    logger.info(_partitionKey + " onBecomeCompletedFromOffline completed");
   }
 
   @Transition(to = "OFFLINE", from = "COMPLETED")
   public void onBecomeOfflineFromCompleted(Message message, NotificationContext context) {
-    logger.info(_partitionName + " onBecomeOfflineFromCompleted");
+    logger.info(_partitionKey + " onBecomeOfflineFromCompleted");
   }
 
   @Transition(to = "DROPPED", from = "COMPLETED")
   public void onBecomeDroppedFromCompleted(Message message, NotificationContext context) {
-    logger.info(_partitionName + " onBecomeDroppedFromCompleted");
+    logger.info(_partitionKey + " onBecomeDroppedFromCompleted");
     removeFromStatemodelFactory();
   }
 
   @Transition(to = "DROPPED", from = "OFFLINE")
   public void onBecomeDroppedFromOffline(Message message, NotificationContext context)
       throws InterruptedException {
-    logger.info(_partitionName + " onBecomeDroppedFromScheduled");
+    logger.info(_partitionKey + " onBecomeDroppedFromScheduled");
     removeFromStatemodelFactory();
   }
 
   @Transition(to = "OFFLINE", from = "ERROR")
   public void onBecomeOfflineFromError(Message message, NotificationContext context)
       throws InterruptedException {
-    logger.info(_partitionName + " onBecomeOfflineFromError");
+    logger.info(_partitionKey + " onBecomeOfflineFromError");
   }
 
   @Override
   public void reset() {
-    logger.info(_partitionName + " ScheduledTask reset");
+    logger.info(_partitionKey + " ScheduledTask reset");
     removeFromStatemodelFactory();
   }
 
   // We need this to prevent state model leak
   private void removeFromStatemodelFactory() {
-    if (_factory.getStateModel(_partitionName) != null) {
-      _factory.removeStateModel(_partitionName);
+    if (_factory.getStateModel(_resourceName, _partitionKey) != null) {
+      _factory.removeStateModel(_resourceName, _partitionKey);
     } else {
-      logger.warn(_partitionName + " not found in ScheduledTaskStateModelFactory");
+      logger.warn(_resourceName + "_ " + _partitionKey
+          + " not found in ScheduledTaskStateModelFactory");
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModelFactory.java b/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModelFactory.java
index a205910..dce5898 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModelFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModelFactory.java
@@ -32,8 +32,8 @@ public class ScheduledTaskStateModelFactory extends StateModelFactory<ScheduledT
   }
 
   @Override
-  public ScheduledTaskStateModel createNewStateModel(String partitionName) {
-    logger.info("Create state model for ScheduledTask " + partitionName);
-    return new ScheduledTaskStateModel(this, _executor, partitionName);
+  public ScheduledTaskStateModel createNewStateModel(String resourceName, String partitionKey) {
+    logger.info("Create state model for ScheduledTask " + resourceName + "_" + partitionKey);
+    return new ScheduledTaskStateModel(this, _executor, resourceName, partitionKey);
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java b/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java
index bd1a668..3a6c6d5 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java
@@ -19,6 +19,7 @@ package org.apache.helix.participant.statemachine;
  * under the License.
  */
 
+import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -28,9 +29,10 @@ import org.apache.helix.messaging.handling.BatchMessageWrapper;
 
 public abstract class StateModelFactory<T extends StateModel> {
   /**
-   * mapping from partitionName to StateModel
+   * mapping from resourceName to a map of partitionName to StateModel
    */
-  private final ConcurrentMap<String, T> _stateModelMap = new ConcurrentHashMap<String, T>();
+  private final ConcurrentMap<String, ConcurrentMap<String, T>> _stateModelMap =
+      new ConcurrentHashMap<String, ConcurrentMap<String, T>>();
 
   /**
    * mapping from resourceName to BatchMessageWrapper
@@ -39,49 +41,68 @@ public abstract class StateModelFactory<T extends StateModel> {
       new ConcurrentHashMap<String, BatchMessageWrapper>();
 
   /**
-   * This method will be invoked only once per partitionName per session
+   * This method will be invoked only once per resource per partition per session
+   * Replaces the old StateModelFactory#createNewStateModel(String partitionName);
+   * "resourceName" was added to the signature, see HELIX-552
+   * @param resourceName
    * @param partitionName
-   * @return
+   * @return state model
    */
-  public abstract T createNewStateModel(String partitionName);
+  public abstract T createNewStateModel(String resourceName, String partitionName);
 
   /**
    * Create a state model for a partition
-   * @param partitionName
+   * @param partitionKey
    */
-  public T createAndAddStateModel(String partitionName) {
-    T stateModel = createNewStateModel(partitionName);
-    _stateModelMap.put(partitionName, stateModel);
+  public T createAndAddStateModel(String resourceName, String partitionKey) {
+    T stateModel = createNewStateModel(resourceName, partitionKey);
+    _stateModelMap.putIfAbsent(resourceName, new ConcurrentHashMap<String, T>());
+    _stateModelMap.get(resourceName).put(partitionKey, stateModel);
     return stateModel;
   }
 
   /**
    * Get the state model for a partition
-   * @param partitionName
+   * @param resourceName
+   * @param partitionKey
    * @return state model if exists, null otherwise
    */
-  public T getStateModel(String partitionName) {
-    return _stateModelMap.get(partitionName);
+  public T getStateModel(String resourceName, String partitionKey) {
+    Map<String, T> map = _stateModelMap.get(resourceName);
+    return map == null? null : map.get(partitionKey);
   }
 
   /**
    * remove state model for a partition
-   * @param partitionName
+   * @param resourceName
+   * @param partitionKey
    * @return state model removed or null if not exist
    */
-  public T removeStateModel(String partitionName) {
-    return _stateModelMap.remove(partitionName);
+  public T removeStateModel(String resourceName, String partitionKey) {
+    Map<String, T> map = _stateModelMap.get(resourceName);
+    return map == null? null : map.remove(partitionKey);
   }
 
   /**
-   * get partition set
-   * @return partition key set
+   * get resource set
+   * (names of the resources for which a state model has been created and added)
+   * @return resource name set
    */
-  public Set<String> getPartitionSet() {
+  public Set<String> getResourceSet() {
     return _stateModelMap.keySet();
   }
 
   /**
+   * get partition set for a resource
+   * @param resourceName
+   * @return partition key set
+   */
+  public Set<String> getPartitionSet(String resourceName) {
+    Map<String, T> map = _stateModelMap.get(resourceName);
+    return (map == null? Collections.<String>emptySet() : map.keySet());
+  }
+
+  /**
    * create a default batch-message-wrapper for a resource
    * @param resourceName
    * @return

http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java b/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
index 51e8c95..b8e91f5 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
@@ -37,7 +37,7 @@ public class TaskStateModelFactory extends StateModelFactory<TaskStateModel> {
   }
 
   @Override
-  public TaskStateModel createNewStateModel(String partitionName) {
+  public TaskStateModel createNewStateModel(String resourceName, String partitionKey) {
     return new TaskStateModel(_manager, _taskFactoryRegistry);
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/helix-core/src/test/java/org/apache/helix/TestHelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestHelixTaskExecutor.java b/helix-core/src/test/java/org/apache/helix/TestHelixTaskExecutor.java
index 4aa16eb..c62cefd 100644
--- a/helix-core/src/test/java/org/apache/helix/TestHelixTaskExecutor.java
+++ b/helix-core/src/test/java/org/apache/helix/TestHelixTaskExecutor.java
@@ -72,7 +72,7 @@ public class TestHelixTaskExecutor {
     StateModelFactory<MockStateModel> stateModelFactory = new StateModelFactory<MockStateModel>() {
 
       @Override
-      public MockStateModel createNewStateModel(String partitionName) {
+      public MockStateModel createNewStateModel(String resource, String partitionName) {
         // TODO Auto-generated method stub
         return new MockStateModel();
       }

http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/helix-core/src/test/java/org/apache/helix/TestHelixTaskHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestHelixTaskHandler.java b/helix-core/src/test/java/org/apache/helix/TestHelixTaskHandler.java
index 3c7f2af..69f24d7 100644
--- a/helix-core/src/test/java/org/apache/helix/TestHelixTaskHandler.java
+++ b/helix-core/src/test/java/org/apache/helix/TestHelixTaskHandler.java
@@ -116,7 +116,7 @@ public class TestHelixTaskHandler {
         new StateModelFactory<MockStateModelAnnotated>() {
 
           @Override
-          public MockStateModelAnnotated createNewStateModel(String partitionName) {
+          public MockStateModelAnnotated createNewStateModel(String resource, String partitionName) {
             // TODO Auto-generated method stub
             return new MockStateModelAnnotated();
           }

http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/helix-core/src/test/java/org/apache/helix/integration/TestCorrectnessOnConnectivityLoss.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestCorrectnessOnConnectivityLoss.java b/helix-core/src/test/java/org/apache/helix/integration/TestCorrectnessOnConnectivityLoss.java
index 3b44f2c..cbe231d 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestCorrectnessOnConnectivityLoss.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestCorrectnessOnConnectivityLoss.java
@@ -196,7 +196,7 @@ public class TestCorrectnessOnConnectivityLoss {
     }
 
     @Override
-    public MyStateModel createNewStateModel(String partitionId) {
+    public MyStateModel createNewStateModel(String resource, String partitionId) {
       return new MyStateModel(_counts);
     }
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/helix-core/src/test/java/org/apache/helix/integration/TestEntropyFreeNodeBounce.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestEntropyFreeNodeBounce.java b/helix-core/src/test/java/org/apache/helix/integration/TestEntropyFreeNodeBounce.java
index 8a3d727..dc38369 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestEntropyFreeNodeBounce.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestEntropyFreeNodeBounce.java
@@ -152,7 +152,7 @@ public class TestEntropyFreeNodeBounce extends ZkUnitTestBase {
   private static class MockStateModelFactory extends StateModelFactory<MockStateModel> {
 
     @Override
-    public MockStateModel createNewStateModel(String partitionName) {
+    public MockStateModel createNewStateModel(String resource, String partitionName) {
       return new MockStateModel();
     }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle2.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle2.java b/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle2.java
index 7d66780..49222e6 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle2.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle2.java
@@ -328,7 +328,7 @@ public class TestMessageThrottle2 extends ZkIntegrationTestBase {
     }
 
     @Override
-    public MyStateModel createNewStateModel(String partitionName) {
+    public MyStateModel createNewStateModel(String resource, String partitionName) {
       return new MyStateModel(helixManager);
     }
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/helix-core/src/test/java/org/apache/helix/integration/TestPartitionLevelTransitionConstraint.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestPartitionLevelTransitionConstraint.java b/helix-core/src/test/java/org/apache/helix/integration/TestPartitionLevelTransitionConstraint.java
index 3571a7b..9510e62 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestPartitionLevelTransitionConstraint.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestPartitionLevelTransitionConstraint.java
@@ -90,7 +90,7 @@ public class TestPartitionLevelTransitionConstraint extends ZkIntegrationTestBas
   public class BootstrapStateModelFactory extends StateModelFactory<BootstrapStateModel> {
 
     @Override
-    public BootstrapStateModel createNewStateModel(String stateUnitKey) {
+    public BootstrapStateModel createNewStateModel(String resource, String stateUnitKey) {
       BootstrapStateModel model = new BootstrapStateModel();
       return model;
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/helix-core/src/test/java/org/apache/helix/integration/TestPreferenceListAsQueue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestPreferenceListAsQueue.java b/helix-core/src/test/java/org/apache/helix/integration/TestPreferenceListAsQueue.java
index 456baca..d3aa5ad 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestPreferenceListAsQueue.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestPreferenceListAsQueue.java
@@ -502,7 +502,7 @@ public class TestPreferenceListAsQueue extends ZkUnitTestBase {
   public class PrefListTaskOnlineOfflineStateModelFactory extends
       StateModelFactory<PrefListTaskOnlineOfflineStateModel> {
     @Override
-    public PrefListTaskOnlineOfflineStateModel createNewStateModel(String partitionName) {
+    public PrefListTaskOnlineOfflineStateModel createNewStateModel(String resource, String partitionName) {
       return new PrefListTaskOnlineOfflineStateModel();
     }
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/helix-core/src/test/java/org/apache/helix/integration/TestResourceWithSamePartitionKey.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestResourceWithSamePartitionKey.java b/helix-core/src/test/java/org/apache/helix/integration/TestResourceWithSamePartitionKey.java
new file mode 100644
index 0000000..bbb46eb
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestResourceWithSamePartitionKey.java
@@ -0,0 +1,126 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+import java.util.List;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Integration test for HELIX-552 (StateModelFactory#_stateModelMap should use both resourceName
+ * and partitionKey to map a state model): two resources, TestDB0 and TestDB1, use the identical
+ * partition keys "0" and "1"; the cluster must verify cleanly and no participant may report errors.
+ */
+public class TestResourceWithSamePartitionKey extends ZkUnitTestBase {
+
+  @Test
+  public void test() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    int n = 2; // number of participant nodes; also sizes the participants array below
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        2, // partitions per resource
+        n, // number of nodes
+        2, // replicas
+        "OnlineOffline", RebalanceMode.CUSTOMIZED, false); // do rebalance (false: mapping set below)
+    // pin TestDB0's mapping by hand: partitions "0" and "1" ONLINE on both participants
+    HelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    IdealState idealState = accessor.getProperty(keyBuilder.idealStates("TestDB0"));
+    idealState.setReplicas("2");
+    idealState.setPartitionState("0", "localhost_12918", "ONLINE");
+    idealState.setPartitionState("0", "localhost_12919", "ONLINE");
+    idealState.setPartitionState("1", "localhost_12918", "ONLINE");
+    idealState.setPartitionState("1", "localhost_12919", "ONLINE");
+    accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
+
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+    controller.syncStart();
+
+    // start one participant per node; instance names match those used in the ideal state above
+    MockParticipantManager[] participants = new MockParticipantManager[n];
+    for (int i = 0; i < n; i++) {
+      String instanceName = "localhost_" + (12918 + i);
+
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+      participants[i].syncStart();
+    }
+    // first check: the verifier must pass while TestDB0 is the only resource in the cluster
+    boolean result =
+        ClusterStateVerifier
+            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                clusterName));
+    Assert.assertTrue(result);
+
+    // add a second resource, TestDB1, that copies TestDB0's simple fields and reuses keys "0", "1"
+    IdealState newIdealState = new IdealState("TestDB1");
+    newIdealState.getRecord().setSimpleFields(idealState.getRecord().getSimpleFields());
+    newIdealState.setPartitionState("0", "localhost_12918", "ONLINE");
+    newIdealState.setPartitionState("0", "localhost_12919", "ONLINE");
+    newIdealState.setPartitionState("1", "localhost_12918", "ONLINE");
+    newIdealState.setPartitionState("1", "localhost_12919", "ONLINE");
+    accessor.setProperty(keyBuilder.idealStates("TestDB1"), newIdealState);
+    // second check: the verifier must still pass once both resources share the same partition keys
+    result =
+        ClusterStateVerifier
+            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                clusterName));
+    Assert.assertTrue(result);
+
+    // assert no ERROR: the errors node of every participant instance must have no children
+    for (int i = 0; i < n; i++) {
+      String instanceName = "localhost_" + (12918 + i);
+      List<String> errs = accessor.getChildNames(keyBuilder.errors(instanceName));
+      Assert.assertTrue(errs.isEmpty());
+    }
+
+    // clean up: stop the controller first, then each participant (skipped if an assertion failed)
+    controller.syncStop();
+    for (int i = 0; i < n; i++) {
+      participants[i].syncStop();
+    }
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java b/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java
index a297752..443d484 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java
@@ -161,7 +161,7 @@ public class TestStateTransitionTimeout extends ZkStandAloneCMTestBase {
     }
 
     @Override
-    public TimeOutStateModel createNewStateModel(String stateUnitKey) {
+    public TimeOutStateModel createNewStateModel(String resource, String stateUnitKey) {
       return new TimeOutStateModel(new SleepTransition(_sleepTime),
           partitionsToSleep.contains(stateUnitKey));
     }
@@ -170,7 +170,6 @@ public class TestStateTransitionTimeout extends ZkStandAloneCMTestBase {
   @Test
   public void testStateTransitionTimeOut() throws Exception {
     Map<String, SleepStateModelFactory> factories = new HashMap<String, SleepStateModelFactory>();
-    // MockParticipantManager[] participants = new MockParticipantManager[NODE_NR];
     IdealState idealState =
         _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, TEST_DB);
     for (int i = 0; i < NODE_NR; i++) {
@@ -204,7 +203,7 @@ public class TestStateTransitionTimeout extends ZkStandAloneCMTestBase {
       String idealMaster = idealState.getPreferenceList(p).get(0);
       Assert.assertTrue(ev.getStateMap(p).get(idealMaster).equals("ERROR"));
 
-      TimeOutStateModel model = factories.get(idealMaster).getStateModel(p);
+      TimeOutStateModel model = factories.get(idealMaster).getStateModel(TEST_DB, p);
       Assert.assertEquals(model._errorCallcount, 1);
       Assert.assertEquals(model._error.getCode(), ErrorCode.TIMEOUT);
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/helix-core/src/test/java/org/apache/helix/integration/TestZkReconnect.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestZkReconnect.java b/helix-core/src/test/java/org/apache/helix/integration/TestZkReconnect.java
index cc6c0b5..9da5bb1 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestZkReconnect.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestZkReconnect.java
@@ -83,7 +83,7 @@ public class TestZkReconnect {
     participant.getStateMachineEngine().registerStateModelFactory("OnlineOffline",
         new StateModelFactory<StateModel>() {
           @Override
-          public StateModel createNewStateModel(String stateUnitKey) {
+          public StateModel createNewStateModel(String resource, String stateUnitKey) {
             return new SimpleStateModel(latch);
           }
         }, "test");

http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/helix-core/src/test/java/org/apache/helix/integration/manager/TestStateModelLeak.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestStateModelLeak.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestStateModelLeak.java
index 2aff5b6..c24eeef 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/TestStateModelLeak.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestStateModelLeak.java
@@ -108,8 +108,8 @@ public class TestStateModelLeak extends ZkUnitTestBase {
     Assert.assertTrue(result);
 
     // check state models have been dropped also
-    Assert.assertTrue(fty.getPartitionSet().isEmpty(),
-        "All state-models should be dropped, but was " + fty.getPartitionSet());
+    Assert.assertTrue(fty.getPartitionSet("TestDB0").isEmpty(),
+        "All state-models should be dropped, but was " + fty.getPartitionSet("TestDB0"));
 
     // cleanup
     controller.syncStop();
@@ -193,8 +193,8 @@ public class TestStateModelLeak extends ZkUnitTestBase {
     Assert.assertTrue(result);
 
     // check state models have been dropped also
-    Assert.assertTrue(fty.getPartitionSet().isEmpty(),
-        "All state-models should be dropped, but was " + fty.getPartitionSet());
+    Assert.assertTrue(fty.getPartitionSet("TestDB0").isEmpty(),
+        "All state-models should be dropped, but was " + fty.getPartitionSet("TestDB0"));
 
     // cleanup
     controller.syncStop();
@@ -212,9 +212,9 @@ public class TestStateModelLeak extends ZkUnitTestBase {
    */
   static void checkStateModelMap(StateModelFactory<? extends StateModel> fty,
       Map<String, String> expectStateModelMap) {
-    Assert.assertEquals(fty.getPartitionSet().size(), expectStateModelMap.size());
-    for (String partition : fty.getPartitionSet()) {
-      StateModel stateModel = fty.getStateModel(partition);
+    Assert.assertEquals(fty.getPartitionSet("TestDB0").size(), expectStateModelMap.size());
+    for (String partition : fty.getPartitionSet("TestDB0")) {
+      StateModel stateModel = fty.getStateModel("TestDB0", partition);
       String actualState = stateModel.getCurrentState();
       String expectState = expectStateModelMap.get(partition);
       LOG.debug(partition + " actual state: " + actualState + ", expect state: " + expectState);

http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/helix-core/src/test/java/org/apache/helix/mock/participant/DummyProcess.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/participant/DummyProcess.java b/helix-core/src/test/java/org/apache/helix/mock/participant/DummyProcess.java
index 2111a65..085f822 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/participant/DummyProcess.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/participant/DummyProcess.java
@@ -115,7 +115,7 @@ public class DummyProcess {
     }
 
     @Override
-    public DummyStateModel createNewStateModel(String stateUnitKey) {
+    public DummyStateModel createNewStateModel(String resourceName, String stateUnitKey) {
       DummyStateModel model = new DummyStateModel();
       model.setDelay(_delay);
       return model;
@@ -131,7 +131,7 @@ public class DummyProcess {
     }
 
     @Override
-    public DummyLeaderStandbyStateModel createNewStateModel(String stateUnitKey) {
+    public DummyLeaderStandbyStateModel createNewStateModel(String resourceName, String stateUnitKey) {
       DummyLeaderStandbyStateModel model = new DummyLeaderStandbyStateModel();
       model.setDelay(_delay);
       return model;
@@ -147,7 +147,7 @@ public class DummyProcess {
     }
 
     @Override
-    public DummyOnlineOfflineStateModel createNewStateModel(String stateUnitKey) {
+    public DummyOnlineOfflineStateModel createNewStateModel(String resourceName, String stateUnitKey) {
       DummyOnlineOfflineStateModel model = new DummyOnlineOfflineStateModel();
       model.setDelay(_delay);
       return model;

http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/helix-core/src/test/java/org/apache/helix/mock/participant/MockBootstrapModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/participant/MockBootstrapModelFactory.java b/helix-core/src/test/java/org/apache/helix/mock/participant/MockBootstrapModelFactory.java
index 177e7c4..b7a80c6 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/participant/MockBootstrapModelFactory.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/participant/MockBootstrapModelFactory.java
@@ -24,7 +24,7 @@ import org.apache.helix.participant.statemachine.StateModelFactory;
 // mock Bootstrap state model factory
 public class MockBootstrapModelFactory extends StateModelFactory<MockBootstrapStateModel> {
   @Override
-  public MockBootstrapStateModel createNewStateModel(String partitionKey) {
+  public MockBootstrapStateModel createNewStateModel(String resourceName, String partitionKey) {
     MockBootstrapStateModel model = new MockBootstrapStateModel();
     return model;
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/helix-core/src/test/java/org/apache/helix/mock/participant/MockMSModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/participant/MockMSModelFactory.java b/helix-core/src/test/java/org/apache/helix/mock/participant/MockMSModelFactory.java
index 9325934..1849fc2 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/participant/MockMSModelFactory.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/participant/MockMSModelFactory.java
@@ -37,14 +37,16 @@ public class MockMSModelFactory extends StateModelFactory<MockMSStateModel> {
     _transition = transition;
 
     // set existing transition
-    for (String partition : getPartitionSet()) {
-      MockMSStateModel stateModel = getStateModel(partition);
-      stateModel.setTransition(transition);
+    for (String resource : getResourceSet()) {
+      for (String partition : getPartitionSet(resource)) {
+        MockMSStateModel stateModel = getStateModel(resource, partition);
+        stateModel.setTransition(transition);
+      }
     }
   }
 
   @Override
-  public MockMSStateModel createNewStateModel(String partitionKey) {
+  public MockMSStateModel createNewStateModel(String resourceName, String partitionKey) {
     MockMSStateModel model = new MockMSStateModel(_transition);
 
     return model;

http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/helix-core/src/test/java/org/apache/helix/mock/participant/MockSchemataModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/participant/MockSchemataModelFactory.java b/helix-core/src/test/java/org/apache/helix/mock/participant/MockSchemataModelFactory.java
index 525e764..cbeebe2 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/participant/MockSchemataModelFactory.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/participant/MockSchemataModelFactory.java
@@ -24,7 +24,7 @@ import org.apache.helix.participant.statemachine.StateModelFactory;
 // mock STORAGE_DEFAULT_SM_SCHEMATA state model factory
 public class MockSchemataModelFactory extends StateModelFactory<MockSchemataStateModel> {
   @Override
-  public MockSchemataStateModel createNewStateModel(String partitionKey) {
+  public MockSchemataStateModel createNewStateModel(String resourceName, String partitionKey) {
     MockSchemataStateModel model = new MockSchemataStateModel();
     return model;
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/helix-core/src/test/java/org/apache/helix/participant/TestDistControllerStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/participant/TestDistControllerStateModelFactory.java b/helix-core/src/test/java/org/apache/helix/participant/TestDistControllerStateModelFactory.java
index dae58b3..509ac5b 100644
--- a/helix-core/src/test/java/org/apache/helix/participant/TestDistControllerStateModelFactory.java
+++ b/helix-core/src/test/java/org/apache/helix/participant/TestDistControllerStateModelFactory.java
@@ -23,18 +23,15 @@ import org.apache.helix.ZkUnitTestBase;
 import org.apache.helix.participant.DistClusterControllerStateModel;
 import org.apache.helix.participant.DistClusterControllerStateModelFactory;
 import org.testng.annotations.Test;
-import org.testng.annotations.Test;
 
 public class TestDistControllerStateModelFactory {
   final String zkAddr = ZkUnitTestBase.ZK_ADDR;
 
-  @Test(groups = {
-    "unitTest"
-  })
+  @Test()
   public void testDistControllerStateModelFactory() {
     DistClusterControllerStateModelFactory factory =
         new DistClusterControllerStateModelFactory(zkAddr);
-    DistClusterControllerStateModel stateModel = factory.createNewStateModel("key");
+    DistClusterControllerStateModel stateModel = factory.createNewStateModel("name", "key");
     stateModel.onBecomeStandbyFromOffline(null, null);
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/recipes/distributed-lock-manager/src/main/java/org/apache/helix/lockmanager/LockFactory.java
----------------------------------------------------------------------
diff --git a/recipes/distributed-lock-manager/src/main/java/org/apache/helix/lockmanager/LockFactory.java b/recipes/distributed-lock-manager/src/main/java/org/apache/helix/lockmanager/LockFactory.java
index cede270..3001836 100644
--- a/recipes/distributed-lock-manager/src/main/java/org/apache/helix/lockmanager/LockFactory.java
+++ b/recipes/distributed-lock-manager/src/main/java/org/apache/helix/lockmanager/LockFactory.java
@@ -23,7 +23,7 @@ import org.apache.helix.participant.statemachine.StateModelFactory;
 
 public class LockFactory extends StateModelFactory<Lock> {
   @Override
-  public Lock createNewStateModel(String lockName) {
+  public Lock createNewStateModel(String resourceName, String lockName) {
     return new Lock(lockName);
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerStateModelFactory.java
----------------------------------------------------------------------
diff --git a/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerStateModelFactory.java b/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerStateModelFactory.java
index c59e9c4..fffb95f 100644
--- a/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerStateModelFactory.java
+++ b/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerStateModelFactory.java
@@ -31,7 +31,7 @@ public class ConsumerStateModelFactory extends StateModelFactory<ConsumerStateMo
   }
 
   @Override
-  public ConsumerStateModel createNewStateModel(String partition) {
+  public ConsumerStateModel createNewStateModel(String resource, String partition) {
     ConsumerStateModel model = new ConsumerStateModel(_consumerId, partition, _mqServer);
     return model;
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/FileStoreStateModelFactory.java
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/FileStoreStateModelFactory.java b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/FileStoreStateModelFactory.java
index 4df8e3d..089a402 100644
--- a/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/FileStoreStateModelFactory.java
+++ b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/FileStoreStateModelFactory.java
@@ -30,7 +30,7 @@ public class FileStoreStateModelFactory extends StateModelFactory<FileStoreState
   }
 
   @Override
-  public FileStoreStateModel createNewStateModel(String partition) {
+  public FileStoreStateModel createNewStateModel(String resource, String partition) {
     FileStoreStateModel model;
     model = new FileStoreStateModel(manager, partition.split("_")[0], partition);
     return model;

http://git-wip-us.apache.org/repos/asf/helix/blob/9ddd0af3/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/TaskStateModelFactory.java
----------------------------------------------------------------------
diff --git a/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/TaskStateModelFactory.java b/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/TaskStateModelFactory.java
index 0864ced..6fc825c 100644
--- a/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/TaskStateModelFactory.java
+++ b/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/TaskStateModelFactory.java
@@ -34,7 +34,7 @@ public class TaskStateModelFactory extends StateModelFactory<TaskStateModel> {
   }
 
   @Override
-  public TaskStateModel createNewStateModel(String partition) {
+  public TaskStateModel createNewStateModel(String resource, String partition) {
     TaskStateModel model = new TaskStateModel(_workerId, partition, _taskFactory, _taskResultStore);
     return model;
   }