You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by zz...@apache.org on 2013/09/14 01:32:21 UTC
[1/3] git commit: [HELIX-242] Re-integrate the scheduler rebalancing
into the new controller pipeline
Updated Branches:
refs/heads/helix-logical-model d8ef5a2d7 -> 0a8baa12f
[HELIX-242] Re-integrate the scheduler rebalancing into the new controller pipeline
Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/4e5e3e1d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/4e5e3e1d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/4e5e3e1d
Branch: refs/heads/helix-logical-model
Commit: 4e5e3e1d18f8f3180acab64b17e8a3cc566639a2
Parents: d8ef5a2
Author: zzhang <zz...@apache.org>
Authored: Fri Sep 13 16:21:44 2013 -0700
Committer: zzhang <zz...@apache.org>
Committed: Fri Sep 13 16:21:44 2013 -0700
----------------------------------------------------------------------
.../java/org/apache/helix/api/Resource.java | 65 ++-
.../apache/helix/api/SchedulerTaskConfig.java | 61 ++-
.../org/apache/helix/api/StateModelDefId.java | 7 +
.../stages/NewExternalViewComputeStage.java | 77 ++--
.../stages/NewMessageGenerationStage.java | 23 +-
.../java/org/apache/helix/model/Message.java | 60 ++-
.../statemachine/ScheduledTaskStateModel.java | 1 +
.../helix/integration/TestSchedulerMessage.java | 409 ++++---------------
.../integration/TestSchedulerMsgContraints.java | 253 ++++++++++++
.../integration/TestSchedulerMsgUsingQueue.java | 181 ++++++++
10 files changed, 697 insertions(+), 440 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/4e5e3e1d/helix-core/src/main/java/org/apache/helix/api/Resource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Resource.java b/helix-core/src/main/java/org/apache/helix/api/Resource.java
index a33f0d6..2ee7a4f 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Resource.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Resource.java
@@ -47,9 +47,7 @@ public class Resource {
ExternalView externalView, UserConfig userConfig,
Map<PartitionId, UserConfig> partitionUserConfigs, int liveParticipantCount) {
Map<PartitionId, Partition> partitionMap = new HashMap<PartitionId, Partition>();
- Map<PartitionId, Map<String, String>> schedulerTaskConfigMap =
new HashMap<PartitionId, Map<String, String>>();
- Map<String, Integer> transitionTimeoutMap = new HashMap<String, Integer>();
Set<PartitionId> partitionSet = idealState.getPartitionSet();
if (partitionSet.isEmpty() && idealState.getNumPartitions() > 0) {
partitionSet = new HashSet<PartitionId>();
@@ -57,6 +55,7 @@ public class Resource {
partitionSet.add(Id.partition(id, Integer.toString(i)));
}
}
+
for (PartitionId partitionId : partitionSet) {
UserConfig partitionUserConfig = partitionUserConfigs.get(partitionId);
if (partitionUserConfig == null) {
@@ -64,27 +63,10 @@ public class Resource {
}
partitionMap.put(partitionId, new Partition(partitionId, partitionUserConfig));
- // TODO refactor it
- Map<String, String> taskConfigMap = idealState.getInstanceStateMap(partitionId.stringify());
- if (taskConfigMap != null) {
- schedulerTaskConfigMap.put(partitionId, taskConfigMap);
- }
-
- // TODO refactor it
- for (String simpleKey : idealState.getRecord().getSimpleFields().keySet()) {
- if (simpleKey.indexOf("_" + Message.Attributes.TIMEOUT) != -1) {
- try {
- String timeoutStr = idealState.getRecord().getSimpleField(simpleKey);
- int timeout = Integer.parseInt(timeoutStr);
- transitionTimeoutMap.put(simpleKey, timeout);
- } catch (Exception e) {
- // ignore
- }
- }
- }
}
- SchedulerTaskConfig schedulerTaskConfig =
- new SchedulerTaskConfig(transitionTimeoutMap, schedulerTaskConfigMap);
+
+ SchedulerTaskConfig schedulerTaskConfig = schedulerTaskConfig(idealState);
+
RebalancerConfig rebalancerConfig =
new RebalancerConfig(partitionMap, idealState, resourceAssignment, liveParticipantCount);
@@ -95,6 +77,45 @@ public class Resource {
}
/**
+ * Extract scheduler-task config from ideal-state if state-model-def is SchedulerTaskQueue
+ * @param idealState
+ * @return scheduler-task config or null if state-model-def is not SchedulerTaskQueue
+ */
+ SchedulerTaskConfig schedulerTaskConfig(IdealState idealState) {
+ if (!idealState.getStateModelDefId().equalsIgnoreCase(StateModelDefId.SchedulerTaskQueue)) {
+ return null;
+ }
+
+ // TODO refactor get timeout
+ Map<String, Integer> transitionTimeoutMap = new HashMap<String, Integer>();
+ for (String simpleKey : idealState.getRecord().getSimpleFields().keySet()) {
+ if (simpleKey.indexOf(Message.Attributes.TIMEOUT.name()) != -1) {
+ try {
+ String timeoutStr = idealState.getRecord().getSimpleField(simpleKey);
+ int timeout = Integer.parseInt(timeoutStr);
+ transitionTimeoutMap.put(simpleKey, timeout);
+ } catch (Exception e) {
+ // ignore
+ }
+ }
+ }
+
+ Map<PartitionId, Message> innerMsgMap = new HashMap<PartitionId, Message>();
+ for (PartitionId partitionId : idealState.getPartitionSet()) {
+ // TODO refactor: scheduler-task-queue state model uses map-field to store inner-messages
+ // this is different from all other state-models
+ Map<String, String> innerMsgStrMap =
+ idealState.getRecord().getMapField(partitionId.stringify());
+ if (innerMsgStrMap != null) {
+ Message innerMsg = Message.toMessage(innerMsgStrMap);
+ innerMsgMap.put(partitionId, innerMsg);
+ }
+ }
+
+ return new SchedulerTaskConfig(transitionTimeoutMap, innerMsgMap);
+ }
+
+ /**
* Get the partitions of the resource
* @return map of partition id to partition or empty map if none
*/
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/4e5e3e1d/helix-core/src/main/java/org/apache/helix/api/SchedulerTaskConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/SchedulerTaskConfig.java b/helix-core/src/main/java/org/apache/helix/api/SchedulerTaskConfig.java
index ac7cb3a..e7d0779 100644
--- a/helix-core/src/main/java/org/apache/helix/api/SchedulerTaskConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/SchedulerTaskConfig.java
@@ -1,6 +1,7 @@
package org.apache.helix.api;
import java.util.Map;
+import java.util.Set;
import org.apache.helix.model.Message;
@@ -10,38 +11,58 @@ public class SchedulerTaskConfig {
// TODO refactor using Transition logical model
private final Map<String, Integer> _transitionTimeoutMap;
- // TODO refactor this when understand inner message format
- private final Map<PartitionId, Map<String, String>> _schedulerTaskConfig;
+ private final Map<PartitionId, Message> _innerMessageMap;
public SchedulerTaskConfig(Map<String, Integer> transitionTimeoutMap,
- Map<PartitionId, Map<String, String>> schedulerTaskConfig) {
+ Map<PartitionId, Message> innerMsgMap) {
_transitionTimeoutMap = ImmutableMap.copyOf(transitionTimeoutMap);
- _schedulerTaskConfig = ImmutableMap.copyOf(schedulerTaskConfig);
+ _innerMessageMap = ImmutableMap.copyOf(innerMsgMap);
}
- public Map<String, String> getTaskConfig(PartitionId partitionId) {
- return _schedulerTaskConfig.get(partitionId);
+ /**
+ * Get inner message for a partition
+ * @param partitionId
+ * @return inner message
+ */
+ public Message getInnerMessage(PartitionId partitionId) {
+ return _innerMessageMap.get(partitionId);
}
- public Integer getTransitionTimeout(String transition) {
- return _transitionTimeoutMap.get(transition);
+ /**
+ * Get timeout for a transition
+ * @param transition
+ * @return timeout or -1 if not available
+ */
+ public int getTransitionTimeout(String transition) {
+ Integer timeout = _transitionTimeoutMap.get(transition);
+ if (timeout == null) {
+ return -1;
+ }
+
+ return timeout;
}
- public Integer getTimeout(String transition, PartitionId partitionId) {
+ /**
+ * Get timeout for an inner message
+ * @param transition
+ * @param partitionId
+ * @return timeout or -1 if not available
+ */
+ public int getTimeout(String transition, PartitionId partitionId) {
Integer timeout = getTransitionTimeout(transition);
if (timeout == null) {
- Map<String, String> taskConfig = getTaskConfig(partitionId);
- if (taskConfig != null) {
- String timeoutStr = taskConfig.get(Message.Attributes.TIMEOUT.toString());
- if (timeoutStr != null) {
- try {
- timeout = Integer.parseInt(timeoutStr);
- } catch (Exception e) {
- // ignore
- }
- }
- }
+ Message innerMessage = getInnerMessage(partitionId);
+ timeout = innerMessage.getTimeout();
}
+
return timeout;
}
+
+ /**
+ * Get partition-id set
+ * @return partition-id set
+ */
+ public Set<PartitionId> getPartitionSet() {
+ return _innerMessageMap.keySet();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/4e5e3e1d/helix-core/src/main/java/org/apache/helix/api/StateModelDefId.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/StateModelDefId.java b/helix-core/src/main/java/org/apache/helix/api/StateModelDefId.java
index 0ac4cb9..f77cec8 100644
--- a/helix-core/src/main/java/org/apache/helix/api/StateModelDefId.java
+++ b/helix-core/src/main/java/org/apache/helix/api/StateModelDefId.java
@@ -1,5 +1,7 @@
package org.apache.helix.api;
+import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
+
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -20,6 +22,8 @@ package org.apache.helix.api;
*/
public class StateModelDefId extends Id {
+ public static final StateModelDefId SchedulerTaskQueue = Id
+ .stateModelDef(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE);
private final String _id;
public StateModelDefId(String id) {
@@ -31,4 +35,7 @@ public class StateModelDefId extends Id {
return _id;
}
+ public boolean equalsIgnoreCase(StateModelDefId that) {
+ return _id.equalsIgnoreCase(that._id);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/4e5e3e1d/helix-core/src/main/java/org/apache/helix/controller/stages/NewExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewExternalViewComputeStage.java
index 4a037f3..6361dcf 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewExternalViewComputeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewExternalViewComputeStage.java
@@ -41,7 +41,9 @@ import org.apache.helix.api.PartitionId;
import org.apache.helix.api.RebalancerConfig;
import org.apache.helix.api.ResourceConfig;
import org.apache.helix.api.ResourceId;
+import org.apache.helix.api.SchedulerTaskConfig;
import org.apache.helix.api.State;
+import org.apache.helix.api.StateModelDefId;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
@@ -54,12 +56,12 @@ import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
import org.apache.log4j.Logger;
public class NewExternalViewComputeStage extends AbstractBaseStage {
- private static Logger log = Logger.getLogger(ExternalViewComputeStage.class);
+ private static Logger LOG = Logger.getLogger(ExternalViewComputeStage.class);
@Override
public void process(ClusterEvent event) throws Exception {
long startTime = System.currentTimeMillis();
- log.info("START ExternalViewComputeStage.process()");
+ LOG.info("START ExternalViewComputeStage.process()");
HelixManager manager = event.getAttribute("helixmanager");
Map<ResourceId, ResourceConfig> resourceMap =
@@ -80,6 +82,7 @@ public class NewExternalViewComputeStage extends AbstractBaseStage {
List<ExternalView> newExtViews = new ArrayList<ExternalView>();
List<PropertyKey> keys = new ArrayList<PropertyKey>();
+ // TODO use external-view accessor
Map<String, ExternalView> curExtViews =
dataAccessor.getChildValuesMap(keyBuilder.externalViews());
@@ -90,6 +93,8 @@ public class NewExternalViewComputeStage extends AbstractBaseStage {
// otherwise resource has been dropped, use bucket size from current state instead
ResourceConfig resource = resourceMap.get(resourceId);
RebalancerConfig rebalancerConfig = resource.getRebalancerConfig();
+ SchedulerTaskConfig schedulerTaskConfig = resource.getSchedulerTaskConfig();
+
if (resource.getBucketSize() > 0) {
view.setBucketSize(resource.getBucketSize());
} else {
@@ -137,10 +142,9 @@ public class NewExternalViewComputeStage extends AbstractBaseStage {
// scheduler
// message, and then remove the partitions from the ideal state
if (rebalancerConfig != null
- && rebalancerConfig.getStateModelDefId().stringify()
- .equalsIgnoreCase(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
- // TODO fix it
- // updateScheduledTaskStatus(view, manager, idealState);
+ && rebalancerConfig.getStateModelDefId().equalsIgnoreCase(
+ StateModelDefId.SchedulerTaskQueue)) {
+ updateScheduledTaskStatus(resourceId, view, manager, schedulerTaskConfig);
}
}
}
@@ -160,13 +164,16 @@ public class NewExternalViewComputeStage extends AbstractBaseStage {
}
long endTime = System.currentTimeMillis();
- log.info("END ExternalViewComputeStage.process(). took: " + (endTime - startTime) + " ms");
+ LOG.info("END ExternalViewComputeStage.process(). took: " + (endTime - startTime) + " ms");
}
// TODO fix it
- private void updateScheduledTaskStatus(ExternalView ev, HelixManager manager,
- IdealState taskQueueIdealState) {
+ private void updateScheduledTaskStatus(ResourceId resourceId, ExternalView ev,
+ HelixManager manager,
+ SchedulerTaskConfig schedulerTaskConfig) {
HelixDataAccessor accessor = manager.getHelixDataAccessor();
+ Builder keyBuilder = accessor.keyBuilder();
+
ZNRecord finishedTasks = new ZNRecord(ev.getResourceName());
// Place holder for finished partitions
@@ -177,23 +184,21 @@ public class NewExternalViewComputeStage extends AbstractBaseStage {
Map<String, Map<String, String>> controllerMsgUpdates =
new HashMap<String, Map<String, String>>();
- Builder keyBuilder = accessor.keyBuilder();
-
for (String taskPartitionName : ev.getPartitionStringSet()) {
for (String taskState : ev.getStateMap(taskPartitionName).values()) {
if (taskState.equalsIgnoreCase(HelixDefinedState.ERROR.toString())
|| taskState.equalsIgnoreCase("COMPLETED")) {
- log.info(taskPartitionName + " finished as " + taskState);
- finishedTasks.getListFields().put(taskPartitionName, emptyList);
- finishedTasks.getMapFields().put(taskPartitionName, emptyMap);
+ LOG.info(taskPartitionName + " finished as " + taskState);
+ finishedTasks.setListField(taskPartitionName, emptyList);
+ finishedTasks.setMapField(taskPartitionName, emptyMap);
// Update original scheduler message status update
- if (taskQueueIdealState.getRecord().getMapField(taskPartitionName) != null) {
- String controllerMsgId =
- taskQueueIdealState.getRecord().getMapField(taskPartitionName)
- .get(DefaultSchedulerMessageHandlerFactory.CONTROLLER_MSG_ID);
+ Message innerMessage =
+ schedulerTaskConfig.getInnerMessage(Id.partition(taskPartitionName));
+ if (innerMessage != null) {
+ String controllerMsgId = innerMessage.getControllerMessagId();
if (controllerMsgId != null) {
- log.info(taskPartitionName + " finished with controllerMsg " + controllerMsgId);
+ LOG.info(taskPartitionName + " finished with controllerMsg " + controllerMsgId);
if (!controllerMsgUpdates.containsKey(controllerMsgId)) {
controllerMsgUpdates.put(controllerMsgId, new HashMap<String, String>());
}
@@ -204,16 +209,16 @@ public class NewExternalViewComputeStage extends AbstractBaseStage {
}
}
// fill the controllerMsgIdCountMap
- for (String taskId : taskQueueIdealState.getPartitionStringSet()) {
- String controllerMsgId =
- taskQueueIdealState.getRecord().getMapField(taskId)
- .get(DefaultSchedulerMessageHandlerFactory.CONTROLLER_MSG_ID);
+ for (PartitionId taskId : schedulerTaskConfig.getPartitionSet()) {
+ Message innerMessage = schedulerTaskConfig.getInnerMessage(taskId);
+ String controllerMsgId = innerMessage.getControllerMessagId();
+
if (controllerMsgId != null) {
- if (!controllerMsgIdCountMap.containsKey(controllerMsgId)) {
- controllerMsgIdCountMap.put(controllerMsgId, 0);
+ Integer curCnt = controllerMsgIdCountMap.get(controllerMsgId);
+ if (curCnt == null) {
+ curCnt = 0;
}
- controllerMsgIdCountMap.put(controllerMsgId,
- (controllerMsgIdCountMap.get(controllerMsgId) + 1));
+ controllerMsgIdCountMap.put(controllerMsgId, curCnt + 1);
}
}
@@ -223,18 +228,16 @@ public class NewExternalViewComputeStage extends AbstractBaseStage {
keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), controllerMsgId);
StatusUpdate controllerStatusUpdate = accessor.getProperty(controllerStatusUpdateKey);
for (String taskPartitionName : controllerMsgUpdates.get(controllerMsgId).keySet()) {
+ Message innerMessage =
+ schedulerTaskConfig.getInnerMessage(Id.partition(taskPartitionName));
+
Map<String, String> result = new HashMap<String, String>();
result.put("Result", controllerMsgUpdates.get(controllerMsgId).get(taskPartitionName));
controllerStatusUpdate.getRecord().setMapField(
- "MessageResult "
- + taskQueueIdealState.getRecord().getMapField(taskPartitionName)
- .get(Message.Attributes.TGT_NAME.toString())
- + " "
- + taskPartitionName
- + " "
- + taskQueueIdealState.getRecord().getMapField(taskPartitionName)
- .get(Message.Attributes.MSG_ID.toString()), result);
+ "MessageResult " + innerMessage.getTgtName() + " " + taskPartitionName + " "
+ + innerMessage.getMsgId(), result);
}
+
// All done for the scheduled tasks that came from controllerMsgId, add summary for it
if (controllerMsgUpdates.get(controllerMsgId).size() == controllerMsgIdCountMap.get(
controllerMsgId).intValue()) {
@@ -266,12 +269,12 @@ public class NewExternalViewComputeStage extends AbstractBaseStage {
ZNRecordDelta znDelta = new ZNRecordDelta(finishedTasks, MergeOperation.SUBTRACT);
List<ZNRecordDelta> deltaList = new LinkedList<ZNRecordDelta>();
deltaList.add(znDelta);
- IdealState delta = new IdealState(taskQueueIdealState.getResourceName());
+ IdealState delta = new IdealState(resourceId);
delta.setDeltaList(deltaList);
// Remove the finished (COMPLETED or ERROR) tasks from the SCHEDULER_TASK_RESOURCE idealstate
keyBuilder = accessor.keyBuilder();
- accessor.updateProperty(keyBuilder.idealState(taskQueueIdealState.getResourceName()), delta);
+ accessor.updateProperty(keyBuilder.idealState(resourceId.stringify()), delta);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/4e5e3e1d/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java
index a9fc51d..0c9fe9a 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java
@@ -35,6 +35,7 @@ import org.apache.helix.api.RebalancerConfig;
import org.apache.helix.api.Resource;
import org.apache.helix.api.ResourceConfig;
import org.apache.helix.api.ResourceId;
+import org.apache.helix.api.SchedulerTaskConfig;
import org.apache.helix.api.SessionId;
import org.apache.helix.api.State;
import org.apache.helix.api.StateModelDefId;
@@ -139,29 +140,29 @@ public class NewMessageGenerationStage extends AbstractBaseStage {
nextState, sessionId, new StateModelDefId(stateModelDef.getId()), resourceConfig
.getRebalancerConfig().getStateModelFactoryId(), bucketSize);
- // TODO refactor set timeout logic, it's really messy
+ // TODO refactor get/set timeout/inner-message
RebalancerConfig rebalancerConfig = resourceConfig.getRebalancerConfig();
if (rebalancerConfig != null
- && rebalancerConfig.getStateModelDefId().stringify()
- .equalsIgnoreCase(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
+ && rebalancerConfig.getStateModelDefId().equalsIgnoreCase(
+ StateModelDefId.SchedulerTaskQueue)) {
if (resourceConfig.getPartitionMap().size() > 0) {
// TODO refactor it -- we need a way to read in scheduler tasks a priori
Resource activeResource = cluster.getResource(resourceId);
if (activeResource != null) {
- message.getRecord().setMapField(Message.Attributes.INNER_MESSAGE.toString(),
- activeResource.getSchedulerTaskConfig().getTaskConfig(partitionId));
+ message.setInnerMessage(activeResource.getSchedulerTaskConfig().getInnerMessage(
+ partitionId));
}
}
}
// Set timeout if needed
String stateTransition =
- currentState + "-" + nextState + "_" + Message.Attributes.TIMEOUT;
-
- if (resourceConfig.getSchedulerTaskConfig() != null) {
- Integer timeout =
- resourceConfig.getSchedulerTaskConfig().getTimeout(stateTransition, partitionId);
- if (timeout != null && timeout > 0) {
+ String.format("%s-%s_%s", currentState, nextState,
+ Message.Attributes.TIMEOUT.name());
+ SchedulerTaskConfig schedulerTaskConfig = resourceConfig.getSchedulerTaskConfig();
+ if (schedulerTaskConfig != null) {
+ int timeout = schedulerTaskConfig.getTimeout(stateTransition, partitionId);
+ if (timeout > 0) {
message.setExecutionTimeout(timeout);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/4e5e3e1d/helix-core/src/main/java/org/apache/helix/model/Message.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/Message.java b/helix-core/src/main/java/org/apache/helix/model/Message.java
index b2b538d..68672e3 100644
--- a/helix-core/src/main/java/org/apache/helix/model/Message.java
+++ b/helix-core/src/main/java/org/apache/helix/model/Message.java
@@ -40,6 +40,7 @@ import org.apache.helix.api.ResourceId;
import org.apache.helix.api.SessionId;
import org.apache.helix.api.State;
import org.apache.helix.api.StateModelDefId;
+import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
import com.google.common.collect.ImmutableList;
@@ -162,6 +163,22 @@ public class Message extends HelixProperty {
}
/**
+ * Convert a string map to a message
+ * @param msgStrMap
+ * @return message
+ */
+ public static Message toMessage(Map<String, String> msgStrMap) {
+ String msgId = msgStrMap.get(Attributes.MSG_ID.name());
+ if (msgId == null) {
+ throw new IllegalArgumentException("Missing msgId in message string map: " + msgStrMap);
+ }
+
+ ZNRecord record = new ZNRecord(msgId);
+ record.getSimpleFields().putAll(msgStrMap);
+ return new Message(record);
+ }
+
+ /**
* Set the time that the message was created
* @param timestamp a UNIX timestamp
*/
@@ -647,16 +664,6 @@ public class Message extends HelixProperty {
return builder.build();
}
- // public AtomicInteger getGroupMsgCountDown()
- // {
- // return _groupMsgCountDown;
- // }
- //
- // public void setGroupMsgCountDown(AtomicInteger countDown)
- // {
- // _groupMsgCountDown = countDown;
- // }
-
/**
* Check if this message is targetted for a controller
* @return true if this is a controller message, false otherwise
@@ -679,6 +686,39 @@ public class Message extends HelixProperty {
}
}
+ /**
+ * Get timeout
+ * @return timeout or -1 if not available
+ */
+ public int getTimeout() {
+ String timeoutStr = _record.getSimpleField(Attributes.TIMEOUT.name());
+ int timeout = -1;
+ if (timeoutStr != null) {
+ try {
+ timeout = Integer.parseInt(timeoutStr);
+ } catch (Exception e) {
+ // ignore
+ }
+ }
+ return timeout;
+ }
+
+ /**
+ * Get controller message id, used for scheduler-task-queue state model only
+ * @return controller message id
+ */
+ public String getControllerMessagId() {
+ return _record.getSimpleField(DefaultSchedulerMessageHandlerFactory.CONTROLLER_MSG_ID);
+ }
+
+ /**
+ * Set an inner message
+ * @param inner message
+ */
+ public void setInnerMessage(Message message) {
+ _record.setMapField(Attributes.INNER_MESSAGE.name(), message.getRecord().getSimpleFields());
+ }
+
private boolean isNullOrEmpty(String data) {
return data == null || data.length() == 0 || data.trim().length() == 0;
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/4e5e3e1d/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModel.java b/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModel.java
index 8b6a02c..ca67d42 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModel.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModel.java
@@ -52,6 +52,7 @@ public class ScheduledTaskStateModel extends StateModel {
public void onBecomeCompletedFromOffline(Message message, NotificationContext context)
throws InterruptedException {
logger.info(_partitionName + " onBecomeCompletedFromOffline");
+ // System.err.println("\t\t" + _partitionName + " onBecomeCompletedFromOffline");
// Construct the inner task message from the mapfields of scheduledTaskQueue resource group
Map<String, String> messageInfo =
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/4e5e3e1d/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
index 38a41be..6d66347 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
@@ -22,6 +22,7 @@ package org.apache.helix.integration;
import java.io.IOException;
import java.io.StringWriter;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -34,11 +35,13 @@ import java.util.concurrent.CountDownLatch;
import org.apache.helix.Criteria;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
+import org.apache.helix.HelixProperty;
import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.PropertyType;
+import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
import org.apache.helix.api.Id;
import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
@@ -48,6 +51,7 @@ import org.apache.helix.messaging.handling.MessageHandler;
import org.apache.helix.messaging.handling.MessageHandlerFactory;
import org.apache.helix.model.ClusterConstraints.ConstraintType;
import org.apache.helix.model.ConstraintItem;
+import org.apache.helix.model.ExternalView;
import org.apache.helix.model.Message;
import org.apache.helix.model.Message.MessageState;
import org.apache.helix.model.Message.MessageType;
@@ -74,7 +78,6 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
@Override
public void onTimeOut() {
// TODO Auto-generated method stub
-
}
@Override
@@ -86,10 +89,17 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
TestMessagingHandlerFactory _factory = new TestMessagingHandlerFactory();
public static class TestMessagingHandlerFactory implements MessageHandlerFactory {
+ int cnt;
public Map<String, Set<String>> _results = new ConcurrentHashMap<String, Set<String>>();
+ public TestMessagingHandlerFactory() {
+ super();
+ cnt = 0;
+ }
+
@Override
public MessageHandler createHandler(Message message, NotificationContext context) {
+ // System.out.println("\t create-hdlr: " + message.getId());
return new TestMessagingHandler(message, context);
}
@@ -114,73 +124,20 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
public HelixTaskResult handleMessage() throws InterruptedException {
HelixTaskResult result = new HelixTaskResult();
result.setSuccess(true);
- String destName = _message.getTgtName();
- result.getTaskResultMap().put("Message", _message.getMsgId().stringify());
- synchronized (_results) {
- if (!_results.containsKey(_message.getPartitionId().stringify())) {
- _results
- .put(_message.getPartitionId().stringify(), new ConcurrentSkipListSet<String>());
- }
- }
- _results.get(_message.getPartitionId().stringify()).add(_message.getMsgId().stringify());
- // System.err.println("Message " + _message.getMsgId() + " executed");
- return result;
- }
-
- @Override
- public void onError(Exception e, ErrorCode code, ErrorType type) {
- // TODO Auto-generated method stub
- }
- }
- }
-
- public static class TestMessagingHandlerFactoryLatch implements MessageHandlerFactory {
- public volatile CountDownLatch _latch = new CountDownLatch(1);
- public int _messageCount = 0;
- public Map<String, Set<String>> _results = new ConcurrentHashMap<String, Set<String>>();
-
- @Override
- public synchronized MessageHandler createHandler(Message message, NotificationContext context) {
- _messageCount++;
- return new TestMessagingHandlerLatch(message, context);
- }
-
- public synchronized void signal() {
- _latch.countDown();
- _latch = new CountDownLatch(1);
- }
-
- @Override
- public String getMessageType() {
- return "TestMessagingHandlerLatch";
- }
-
- @Override
- public void reset() {
- // TODO Auto-generated method stub
- }
-
- public class TestMessagingHandlerLatch extends MessageHandler {
- public TestMessagingHandlerLatch(Message message, NotificationContext context) {
- super(message, context);
- // TODO Auto-generated constructor stub
- }
+ // String tgtName = _message.getTgtName();
+ String messageId = _message.getMsgId().stringify();
+ String partitionId = _message.getPartitionId().stringify();
- @Override
- public HelixTaskResult handleMessage() throws InterruptedException {
- _latch.await();
- HelixTaskResult result = new HelixTaskResult();
- result.setSuccess(true);
- result.getTaskResultMap().put("Message", _message.getMsgId().stringify());
- String destName = _message.getTgtName();
+ result.getTaskResultMap().put("Message", messageId);
synchronized (_results) {
- if (!_results.containsKey(_message.getPartitionId().stringify())) {
- _results
- .put(_message.getPartitionId().stringify(), new ConcurrentSkipListSet<String>());
+ if (!_results.containsKey(partitionId)) {
+ _results.put(partitionId, new HashSet<String>());
}
+ _results.get(partitionId).add(messageId);
}
- _results.get(_message.getPartitionId().stringify()).add(destName);
- // System.err.println("Message " + _message.getMsgId() + " executed");
+ cnt++;
+ // System.err.println(cnt + ": message " + messageId + ", tgtName: " + tgtName
+ // + ", partitionId: " + partitionId);
return result;
}
@@ -192,96 +149,8 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
}
@Test()
- public void TestSchedulerMsgUsingQueue() throws Exception {
- Logger.getRootLogger().setLevel(Level.INFO);
- _factory._results.clear();
- HelixManager manager = null;
- for (int i = 0; i < NODE_NR; i++) {
- String hostDest = "localhost_" + (START_PORT + i);
- _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
- _factory.getMessageType(), _factory);
- manager = _startCMResultMap.get(hostDest)._manager;
- }
-
- Message schedulerMessage =
- new Message(MessageType.SCHEDULER_MSG + "", Id.message(UUID.randomUUID().toString()));
- schedulerMessage.setTgtSessionId(Id.session("*"));
- schedulerMessage.setTgtName("CONTROLLER");
- // TODO: change it to "ADMIN" ?
- schedulerMessage.setSrcName("CONTROLLER");
- schedulerMessage.getRecord().setSimpleField(
- DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, "TestSchedulerMsg");
- // Template for the individual message sent to each participant
- Message msg = new Message(_factory.getMessageType(), Id.message("Template"));
- msg.setTgtSessionId(Id.session("*"));
- msg.setMsgState(MessageState.NEW);
-
- // Criteria to send individual messages
- Criteria cr = new Criteria();
- cr.setInstanceName("localhost_%");
- cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
- cr.setSessionSpecific(false);
- cr.setResource("%");
- cr.setPartition("%");
-
- ObjectMapper mapper = new ObjectMapper();
- SerializationConfig serializationConfig = mapper.getSerializationConfig();
- serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
-
- StringWriter sw = new StringWriter();
- mapper.writeValue(sw, cr);
-
- String crString = sw.toString();
-
- schedulerMessage.getRecord().setSimpleField("Criteria", crString);
- schedulerMessage.getRecord().setMapField("MessageTemplate", msg.getRecord().getSimpleFields());
- schedulerMessage.getRecord().setSimpleField("TIMEOUT", "-1");
-
- HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
- Builder keyBuilder = helixDataAccessor.keyBuilder();
- helixDataAccessor.createProperty(
- keyBuilder.controllerMessage(schedulerMessage.getMsgId().stringify()), schedulerMessage);
-
- for (int i = 0; i < 30; i++) {
- Thread.sleep(2000);
- if (_PARTITIONS == _factory._results.size()) {
- break;
- }
- }
-
- Assert.assertEquals(_PARTITIONS, _factory._results.size());
- PropertyKey controllerTaskStatus =
- keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), schedulerMessage
- .getMsgId().stringify());
-
- int messageResultCount = 0;
- for (int i = 0; i < 10; i++) {
- ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
- Assert.assertTrue(statusUpdate.getMapField("SentMessageCount").get("MessageCount")
- .equals("" + (_PARTITIONS * 3)));
- for (String key : statusUpdate.getMapFields().keySet()) {
- if (key.startsWith("MessageResult ")) {
- messageResultCount++;
- }
- }
- if (messageResultCount == _PARTITIONS * 3) {
- break;
- } else {
- Thread.sleep(2000);
- }
- }
- Assert.assertEquals(messageResultCount, _PARTITIONS * 3);
- int count = 0;
- for (Set<String> val : _factory._results.values()) {
- count += val.size();
- }
- Assert.assertEquals(count, _PARTITIONS * 3);
-
- }
-
- @Test()
- public void TestSchedulerMsg() throws Exception {
- Logger.getRootLogger().setLevel(Level.INFO);
+ public void testSchedulerMsg() throws Exception {
+ // Logger.getRootLogger().setLevel(Level.INFO);
_factory._results.clear();
HelixManager manager = null;
for (int i = 0; i < NODE_NR; i++) {
@@ -419,7 +288,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
}
@Test()
- public void TestSchedulerMsg2() throws Exception {
+ public void testSchedulerMsg2() throws Exception {
_factory._results.clear();
HelixManager manager = null;
for (int i = 0; i < NODE_NR; i++) {
@@ -510,7 +379,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
}
@Test()
- public void TestSchedulerZeroMsg() throws Exception {
+ public void testSchedulerZeroMsg() throws Exception {
TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
HelixManager manager = null;
for (int i = 0; i < NODE_NR; i++) {
@@ -562,15 +431,13 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
Thread.sleep(3000);
Assert.assertEquals(0, factory._results.size());
+
+ waitMessageUpdate("SentMessageCount", schedulerMessage.getMsgId().stringify(), helixDataAccessor);
PropertyKey controllerTaskStatus =
keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), schedulerMessage
.getMsgId().stringify());
- for (int i = 0; i < 10; i++) {
- StatusUpdate update = helixDataAccessor.getProperty(controllerTaskStatus);
- if (update == null || update.getRecord().getMapField("SentMessageCount") == null) {
- Thread.sleep(1000);
- }
- }
+ waitMessageUpdate("SentMessageCount", schedulerMessage.getMsgId().stringify(),
+ helixDataAccessor);
ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
Assert.assertTrue(statusUpdate.getMapField("SentMessageCount").get("MessageCount").equals("0"));
int count = 0;
@@ -581,14 +448,14 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
}
@Test()
- public void TestSchedulerMsg3() throws Exception {
+ public void testSchedulerMsg3() throws Exception {
_factory._results.clear();
HelixManager manager = null;
for (int i = 0; i < NODE_NR; i++) {
String hostDest = "localhost_" + (START_PORT + i);
_startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
_factory.getMessageType(), _factory);
- //
+
_startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
_factory.getMessageType(), _factory);
manager = _startCMResultMap.get(hostDest)._manager;
@@ -667,15 +534,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
Builder keyBuilder = helixDataAccessor.keyBuilder();
- for (int j = 0; j < 100; j++) {
- Thread.sleep(200);
- PropertyKey controllerTaskStatus =
- keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgId);
- ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
- if (statusUpdate.getMapFields().containsKey("Summary")) {
- break;
- }
- }
+ waitMessageUpdate("Summary", msgId, helixDataAccessor);
PropertyKey controllerTaskStatus =
keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgId);
@@ -702,14 +561,13 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
}
@Test()
- public void TestSchedulerMsg4() throws Exception {
+ public void testSchedulerMsg4() throws Exception {
_factory._results.clear();
HelixManager manager = null;
for (int i = 0; i < NODE_NR; i++) {
String hostDest = "localhost_" + (START_PORT + i);
_startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
_factory.getMessageType(), _factory);
- //
_startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
_factory.getMessageType(), _factory);
manager = _startCMResultMap.get(hostDest)._manager;
@@ -780,8 +638,8 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
callback._message.getResultMap()
.get(DefaultSchedulerMessageHandlerFactory.SCHEDULER_MSG_ID);
- HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
- Builder keyBuilder = helixDataAccessor.keyBuilder();
+ final HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
+ final Builder keyBuilder = helixDataAccessor.keyBuilder();
ArrayList<String> msgIds = new ArrayList<String>();
for (int i = 0; i < NODE_NR; i++) {
callback = new MockAsyncCallback();
@@ -793,6 +651,11 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
sw = new StringWriter();
mapper.writeValue(sw, cr);
schedulerMessage.setMsgId(Id.message(UUID.randomUUID().toString()));
+
+ // need to use a different name for scheduler_task_queue task resource
+ schedulerMessage.getRecord().setSimpleField(
+ DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, "TestSchedulerMsg4_" + i);
+
crString = sw.toString();
schedulerMessage.getRecord().setSimpleField("Criteria", crString);
manager.getMessagingService().sendAndWait(cr2, schedulerMessage, callback, -1);
@@ -801,18 +664,11 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
DefaultSchedulerMessageHandlerFactory.SCHEDULER_MSG_ID);
msgIds.add(msgId);
}
+
for (int i = 0; i < NODE_NR; i++) {
- String msgId = msgIds.get(i);
- for (int j = 0; j < 100; j++) {
- Thread.sleep(200);
- PropertyKey controllerTaskStatus =
- keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgId);
- ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
- if (statusUpdate.getMapFields().containsKey("Summary")) {
- // System.err.println(msgId+" done");
- break;
- }
- }
+ final String msgId = msgIds.get(i);
+
+ waitMessageUpdate("Summary", msgId, helixDataAccessor);
PropertyKey controllerTaskStatus =
keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgId);
@@ -825,22 +681,11 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
messageResultCount++;
}
}
- if (messageResultCount != _PARTITIONS * 3 / 5) {
- int x = 10;
- x = x + messageResultCount;
- }
Assert.assertEquals(messageResultCount, _PARTITIONS * 3 / 5);
}
- for (int j = 0; j < 100; j++) {
- Thread.sleep(200);
- PropertyKey controllerTaskStatus =
- keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgIdPrime);
- ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
- if (statusUpdate.getMapFields().containsKey("Summary")) {
- break;
- }
- }
+ waitMessageUpdate("Summary", msgIdPrime, helixDataAccessor);
+
int count = 0;
for (Set<String> val : _factory._results.values()) {
// System.out.println(val);
@@ -850,147 +695,31 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
Assert.assertEquals(count, _PARTITIONS * 3 * 2);
}
- @Test
- public void TestSchedulerMsgContraints() throws JsonGenerationException, JsonMappingException,
- IOException, InterruptedException {
- TestMessagingHandlerFactoryLatch factory = new TestMessagingHandlerFactoryLatch();
- HelixManager manager = null;
- for (int i = 0; i < NODE_NR; i++) {
- String hostDest = "localhost_" + (START_PORT + i);
- _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
- factory.getMessageType(), factory);
- //
- _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
- factory.getMessageType(), factory);
- manager = _startCMResultMap.get(hostDest)._manager;
- }
-
- Message schedulerMessage =
- new Message(MessageType.SCHEDULER_MSG + "", Id.message(UUID.randomUUID().toString()));
- schedulerMessage.setTgtSessionId(Id.session("*"));
- schedulerMessage.setTgtName("CONTROLLER");
- // TODO: change it to "ADMIN" ?
- schedulerMessage.setSrcName("CONTROLLER");
-
- // Template for the individual message sent to each participant
- Message msg = new Message(factory.getMessageType(), Id.message("Template"));
- msg.setTgtSessionId(Id.session("*"));
- msg.setMsgState(MessageState.NEW);
-
- // Criteria to send individual messages
- Criteria cr = new Criteria();
- cr.setInstanceName("localhost_%");
- cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
- cr.setSessionSpecific(false);
- cr.setResource("%");
- cr.setPartition("%");
-
- ObjectMapper mapper = new ObjectMapper();
- SerializationConfig serializationConfig = mapper.getSerializationConfig();
- serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
-
- StringWriter sw = new StringWriter();
- mapper.writeValue(sw, cr);
-
- String crString = sw.toString();
-
- schedulerMessage.getRecord().setSimpleField("Criteria", crString);
- schedulerMessage.getRecord().setMapField("MessageTemplate", msg.getRecord().getSimpleFields());
- schedulerMessage.getRecord().setSimpleField("TIMEOUT", "-1");
- schedulerMessage.getRecord().setSimpleField("WAIT_ALL", "true");
- schedulerMessage.getRecord().setSimpleField(
- DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, "TestSchedulerMsgContraints");
-
- Criteria cr2 = new Criteria();
- cr2.setRecipientInstanceType(InstanceType.CONTROLLER);
- cr2.setInstanceName("*");
- cr2.setSessionSpecific(false);
-
- MockAsyncCallback callback = new MockAsyncCallback();
- mapper = new ObjectMapper();
- serializationConfig = mapper.getSerializationConfig();
- serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
-
- sw = new StringWriter();
- mapper.writeValue(sw, cr);
-
- HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
- Builder keyBuilder = helixDataAccessor.keyBuilder();
-
- // Set contraints that only 1 msg per participant
- Map<String, String> constraints = new TreeMap<String, String>();
- constraints.put("MESSAGE_TYPE", "STATE_TRANSITION");
- constraints.put("TRANSITION", "OFFLINE-COMPLETED");
- constraints.put("CONSTRAINT_VALUE", "1");
- constraints.put("INSTANCE", ".*");
- manager.getClusterManagmentTool().setConstraint(manager.getClusterName(),
- ConstraintType.MESSAGE_CONSTRAINT, "constraint1", new ConstraintItem(constraints));
-
- // Send scheduler message
- crString = sw.toString();
- schedulerMessage.getRecord().setSimpleField("Criteria", crString);
- manager.getMessagingService().sendAndWait(cr2, schedulerMessage, callback, -1);
- String msgId =
- callback._message.getResultMap()
- .get(DefaultSchedulerMessageHandlerFactory.SCHEDULER_MSG_ID);
-
- for (int j = 0; j < 10; j++) {
- Thread.sleep(200);
- PropertyKey controllerTaskStatus =
- keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgId);
- ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
- if (statusUpdate.getMapFields().containsKey("SentMessageCount")) {
- Assert.assertEquals(
- statusUpdate.getMapFields().get("SentMessageCount").get("MessageCount"), ""
- + (_PARTITIONS * 3));
- break;
- }
- }
+ /**
+ * wait message summary to appear in controller-message-status-update
+ * @param msgId
+ * @param accessor
+ * @return
+ * @throws Exception
+ */
+ private boolean waitMessageUpdate(final String mapKey, final String msgId,
+ final HelixDataAccessor accessor) throws Exception {
+ final PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+ return TestHelper.verify(new TestHelper.Verifier() {
- for (int i = 0; i < _PARTITIONS * 3 / 5; i++) {
- for (int j = 0; j < 10; j++) {
- Thread.sleep(300);
- if (factory._messageCount == 5 * (i + 1))
- break;
- }
- Thread.sleep(300);
- Assert.assertEquals(factory._messageCount, 5 * (i + 1));
- factory.signal();
- // System.err.println(i);
- }
-
- for (int j = 0; j < 10; j++) {
- Thread.sleep(200);
- PropertyKey controllerTaskStatus =
- keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgId);
- ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
- if (statusUpdate.getMapFields().containsKey("Summary")) {
- break;
- }
- }
+ @Override
+ public boolean verify() throws Exception {
+ PropertyKey key = keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.name(), msgId);
+ HelixProperty statusUpdate = accessor.getProperty(key);
+ if (statusUpdate == null) {
+ return false;
+ }
- Assert.assertEquals(_PARTITIONS, factory._results.size());
- PropertyKey controllerTaskStatus =
- keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgId);
- ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
- Assert.assertTrue(statusUpdate.getMapField("SentMessageCount").get("MessageCount")
- .equals("" + (_PARTITIONS * 3)));
- int messageResultCount = 0;
- for (String key : statusUpdate.getMapFields().keySet()) {
- if (key.startsWith("MessageResult ")) {
- messageResultCount++;
+ if (statusUpdate.getRecord().getMapField(mapKey) == null) {
+ return false;
+ }
+ return true;
}
- }
- Assert.assertEquals(messageResultCount, _PARTITIONS * 3);
-
- int count = 0;
- for (Set<String> val : factory._results.values()) {
- count += val.size();
- }
- Assert.assertEquals(count, _PARTITIONS * 3);
-
- manager.getClusterManagmentTool().removeConstraint(manager.getClusterName(),
- ConstraintType.MESSAGE_CONSTRAINT, "constraint1");
-
+ }, 20 * 1000);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/4e5e3e1d/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMsgContraints.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMsgContraints.java b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMsgContraints.java
new file mode 100644
index 0000000..b5c8d91
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMsgContraints.java
@@ -0,0 +1,253 @@
+package org.apache.helix.integration;
+
+import java.io.StringWriter;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.helix.Criteria;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.api.Id;
+import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
+import org.apache.helix.messaging.AsyncCallback;
+import org.apache.helix.messaging.handling.HelixTaskResult;
+import org.apache.helix.messaging.handling.MessageHandler;
+import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.model.ClusterConstraints.ConstraintType;
+import org.apache.helix.model.ConstraintItem;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.MessageState;
+import org.apache.helix.model.Message.MessageType;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestSchedulerMsgContraints extends ZkStandAloneCMTestBaseWithPropertyServerCheck {
+
+ class MockAsyncCallback extends AsyncCallback {
+ Message _message;
+
+ public MockAsyncCallback() {
+ }
+
+ @Override
+ public void onTimeOut() {
+ // TODO Auto-generated method stub
+ }
+
+ @Override
+ public void onReplyMessage(Message message) {
+ _message = message;
+ }
+ }
+
+ public static class TestMessagingHandlerFactoryLatch implements MessageHandlerFactory {
+ public volatile CountDownLatch _latch = new CountDownLatch(1);
+ public int _messageCount = 0;
+ public Map<String, Set<String>> _results = new ConcurrentHashMap<String, Set<String>>();
+
+ @Override
+ public synchronized MessageHandler createHandler(Message message, NotificationContext context) {
+ _messageCount++;
+ return new TestMessagingHandlerLatch(message, context);
+ }
+
+ public synchronized void signal() {
+ _latch.countDown();
+ _latch = new CountDownLatch(1);
+ }
+
+ @Override
+ public String getMessageType() {
+ return "TestMessagingHandlerLatch";
+ }
+
+ @Override
+ public void reset() {
+ // TODO Auto-generated method stub
+ }
+
+ public class TestMessagingHandlerLatch extends MessageHandler {
+ public TestMessagingHandlerLatch(Message message, NotificationContext context) {
+ super(message, context);
+ // TODO Auto-generated constructor stub
+ }
+
+ @Override
+ public HelixTaskResult handleMessage() throws InterruptedException {
+ _latch.await();
+ HelixTaskResult result = new HelixTaskResult();
+ result.setSuccess(true);
+ result.getTaskResultMap().put("Message", _message.getMsgId().stringify());
+ String destName = _message.getTgtName();
+ synchronized (_results) {
+ if (!_results.containsKey(_message.getPartitionId().stringify())) {
+ _results
+ .put(_message.getPartitionId().stringify(), new ConcurrentSkipListSet<String>());
+ }
+ }
+ _results.get(_message.getPartitionId().stringify()).add(destName);
+ // System.err.println("Message " + _message.getMsgId() + " executed");
+ return result;
+ }
+
+ @Override
+ public void onError(Exception e, ErrorCode code, ErrorType type) {
+ // TODO Auto-generated method stub
+ }
+ }
+ }
+
+ @Test
+ public void testSchedulerMsgContraints() throws Exception {
+ TestMessagingHandlerFactoryLatch factory = new TestMessagingHandlerFactoryLatch();
+ HelixManager manager = null;
+ for (int i = 0; i < NODE_NR; i++) {
+ String hostDest = "localhost_" + (START_PORT + i);
+ _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
+ factory.getMessageType(), factory);
+ //
+ _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
+ factory.getMessageType(), factory);
+ manager = _startCMResultMap.get(hostDest)._manager;
+ }
+
+ Message schedulerMessage =
+ new Message(MessageType.SCHEDULER_MSG + "", Id.message(UUID.randomUUID().toString()));
+ schedulerMessage.setTgtSessionId(Id.session("*"));
+ schedulerMessage.setTgtName("CONTROLLER");
+ // TODO: change it to "ADMIN" ?
+ schedulerMessage.setSrcName("CONTROLLER");
+
+ // Template for the individual message sent to each participant
+ Message msg = new Message(factory.getMessageType(), Id.message("Template"));
+ msg.setTgtSessionId(Id.session("*"));
+ msg.setMsgState(MessageState.NEW);
+
+ // Criteria to send individual messages
+ Criteria cr = new Criteria();
+ cr.setInstanceName("localhost_%");
+ cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
+ cr.setSessionSpecific(false);
+ cr.setResource("%");
+ cr.setPartition("%");
+
+ ObjectMapper mapper = new ObjectMapper();
+ SerializationConfig serializationConfig = mapper.getSerializationConfig();
+ serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
+
+ StringWriter sw = new StringWriter();
+ mapper.writeValue(sw, cr);
+
+ String crString = sw.toString();
+
+ schedulerMessage.getRecord().setSimpleField("Criteria", crString);
+ schedulerMessage.getRecord().setMapField("MessageTemplate", msg.getRecord().getSimpleFields());
+ schedulerMessage.getRecord().setSimpleField("TIMEOUT", "-1");
+ schedulerMessage.getRecord().setSimpleField("WAIT_ALL", "true");
+ schedulerMessage.getRecord().setSimpleField(
+ DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, "TestSchedulerMsgContraints");
+
+ Criteria cr2 = new Criteria();
+ cr2.setRecipientInstanceType(InstanceType.CONTROLLER);
+ cr2.setInstanceName("*");
+ cr2.setSessionSpecific(false);
+
+ MockAsyncCallback callback = new MockAsyncCallback();
+ mapper = new ObjectMapper();
+ serializationConfig = mapper.getSerializationConfig();
+ serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
+
+ sw = new StringWriter();
+ mapper.writeValue(sw, cr);
+
+ HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
+ PropertyKey.Builder keyBuilder = helixDataAccessor.keyBuilder();
+
+ // Set contraints that only 1 msg per participant
+ Map<String, String> constraints = new TreeMap<String, String>();
+ constraints.put("MESSAGE_TYPE", "STATE_TRANSITION");
+ constraints.put("TRANSITION", "OFFLINE-COMPLETED");
+ constraints.put("CONSTRAINT_VALUE", "1");
+ constraints.put("INSTANCE", ".*");
+ manager.getClusterManagmentTool().setConstraint(manager.getClusterName(),
+ ConstraintType.MESSAGE_CONSTRAINT, "constraint1", new ConstraintItem(constraints));
+
+ // Send scheduler message
+ crString = sw.toString();
+ schedulerMessage.getRecord().setSimpleField("Criteria", crString);
+ manager.getMessagingService().sendAndWait(cr2, schedulerMessage, callback, -1);
+ String msgId =
+ callback._message.getResultMap()
+ .get(DefaultSchedulerMessageHandlerFactory.SCHEDULER_MSG_ID);
+
+ for (int j = 0; j < 10; j++) {
+ Thread.sleep(200);
+ PropertyKey controllerTaskStatus =
+ keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgId);
+ ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
+ if (statusUpdate.getMapFields().containsKey("SentMessageCount")) {
+ Assert.assertEquals(
+ statusUpdate.getMapFields().get("SentMessageCount").get("MessageCount"), ""
+ + (_PARTITIONS * 3));
+ break;
+ }
+ }
+
+ for (int i = 0; i < _PARTITIONS * 3 / 5; i++) {
+ for (int j = 0; j < 10; j++) {
+ Thread.sleep(300);
+ if (factory._messageCount == 5 * (i + 1))
+ break;
+ }
+ Thread.sleep(300);
+ Assert.assertEquals(factory._messageCount, 5 * (i + 1));
+ factory.signal();
+ // System.err.println(i);
+ }
+
+ for (int j = 0; j < 10; j++) {
+ Thread.sleep(200);
+ PropertyKey controllerTaskStatus =
+ keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgId);
+ ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
+ if (statusUpdate.getMapFields().containsKey("Summary")) {
+ break;
+ }
+ }
+
+ Assert.assertEquals(_PARTITIONS, factory._results.size());
+ PropertyKey controllerTaskStatus =
+ keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgId);
+ ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
+ Assert.assertTrue(statusUpdate.getMapField("SentMessageCount").get("MessageCount")
+ .equals("" + (_PARTITIONS * 3)));
+ int messageResultCount = 0;
+ for (String key : statusUpdate.getMapFields().keySet()) {
+ if (key.startsWith("MessageResult ")) {
+ messageResultCount++;
+ }
+ }
+ Assert.assertEquals(messageResultCount, _PARTITIONS * 3);
+
+ int count = 0;
+ for (Set<String> val : factory._results.values()) {
+ count += val.size();
+ }
+ Assert.assertEquals(count, _PARTITIONS * 3);
+
+ manager.getClusterManagmentTool().removeConstraint(manager.getClusterName(),
+ ConstraintType.MESSAGE_CONSTRAINT, "constraint1");
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/4e5e3e1d/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMsgUsingQueue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMsgUsingQueue.java b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMsgUsingQueue.java
new file mode 100644
index 0000000..6175ce0
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMsgUsingQueue.java
@@ -0,0 +1,181 @@
+package org.apache.helix.integration;
+
+import java.io.StringWriter;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.helix.Criteria;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.api.Id;
+import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
+import org.apache.helix.messaging.handling.HelixTaskResult;
+import org.apache.helix.messaging.handling.MessageHandler;
+import org.apache.helix.messaging.handling.MessageHandler.ErrorCode;
+import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.MessageState;
+import org.apache.helix.model.Message.MessageType;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestSchedulerMsgUsingQueue extends ZkStandAloneCMTestBaseWithPropertyServerCheck {
+ public static class TestMessagingHandlerFactory implements MessageHandlerFactory {
+ int cnt;
+ public Map<String, Set<String>> _results = new ConcurrentHashMap<String, Set<String>>();
+
+ public TestMessagingHandlerFactory() {
+ super();
+ cnt = 0;
+ }
+
+ @Override
+ public MessageHandler createHandler(Message message, NotificationContext context) {
+ // System.out.println("\t create-hdlr: " + message.getId());
+ return new TestMessagingHandler(message, context);
+ }
+
+ @Override
+ public String getMessageType() {
+ return "TestParticipant";
+ }
+
+ @Override
+ public void reset() {
+ // TODO Auto-generated method stub
+
+ }
+
+ public class TestMessagingHandler extends MessageHandler {
+ public TestMessagingHandler(Message message, NotificationContext context) {
+ super(message, context);
+ // TODO Auto-generated constructor stub
+ }
+
+ @Override
+ public HelixTaskResult handleMessage() throws InterruptedException {
+ HelixTaskResult result = new HelixTaskResult();
+ result.setSuccess(true);
+ // String tgtName = _message.getTgtName();
+ String messageId = _message.getMsgId().stringify();
+ String partitionId = _message.getPartitionId().stringify();
+
+ result.getTaskResultMap().put("Message", messageId);
+ synchronized (_results) {
+ if (!_results.containsKey(partitionId)) {
+ _results.put(partitionId, new HashSet<String>());
+ }
+ _results.get(partitionId).add(messageId);
+ }
+ cnt++;
+ // System.err.println(cnt + ": message " + messageId + ", tgtName: " + tgtName
+ // + ", partitionId: " + partitionId);
+ return result;
+ }
+
+ @Override
+ public void onError(Exception e, ErrorCode code, ErrorType type) {
+ // TODO Auto-generated method stub
+ }
+ }
+ }
+
+ @Test()
+ public void testSchedulerMsgUsingQueue() throws Exception {
+ // Logger.getRootLogger().setLevel(Level.INFO);
+ // _factory._results.clear();
+ TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
+
+ HelixManager manager = null;
+ for (int i = 0; i < NODE_NR; i++) {
+ String hostDest = "localhost_" + (START_PORT + i);
+ _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
+ factory.getMessageType(), factory);
+ manager = _startCMResultMap.get(hostDest)._manager;
+ }
+
+ Message schedulerMessage =
+ new Message(MessageType.SCHEDULER_MSG + "", Id.message(UUID.randomUUID().toString()));
+ schedulerMessage.setTgtSessionId(Id.session("*"));
+ schedulerMessage.setTgtName("CONTROLLER");
+ // TODO: change it to "ADMIN" ?
+ schedulerMessage.setSrcName("CONTROLLER");
+ schedulerMessage.getRecord().setSimpleField(
+ DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, "TestSchedulerMsgUsingQueue");
+ // Template for the individual message sent to each participant
+ Message msg = new Message(factory.getMessageType(), Id.message("Template"));
+ msg.setTgtSessionId(Id.session("*"));
+ msg.setMsgState(MessageState.NEW);
+
+ // Criteria to send individual messages
+ Criteria cr = new Criteria();
+ cr.setInstanceName("localhost_%");
+ cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
+ cr.setSessionSpecific(false);
+ cr.setResource("%");
+ cr.setPartition("%");
+
+ ObjectMapper mapper = new ObjectMapper();
+ SerializationConfig serializationConfig = mapper.getSerializationConfig();
+ serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
+
+ StringWriter sw = new StringWriter();
+ mapper.writeValue(sw, cr);
+
+ String crString = sw.toString();
+
+ schedulerMessage.getRecord().setSimpleField("Criteria", crString);
+ schedulerMessage.getRecord().setMapField("MessageTemplate", msg.getRecord().getSimpleFields());
+ schedulerMessage.getRecord().setSimpleField("TIMEOUT", "-1");
+
+ HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
+ PropertyKey.Builder keyBuilder = helixDataAccessor.keyBuilder();
+ helixDataAccessor.createProperty(
+ keyBuilder.controllerMessage(schedulerMessage.getMsgId().stringify()), schedulerMessage);
+
+ for (int i = 0; i < 30; i++) {
+ Thread.sleep(2000);
+ if (_PARTITIONS == factory._results.size()) {
+ break;
+ }
+ }
+
+ Assert.assertEquals(_PARTITIONS, factory._results.size());
+ PropertyKey controllerTaskStatus =
+ keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), schedulerMessage
+ .getMsgId().stringify());
+
+ int messageResultCount = 0;
+ for (int i = 0; i < 10; i++) {
+ ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
+ Assert.assertTrue(statusUpdate.getMapField("SentMessageCount").get("MessageCount")
+ .equals("" + (_PARTITIONS * 3)));
+ for (String key : statusUpdate.getMapFields().keySet()) {
+ if (key.startsWith("MessageResult ")) {
+ messageResultCount++;
+ }
+ }
+ if (messageResultCount == _PARTITIONS * 3) {
+ break;
+ } else {
+ Thread.sleep(2000);
+ }
+ }
+ Assert.assertEquals(messageResultCount, _PARTITIONS * 3);
+ int count = 0;
+ for (Set<String> val : factory._results.values()) {
+ count += val.size();
+ }
+ Assert.assertEquals(count, _PARTITIONS * 3);
+
+ }
+}
[3/3] git commit: [HELIX-244] Redesign rebalancers using
rebalancer-specific configs
Posted by zz...@apache.org.
[HELIX-244] Redesign rebalancers using rebalancer-specific configs
Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/0a8baa12
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/0a8baa12
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/0a8baa12
Branch: refs/heads/helix-logical-model
Commit: 0a8baa12f72b5ead84e92bdeb089fb20658df706
Parents: 4e5e3e1
Author: zzhang <zz...@apache.org>
Authored: Fri Sep 13 16:31:52 2013 -0700
Committer: zzhang <zz...@apache.org>
Committed: Fri Sep 13 16:31:52 2013 -0700
----------------------------------------------------------------------
.../java/org/apache/helix/HelixProperty.java | 10 +-
.../main/java/org/apache/helix/api/Cluster.java | 20 +-
.../org/apache/helix/api/ClusterAccessor.java | 59 ++-
.../org/apache/helix/api/ClusterConfig.java | 34 +-
.../helix/api/CustomRebalancerConfig.java | 143 +++++++
.../helix/api/FullAutoRebalancerConfig.java | 86 ++++
.../org/apache/helix/api/NamespacedConfig.java | 59 ++-
.../org/apache/helix/api/RebalancerConfig.java | 426 +++++++++++--------
.../org/apache/helix/api/RebalancerRef.java | 7 +-
.../java/org/apache/helix/api/Resource.java | 57 ++-
.../helix/api/SemiAutoRebalancerConfig.java | 146 +++++++
.../helix/api/UserDefinedRebalancerConfig.java | 113 +++++
.../rebalancer/NewAutoRebalancer.java | 61 +--
.../rebalancer/NewCustomRebalancer.java | 23 +-
.../controller/rebalancer/NewRebalancer.java | 23 +-
.../rebalancer/NewSemiAutoRebalancer.java | 23 +-
.../rebalancer/NewUserDefinedRebalancer.java | 35 ++
.../helix/controller/stages/AttributeName.java | 3 +-
.../stages/NewBestPossibleStateCalcStage.java | 55 +--
.../stages/NewCurrentStateComputationStage.java | 2 +-
.../stages/NewCurrentStateOutput.java | 255 -----------
.../stages/NewExternalViewComputeStage.java | 2 +-
.../stages/NewMessageGenerationStage.java | 11 +-
.../stages/NewMessageSelectionStage.java | 9 +-
.../stages/NewReadClusterDataStage.java | 9 -
.../stages/NewResourceComputationStage.java | 20 +-
.../controller/stages/ResourceCurrentState.java | 255 +++++++++++
.../strategy/AutoRebalanceStrategy.java | 20 +-
.../helix/model/ClusterConfiguration.java | 2 +-
.../java/org/apache/helix/model/Message.java | 11 +
.../helix/model/PartitionConfiguration.java | 2 +-
.../helix/model/ResourceConfiguration.java | 68 ++-
.../helix/tools/ClusterStateVerifier.java | 14 +-
.../apache/helix/api/TestNamespacedConfig.java | 129 ++++++
.../org/apache/helix/api/TestNewStages.java | 76 +---
.../org/apache/helix/api/TestUserConfig.java | 86 ----
.../helix/controller/stages/BaseStageTest.java | 13 +-
.../TestBestPossibleCalcStageCompatibility.java | 12 +-
.../stages/TestBestPossibleStateCalcStage.java | 7 +-
.../TestCurrentStateComputationStage.java | 8 +-
.../TestCustomizedIdealStateRebalancer.java | 19 +-
.../apache/helix/examples/NewModelExample.java | 9 +-
42 files changed, 1606 insertions(+), 816 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/main/java/org/apache/helix/HelixProperty.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixProperty.java b/helix-core/src/main/java/org/apache/helix/HelixProperty.java
index 9f1195f..f52d51c 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixProperty.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixProperty.java
@@ -27,7 +27,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.helix.api.UserConfig;
+import org.apache.helix.api.NamespacedConfig;
/**
* A wrapper class for ZNRecord. Used as a base class for IdealState, CurrentState, etc.
@@ -228,11 +228,11 @@ public class HelixProperty {
}
/**
- * Add user-defined configuration properties to this property
- * @param userConfig UserConfig properties
+ * Add namespaced configuration properties to this property
+ * @param namespacedConfig namespaced properties
*/
- public void addUserConfig(UserConfig userConfig) {
- UserConfig.addConfigToProperty(this, userConfig);
+ public void addNamespacedConfig(NamespacedConfig namespacedConfig) {
+ NamespacedConfig.addConfigToProperty(this, namespacedConfig);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/main/java/org/apache/helix/api/Cluster.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Cluster.java b/helix-core/src/main/java/org/apache/helix/api/Cluster.java
index ab95936..005ae0d 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Cluster.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Cluster.java
@@ -24,6 +24,7 @@ import java.util.Map;
import org.apache.helix.model.ClusterConstraints;
import org.apache.helix.model.ClusterConstraints.ConstraintType;
+import org.apache.helix.model.StateModelDefinition;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
@@ -70,11 +71,16 @@ public class Cluster {
* @param participantMap
* @param controllerMap
* @param leaderId
+ * @param constraintMap
+ * @param stateModelMap
+ * @param userConfig
+ * @param isPaused
*/
public Cluster(ClusterId id, Map<ResourceId, Resource> resourceMap,
Map<ParticipantId, Participant> participantMap, Map<ControllerId, Controller> controllerMap,
ControllerId leaderId, Map<ConstraintType, ClusterConstraints> constraintMap,
- UserConfig userConfig, boolean isPaused) {
+ Map<StateModelDefId, StateModelDefinition> stateModelMap, UserConfig userConfig,
+ boolean isPaused) {
// build the config
// Guava's transform and "copy" functions really return views so the maps all reflect each other
@@ -93,8 +99,8 @@ public class Cluster {
}
});
_config =
- new ClusterConfig(id, resourceConfigMap, participantConfigMap, constraintMap, userConfig,
- isPaused);
+ new ClusterConfig(id, resourceConfigMap, participantConfigMap, constraintMap,
+ stateModelMap, userConfig, isPaused);
_resourceMap = ImmutableMap.copyOf(resourceMap);
@@ -191,6 +197,14 @@ public class Cluster {
}
/**
+ * Get all the state model definitions on the cluster
+ * @return map of state model definition id to state model definition
+ */
+ public Map<StateModelDefId, StateModelDefinition> getStateModelMap() {
+ return _config.getStateModelMap();
+ }
+
+ /**
* Get user-specified configuration properties of this cluster
* @return UserConfig properties
*/
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
index 59da8b7..4bfe780 100644
--- a/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
@@ -20,6 +20,7 @@ package org.apache.helix.api;
*/
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -35,12 +36,14 @@ import org.apache.helix.model.ClusterConstraints.ConstraintType;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
import org.apache.helix.model.PartitionConfiguration;
import org.apache.helix.model.PauseSignal;
import org.apache.helix.model.ResourceConfiguration;
+import org.apache.helix.model.StateModelDefinition;
import org.apache.log4j.Logger;
public class ClusterAccessor {
@@ -84,6 +87,12 @@ public class ClusterAccessor {
if (cluster.isPaused()) {
pauseCluster();
}
+ StateModelDefinitionAccessor stateModelDefAccessor =
+ new StateModelDefinitionAccessor(_accessor);
+ Map<StateModelDefId, StateModelDefinition> stateModelDefs = cluster.getStateModelMap();
+ for (StateModelDefinition stateModelDef : stateModelDefs.values()) {
+ stateModelDefAccessor.addStateModelDefinition(stateModelDef);
+ }
return true;
}
@@ -199,7 +208,7 @@ public class ClusterAccessor {
}
resourceMap.put(resourceId,
new Resource(resourceId, idealState, null, externalViewMap.get(resourceName),
- userConfig, partitionUserConfigs, liveInstanceMap.size()));
+ userConfig, partitionUserConfigs));
}
}
@@ -241,8 +250,13 @@ public class ClusterAccessor {
} else {
userConfig = new UserConfig(_clusterId);
}
+
+ StateModelDefinitionAccessor stateModelDefAccessor =
+ new StateModelDefinitionAccessor(_accessor);
+ Map<StateModelDefId, StateModelDefinition> stateModelMap =
+ stateModelDefAccessor.readStateModelDefinitions();
return new Cluster(_clusterId, resourceMap, participantMap, controllerMap, leaderId,
- clusterConstraintMap, userConfig, isPaused);
+ clusterConstraintMap, stateModelMap, userConfig, isPaused);
}
/**
@@ -264,6 +278,7 @@ public class ClusterAccessor {
* @param resource
*/
public void addResourceToCluster(ResourceConfig resource) {
+ // TODO: this belongs in ResourceAccessor
StateModelDefId stateModelDefId = resource.getRebalancerConfig().getStateModelDefId();
if (_accessor.getProperty(_keyBuilder.stateModelDef(stateModelDefId.stringify())) == null) {
throw new HelixException("State model: " + stateModelDefId + " not found in cluster: "
@@ -278,7 +293,9 @@ public class ClusterAccessor {
// Add resource user config
if (resource.getUserConfig() != null) {
- ResourceConfiguration configuration = ResourceConfiguration.from(resource.getUserConfig());
+ ResourceConfiguration configuration = new ResourceConfiguration(resourceId);
+ configuration.addNamespacedConfig(resource.getUserConfig());
+ configuration.addRebalancerConfig(resource.getRebalancerConfig());
_accessor.setProperty(_keyBuilder.resourceConfig(resourceId.stringify()), configuration);
}
@@ -294,13 +311,24 @@ public class ClusterAccessor {
}
idealState.setStateModelDefId(rebalancerConfig.getStateModelDefId());
for (PartitionId partitionId : resource.getPartitionSet()) {
- List<ParticipantId> preferenceList = rebalancerConfig.getPreferenceList(partitionId);
- Map<ParticipantId, State> preferenceMap = rebalancerConfig.getPreferenceMap(partitionId);
- if (preferenceList != null) {
- idealState.setPreferenceList(partitionId, preferenceList);
- }
- if (preferenceMap != null) {
- idealState.setParticipantStateMap(partitionId, preferenceMap);
+ if (rebalancerConfig.getRebalancerMode() == RebalanceMode.SEMI_AUTO) {
+ SemiAutoRebalancerConfig config = SemiAutoRebalancerConfig.from(rebalancerConfig);
+ List<ParticipantId> preferenceList = config.getPreferenceList(partitionId);
+ if (preferenceList != null) {
+ idealState.setPreferenceList(partitionId, preferenceList);
+ }
+ } else if (rebalancerConfig.getRebalancerMode() == RebalanceMode.CUSTOMIZED) {
+ CustomRebalancerConfig config = CustomRebalancerConfig.from(rebalancerConfig);
+ Map<ParticipantId, State> preferenceMap = config.getPreferenceMap(partitionId);
+ if (preferenceMap != null) {
+ idealState.setParticipantStateMap(partitionId, preferenceMap);
+ }
+ } else {
+ // TODO: need these for as long as we use IdealState as the backing physical model
+ List<ParticipantId> emptyList = Collections.emptyList();
+ Map<ParticipantId, State> emptyMap = Collections.emptyMap();
+ idealState.setPreferenceList(partitionId, emptyList);
+ idealState.setParticipantStateMap(partitionId, emptyMap);
}
Partition partition = resource.getPartition(partitionId);
if (partition.getUserConfig() != null) {
@@ -317,9 +345,12 @@ public class ClusterAccessor {
if (groupTag != null) {
idealState.setInstanceGroupTag(groupTag);
}
- RebalancerRef rebalancerRef = rebalancerConfig.getRebalancerRef();
- if (rebalancerRef != null) {
- idealState.setRebalancerRef(rebalancerRef);
+ if (rebalancerConfig.getRebalancerMode() == RebalanceMode.USER_DEFINED) {
+ UserDefinedRebalancerConfig config = UserDefinedRebalancerConfig.from(rebalancerConfig);
+ RebalancerRef rebalancerRef = config.getRebalancerRef();
+ if (rebalancerRef != null) {
+ idealState.setRebalancerRef(rebalancerRef);
+ }
}
StateModelFactoryId stateModelFactoryId = rebalancerConfig.getStateModelFactoryId();
if (stateModelFactoryId != null) {
@@ -377,7 +408,7 @@ public class ClusterAccessor {
instanceConfig.setPort(Integer.toString(participant.getPort()));
instanceConfig.setInstanceEnabled(participant.isEnabled());
UserConfig userConfig = participant.getUserConfig();
- instanceConfig.addUserConfig(userConfig);
+ instanceConfig.addNamespacedConfig(userConfig);
Set<String> tags = participant.getTags();
for (String tag : tags) {
instanceConfig.addTag(tag);
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/main/java/org/apache/helix/api/ClusterConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/api/ClusterConfig.java
index 5e4a858..dd19096 100644
--- a/helix-core/src/main/java/org/apache/helix/api/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/ClusterConfig.java
@@ -6,6 +6,7 @@ import java.util.Map;
import org.apache.helix.model.ClusterConstraints;
import org.apache.helix.model.ClusterConstraints.ConstraintType;
+import org.apache.helix.model.StateModelDefinition;
import com.google.common.collect.ImmutableMap;
@@ -36,6 +37,7 @@ public class ClusterConfig {
private final Map<ResourceId, ResourceConfig> _resourceMap;
private final Map<ParticipantId, ParticipantConfig> _participantMap;
private final Map<ConstraintType, ClusterConstraints> _constraintMap;
+ private final Map<StateModelDefId, StateModelDefinition> _stateModelMap;
private final UserConfig _userConfig;
private final boolean _isPaused;
@@ -44,17 +46,21 @@ public class ClusterConfig {
* @param id cluster id
* @param resourceMap map of resource id to resource config
* @param participantMap map of participant id to participant config
- * @param constraintMapmap of constraint type to all constraints of that type
+ * @param constraintMap map of constraint type to all constraints of that type
+ * @param stateModelMap map of state model id to state model definition
* @param userConfig user-defined cluster properties
* @param isPaused true if paused, false if active
*/
public ClusterConfig(ClusterId id, Map<ResourceId, ResourceConfig> resourceMap,
Map<ParticipantId, ParticipantConfig> participantMap,
- Map<ConstraintType, ClusterConstraints> constraintMap, UserConfig userConfig, boolean isPaused) {
+ Map<ConstraintType, ClusterConstraints> constraintMap,
+ Map<StateModelDefId, StateModelDefinition> stateModelMap, UserConfig userConfig,
+ boolean isPaused) {
_id = id;
_resourceMap = ImmutableMap.copyOf(resourceMap);
_participantMap = ImmutableMap.copyOf(participantMap);
_constraintMap = ImmutableMap.copyOf(constraintMap);
+ _stateModelMap = ImmutableMap.copyOf(stateModelMap);
_userConfig = userConfig;
_isPaused = isPaused;
}
@@ -92,6 +98,14 @@ public class ClusterConfig {
}
/**
+ * Get all the state model definitions on the cluster
+ * @return map of state model definition id to state model definition
+ */
+ public Map<StateModelDefId, StateModelDefinition> getStateModelMap() {
+ return _stateModelMap;
+ }
+
+ /**
* Get user-specified configuration properties of this cluster
* @return UserConfig properties
*/
@@ -115,6 +129,7 @@ public class ClusterConfig {
private final Map<ResourceId, ResourceConfig> _resourceMap;
private final Map<ParticipantId, ParticipantConfig> _participantMap;
private final Map<ConstraintType, ClusterConstraints> _constraintMap;
+ private final Map<StateModelDefId, StateModelDefinition> _stateModelMap;
private UserConfig _userConfig;
private boolean _isPaused;
@@ -127,6 +142,7 @@ public class ClusterConfig {
_resourceMap = new HashMap<ResourceId, ResourceConfig>();
_participantMap = new HashMap<ParticipantId, ParticipantConfig>();
_constraintMap = new HashMap<ConstraintType, ClusterConstraints>();
+ _stateModelMap = new HashMap<StateModelDefId, StateModelDefinition>();
_isPaused = false;
_userConfig = new UserConfig(id);
}
@@ -198,6 +214,16 @@ public class ClusterConfig {
}
/**
+ * Add a constraint to the cluster
+ * @param constraint cluster constraint of a specific type
+ * @return Builder
+ */
+ public Builder addStateModelDefinition(StateModelDefinition stateModelDef) {
+ _stateModelMap.put(stateModelDef.getStateModelDefId(), stateModelDef);
+ return this;
+ }
+
+ /**
* Set the paused status of the cluster
* @param isPaused true if paused, false otherwise
* @return Builder
@@ -222,8 +248,8 @@ public class ClusterConfig {
* @return ClusterConfig
*/
public ClusterConfig build() {
- return new ClusterConfig(_id, _resourceMap, _participantMap, _constraintMap, _userConfig,
- _isPaused);
+ return new ClusterConfig(_id, _resourceMap, _participantMap, _constraintMap, _stateModelMap,
+ _userConfig, _isPaused);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/main/java/org/apache/helix/api/CustomRebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/CustomRebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/api/CustomRebalancerConfig.java
new file mode 100644
index 0000000..97d4a45
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/CustomRebalancerConfig.java
@@ -0,0 +1,143 @@
+package org.apache.helix.api;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Configuration properties for the CUSTOMIZED rebalancer
+ */
+public final class CustomRebalancerConfig extends RebalancerConfig {
+ /**
+ * Instantiate a new config for CUSTOMIZED
+ * @param resourceId the resource to rebalance
+ * @param stateModelDefId the state model that the resource follows
+ * @param partitionMap map of partition id to partition
+ */
+ public CustomRebalancerConfig(ResourceId resourceId, StateModelDefId stateModelDefId,
+ Map<PartitionId, Partition> partitionMap) {
+ super(resourceId, RebalanceMode.CUSTOMIZED, stateModelDefId, partitionMap);
+ }
+
+ /**
+ * Instantiate from a base RebalancerConfig
+ * @param config populated rebalancer config
+ */
+ private CustomRebalancerConfig(RebalancerConfig config) {
+ super(config);
+ }
+
+ /**
+ * Get the preference map for a partition
+ * @param partitionId the partition to look up
+ * @return preference map of participant to state for each replica
+ */
+ public Map<ParticipantId, State> getPreferenceMap(PartitionId partitionId) {
+ Map<String, String> rawPreferenceMap = getMapField(partitionId.stringify());
+ if (rawPreferenceMap != null) {
+ return IdealState.participantStateMapFromStringMap(rawPreferenceMap);
+ }
+ return Collections.emptyMap();
+ }
+
+ /**
+ * Set the preference map for a partition
+ * @param partitionId the partition to set
+ * @param preferenceMap map of participant to state for each replica
+ */
+ public void setPreferenceMap(PartitionId partitionId, Map<ParticipantId, State> preferenceMap) {
+ setMapField(partitionId.toString(), IdealState.stringMapFromParticipantStateMap(preferenceMap));
+ }
+
+ /**
+ * Get all the preference maps for a partition
+ * @return map of partition id to map of participant id to state for each replica
+ */
+ public Map<PartitionId, Map<ParticipantId, State>> getPreferenceMaps() {
+ Map<String, Map<String, String>> rawPreferenceMaps = getMapFields();
+ return IdealState.participantStateMapsFromStringMaps(rawPreferenceMaps);
+ }
+
+ /**
+ * Set all the preference maps for a partition
+ * @param preferenceMaps map of partition id to map of participant id to state for each replica
+ */
+ public void setPreferenceMaps(Map<PartitionId, Map<ParticipantId, State>> preferenceMaps) {
+ setMapFields(IdealState.stringMapsFromParticipantStateMaps(preferenceMaps));
+ }
+
+ /**
+ * Get a CustomRebalancerConfig from a RebalancerConfig
+ * @param config populated RebalancerConfig
+ * @return CustomRebalancerConfig
+ */
+ public static CustomRebalancerConfig from(RebalancerConfig config) {
+ return new CustomRebalancerConfig(config);
+ }
+
+ /**
+ * Assembler for a CUSTOMIZED configuration
+ */
+ public static class Builder extends RebalancerConfig.Builder<Builder> {
+ private final Map<PartitionId, Map<ParticipantId, State>> _preferenceMaps;
+
+ /**
+ * Build for a specific resource
+ * @param resourceId the resource to rebalance
+ */
+ public Builder(ResourceId resourceId) {
+ super(resourceId);
+ _preferenceMaps = new HashMap<PartitionId, Map<ParticipantId, State>>();
+ }
+
+ /**
+ * Add a preference map of a partition
+ * @param partitionId the partition to set
+ * @param preferenceMap map of participant id to state
+ * @return Builder
+ */
+ public Builder preferenceMap(PartitionId partitionId, Map<ParticipantId, State> preferenceMap) {
+ _preferenceMaps.put(partitionId, preferenceMap);
+ return this;
+ }
+
+ @Override
+ public CustomRebalancerConfig build() {
+ if (_partitionMap.isEmpty()) {
+ addPartitions(1);
+ }
+ CustomRebalancerConfig config =
+ new CustomRebalancerConfig(_resourceId, _stateModelDefId, _partitionMap);
+ update(config);
+ config.setPreferenceMaps(_preferenceMaps);
+ return config;
+ }
+
+ @Override
+ protected Builder self() {
+ return this;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/main/java/org/apache/helix/api/FullAutoRebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/FullAutoRebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/api/FullAutoRebalancerConfig.java
new file mode 100644
index 0000000..3482d16
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/FullAutoRebalancerConfig.java
@@ -0,0 +1,86 @@
+package org.apache.helix.api;
+
+import java.util.Map;
+
+import org.apache.helix.model.IdealState.RebalanceMode;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Configuration properties for the FULL_AUTO rebalancer
+ */
+public final class FullAutoRebalancerConfig extends RebalancerConfig {
+ /**
+ * Instantiate a new config for FULL_AUTO
+ * @param resourceId the resource to rebalance
+ * @param stateModelDefId the state model that the resource follows
+ * @param partitionMap map of partition id to partition
+ */
+ public FullAutoRebalancerConfig(ResourceId resourceId, StateModelDefId stateModelDefId,
+ Map<PartitionId, Partition> partitionMap) {
+ super(resourceId, RebalanceMode.FULL_AUTO, stateModelDefId, partitionMap);
+ }
+
+ /**
+ * Instantiate from a base RebalancerConfig
+ * @param config populated rebalancer config
+ */
+ private FullAutoRebalancerConfig(RebalancerConfig config) {
+ super(config);
+ }
+
+ /**
+ * Get a FullAutoRebalancerConfig from a RebalancerConfig
+ * @param config populated RebalancerConfig
+ * @return FullAutoRebalancerConfig
+ */
+ public static FullAutoRebalancerConfig from(RebalancerConfig config) {
+ return new FullAutoRebalancerConfig(config);
+ }
+
+ /**
+ * Assembler for a FULL_AUTO configuration
+ */
+ public static class Builder extends RebalancerConfig.Builder<Builder> {
+ /**
+ * Build for a specific resource
+ * @param resourceId the resource to rebalance
+ */
+ public Builder(ResourceId resourceId) {
+ super(resourceId);
+ }
+
+ @Override
+ public FullAutoRebalancerConfig build() {
+ if (_partitionMap.isEmpty()) {
+ addPartitions(1);
+ }
+ FullAutoRebalancerConfig config =
+ new FullAutoRebalancerConfig(_resourceId, _stateModelDefId, _partitionMap);
+ update(config);
+ return config;
+ }
+
+ @Override
+ protected Builder self() {
+ return this;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/main/java/org/apache/helix/api/NamespacedConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/NamespacedConfig.java b/helix-core/src/main/java/org/apache/helix/api/NamespacedConfig.java
index 0a46fa7..f7656d7 100644
--- a/helix-core/src/main/java/org/apache/helix/api/NamespacedConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/NamespacedConfig.java
@@ -46,22 +46,29 @@ public abstract class NamespacedConfig extends ZNRecord {
}
/**
- * Instantiate a UserConfig from an existing HelixProperty
+ * Instantiate a NamespacedConfig from an existing HelixProperty
* @param property property wrapping a configuration
*/
public NamespacedConfig(HelixProperty property, String prefix) {
super(property.getRecord());
_prefix = prefix + '_';
- // filter out any configuration that isn't user-defined
- Predicate<String> keyFilter = new Predicate<String>() {
- @Override
- public boolean apply(String key) {
- return key.contains(_prefix);
- }
- };
- super.setMapFields(Maps.filterKeys(super.getMapFields(), keyFilter));
- super.setListFields(Maps.filterKeys(super.getListFields(), keyFilter));
- super.setSimpleFields(Maps.filterKeys(super.getSimpleFields(), keyFilter));
+ filterNonPrefixedFields();
+ }
+
+ /**
+ * Instantiate a NamespacedConfig as a copy of another NamedspacedConfig
+ * @param config populated NamespacedConfig
+ */
+ public NamespacedConfig(NamespacedConfig config) {
+ super(config.getId());
+ _prefix = config.getPrefix() + '_';
+ if (config.getRawPayload() != null && config.getRawPayload().length > 0) {
+ setRawPayload(config.getRawPayload());
+ setPayloadSerializer(config.getPayloadSerializer());
+ }
+ super.setSimpleFields(config.getPrefixedSimpleFields());
+ super.setListFields(config.getPrefixedListFields());
+ super.setMapFields(config.getPrefixedMapFields());
}
@Override
@@ -77,7 +84,7 @@ public abstract class NamespacedConfig extends ZNRecord {
@Override
public void setMapFields(Map<String, Map<String, String>> mapFields) {
for (String k : mapFields.keySet()) {
- setMapField(_prefix + k, mapFields.get(k));
+ super.setMapField(_prefix + k, mapFields.get(k));
}
}
@@ -102,7 +109,7 @@ public abstract class NamespacedConfig extends ZNRecord {
@Override
public void setListFields(Map<String, List<String>> listFields) {
for (String k : listFields.keySet()) {
- setListField(_prefix + k, listFields.get(k));
+ super.setListField(_prefix + k, listFields.get(k));
}
}
@@ -140,6 +147,30 @@ public abstract class NamespacedConfig extends ZNRecord {
}
/**
+ * Get the prefix used to distinguish these config properties
+ * @return string prefix, not including the underscore
+ */
+ public String getPrefix() {
+ return _prefix.substring(0, _prefix.indexOf('_'));
+ }
+
+ /**
+ * Remove all fields from this config that are not prefixed
+ */
+ private void filterNonPrefixedFields() {
+ // filter out any configuration that isn't user-defined
+ Predicate<String> keyFilter = new Predicate<String>() {
+ @Override
+ public boolean apply(String key) {
+ return key.contains(_prefix);
+ }
+ };
+ super.setMapFields(Maps.filterKeys(super.getMapFields(), keyFilter));
+ super.setListFields(Maps.filterKeys(super.getListFields(), keyFilter));
+ super.setSimpleFields(Maps.filterKeys(super.getSimpleFields(), keyFilter));
+ }
+
+ /**
* Get all map fields with prefixed keys
* @return prefixed map fields
*/
@@ -187,7 +218,7 @@ public abstract class NamespacedConfig extends ZNRecord {
private static <T> Map<String, T> convertToPrefixlessMap(Map<String, T> rawMap, String prefix) {
Map<String, T> convertedMap = new HashMap<String, T>();
for (String rawKey : rawMap.keySet()) {
- String k = rawKey.substring(rawKey.indexOf(prefix) + 1);
+ String k = rawKey.substring(prefix.length());
convertedMap.put(k, rawMap.get(rawKey));
}
return ImmutableMap.copyOf(convertedMap);
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java
index 1f401c8..1816d91 100644
--- a/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java
@@ -19,84 +19,106 @@ package org.apache.helix.api;
* under the License.
*/
+import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.apache.helix.HelixConstants;
-import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.RebalanceMode;
-import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.ResourceConfiguration;
-import com.google.common.collect.ImmutableList;
+import com.google.common.base.Functions;
+import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
/**
* Captures the configuration properties necessary for rebalancing
*/
public class RebalancerConfig extends NamespacedConfig {
- private final RebalanceMode _rebalancerMode;
- private final RebalancerRef _rebalancerRef;
- private final StateModelDefId _stateModelDefId;
- private final Map<PartitionId, List<ParticipantId>> _preferenceLists;
- private final Map<PartitionId, Map<ParticipantId, State>> _preferenceMaps;
- private final ResourceAssignment _resourceAssignment;
- private final int _replicaCount;
- private final boolean _anyLiveParticipant;
- private final String _participantGroupTag;
- private final int _maxPartitionsPerParticipant;
- private final StateModelFactoryId _stateModelFactoryId;
+
+ /**
+ * Fields used by the base RebalancerConfig
+ */
+ public enum Fields {
+ ANY_LIVE_PARTICIPANT,
+ MAX_PARTITIONS_PER_PARTICIPANT,
+ PARTICIPANT_GROUP_TAG,
+ REBALANCE_MODE,
+ REPLICA_COUNT,
+ STATE_MODEL_DEFINITION,
+ STATE_MODEL_FACTORY
+ }
+
+ private final ResourceId _resourceId;
+ private final Set<String> _fieldsSet;
private final Map<PartitionId, Partition> _partitionMap;
/**
- * Instantiate the configuration of a rebalance task
- * @param idealState the physical ideal state
- * @param resourceAssignment last mapping of a resource
+ * Instantiate a RebalancerConfig.
+ * @param resourceId the resource to rebalance
+ * @param rebalancerMode the mode to rebalance with
+ * @param stateModelDefId the state model that the resource uses
+ * @param partitionMap partitions of the resource
*/
- public RebalancerConfig(Map<PartitionId, Partition> partitionMap, IdealState idealState,
- ResourceAssignment resourceAssignment, int liveParticipantCount) {
- super(idealState.getResourceId(), RebalancerConfig.class.getSimpleName());
+ public RebalancerConfig(ResourceId resourceId, RebalanceMode rebalancerMode,
+ StateModelDefId stateModelDefId, Map<PartitionId, Partition> partitionMap) {
+ super(resourceId, RebalancerConfig.class.getSimpleName());
+ _resourceId = resourceId;
+ _fieldsSet =
+ ImmutableSet.copyOf(Lists.transform(Arrays.asList(Fields.values()),
+ Functions.toStringFunction()));
+ setEnumField(Fields.REBALANCE_MODE.toString(), rebalancerMode);
+ setSimpleField(Fields.STATE_MODEL_DEFINITION.toString(), stateModelDefId.stringify());
_partitionMap = ImmutableMap.copyOf(partitionMap);
- _rebalancerMode = idealState.getRebalanceMode();
- _rebalancerRef = idealState.getRebalancerRef();
- _stateModelDefId = idealState.getStateModelDefId();
- String replicaCount = idealState.getReplicas();
- if (replicaCount.equals(HelixConstants.StateModelToken.ANY_LIVEINSTANCE.toString())) {
- _replicaCount = liveParticipantCount;
- _anyLiveParticipant = true;
- } else {
- _replicaCount = Integer.parseInt(idealState.getReplicas());
- _anyLiveParticipant = false;
- }
- _participantGroupTag = idealState.getInstanceGroupTag();
- _maxPartitionsPerParticipant = idealState.getMaxPartitionsPerInstance();
- _stateModelFactoryId = idealState.getStateModelFactoryId();
-
- // Build preference lists and maps
- ImmutableMap.Builder<PartitionId, List<ParticipantId>> preferenceLists =
- new ImmutableMap.Builder<PartitionId, List<ParticipantId>>();
- ImmutableMap.Builder<PartitionId, Map<ParticipantId, State>> preferenceMaps =
- new ImmutableMap.Builder<PartitionId, Map<ParticipantId, State>>();
- for (PartitionId partitionId : idealState.getPartitionSet()) {
- List<ParticipantId> preferenceList = idealState.getPreferenceList(partitionId);
- if (preferenceList != null) {
- preferenceLists.put(partitionId, ImmutableList.copyOf(preferenceList));
- }
- Map<ParticipantId, State> preferenceMap = idealState.getParticipantStateMap(partitionId);
- if (preferenceMap != null) {
- preferenceMaps.put(partitionId, ImmutableMap.copyOf(preferenceMap));
- }
+ }
+
+ /**
+ * Extract rebalancer-specific configuration from a physical resource config
+ * @param resourceConfiguration resource config
+ * @param partitionMap map of partition id to partition object
+ */
+ protected RebalancerConfig(ResourceConfiguration resourceConfiguration,
+ Map<PartitionId, Partition> partitionMap) {
+ super(resourceConfiguration, RebalancerConfig.class.getSimpleName());
+ _resourceId = resourceConfiguration.getResourceId();
+ _fieldsSet =
+ ImmutableSet.copyOf(Lists.transform(Arrays.asList(Fields.values()),
+ Functions.toStringFunction()));
+ _partitionMap = ImmutableMap.copyOf(partitionMap);
+ }
+
+ /**
+ * Copy a RebalancerConfig from an existing one
+ * @param config populated RebalancerConfig
+ */
+ protected RebalancerConfig(RebalancerConfig config) {
+ super(config.getResourceId(), RebalancerConfig.class.getSimpleName());
+ _resourceId = config.getResourceId();
+ _partitionMap = config.getPartitionMap();
+ _fieldsSet =
+ ImmutableSet.copyOf(Lists.transform(Arrays.asList(Fields.values()),
+ Functions.toStringFunction()));
+ super.setSimpleFields(config.getRawSimpleFields());
+ super.setListFields(config.getRawListFields());
+ super.setMapFields(config.getRawMapFields());
+ if (config.getRawPayload() != null && config.getRawPayload().length > 0) {
+ setRawPayload(config.getRawPayload());
+ setPayloadSerializer(config.getPayloadSerializer());
}
- _preferenceLists = preferenceLists.build();
- _preferenceMaps = preferenceMaps.build();
+ }
- // Leave the resource assignment as is
- _resourceAssignment = resourceAssignment;
+ /**
+ * Get the resource id
+ * @return ResourceId
+ */
+ public ResourceId getResourceId() {
+ return _resourceId;
}
/**
@@ -131,15 +153,7 @@ public class RebalancerConfig extends NamespacedConfig {
* @return rebalancer mode
*/
public RebalanceMode getRebalancerMode() {
- return _rebalancerMode;
- }
-
- /**
- * Get the rebalancer class name
- * @return rebalancer class name or null if not exist
- */
- public RebalancerRef getRebalancerRef() {
- return _rebalancerRef;
+ return getEnumField(Fields.REBALANCE_MODE.toString(), RebalanceMode.class, RebalanceMode.NONE);
}
/**
@@ -147,63 +161,56 @@ public class RebalancerConfig extends NamespacedConfig {
* @return state model definition
*/
public StateModelDefId getStateModelDefId() {
- return _stateModelDefId;
+ return Id.stateModelDef(getStringField(Fields.STATE_MODEL_DEFINITION.toString(), null));
}
/**
- * Get the ideal node and state assignment of the resource
- * @return resource assignment
+ * Get the number of replicas each partition should have. This function will return 0 if some
+ * policy overrides the replica count, for instance if any live participant can accept a replica.
+ * @return replica count
*/
- public ResourceAssignment getResourceAssignment() {
- return _resourceAssignment;
+ public int getReplicaCount() {
+ return getIntField(Fields.REPLICA_COUNT.toString(), 0);
}
/**
- * Get the preference list of participants for a given partition
- * @param partitionId the partition to look up
- * @return the ordered preference list (early entries are more preferred)
+ * Set the number of replicas each partition should have.
+ * @param replicaCount replica count
*/
- public List<ParticipantId> getPreferenceList(PartitionId partitionId) {
- if (_preferenceLists.containsKey(partitionId)) {
- return _preferenceLists.get(partitionId);
- }
- return Collections.emptyList();
+ public void setReplicaCount(int replicaCount) {
+ setIntField(Fields.REPLICA_COUNT.toString(), replicaCount);
}
/**
- * Get the preference map of participants and states for a given partition
- * @param partitionId the partition to look up
- * @return a mapping of participant to state for each replica
+ * Get the number of partitions of this resource that a given participant can accept
+ * @return maximum number of partitions
*/
- public Map<ParticipantId, State> getPreferenceMap(PartitionId partitionId) {
- if (_preferenceMaps.containsKey(partitionId)) {
- return _preferenceMaps.get(partitionId);
- }
- return Collections.emptyMap();
+ public int getMaxPartitionsPerParticipant() {
+ return getIntField(Fields.MAX_PARTITIONS_PER_PARTICIPANT.toString(), Integer.MAX_VALUE);
}
/**
- * Get the number of replicas each partition should have
- * @return replica count
+ * Set the number of partitions of this resource that a given participant can accept
+ * @param maxPartitionsPerParticipant maximum number of partitions
*/
- public int getReplicaCount() {
- return _replicaCount;
+ public void setMaxPartitionsPerParticipant(int maxPartitionsPerParticipant) {
+ setIntField(Fields.MAX_PARTITIONS_PER_PARTICIPANT.toString(), maxPartitionsPerParticipant);
}
/**
- * Get the number of partitions of this resource that a given participant can accept
- * @return maximum number of partitions
+ * Get the tag, if any, which must be present on assignable instances
+ * @return group tag, or null if it is not present
*/
- public int getMaxPartitionsPerParticipant() {
- return _maxPartitionsPerParticipant;
+ public String getParticipantGroupTag() {
+ return getStringField(Fields.PARTICIPANT_GROUP_TAG.toString(), null);
}
/**
- * Get the tag, if any, which must be present on assignable instances
- * @return group tag
+ * Set the tag, if any, which must be present on assignable instances
+ * @param participantGroupTag group tag
*/
- public String getParticipantGroupTag() {
- return _participantGroupTag;
+ public void setParticipantGroupTag(String participantGroupTag) {
+ setSimpleField(Fields.PARTICIPANT_GROUP_TAG.toString(), participantGroupTag);
}
/**
@@ -211,7 +218,17 @@ public class RebalancerConfig extends NamespacedConfig {
* @return state model factory id
*/
public StateModelFactoryId getStateModelFactoryId() {
- return _stateModelFactoryId;
+ return Id.stateModelFactory(getStringField(Fields.STATE_MODEL_FACTORY.toString(), null));
+ }
+
+ /**
+ * Set state model factory id
+ * @param factoryId state model factory id
+ */
+ public void setStateModelFactoryId(StateModelFactoryId factoryId) {
+ if (factoryId != null) {
+ setSimpleField(Fields.STATE_MODEL_FACTORY.toString(), factoryId.stringify());
+ }
}
/**
@@ -219,67 +236,129 @@ public class RebalancerConfig extends NamespacedConfig {
* @return true if they can, false if they cannot
*/
public boolean canAssignAnyLiveParticipant() {
- return _anyLiveParticipant;
+ return getBooleanField(Fields.ANY_LIVE_PARTICIPANT.toString(), false);
+ }
+
+ /**
+ * Specify if replicas can be assigned to any live participant
+ * @param canAssignAny true if they can, false if they cannot
+ */
+ public void setCanAssignAnyLiveParticipant(boolean canAssignAny) {
+ setBooleanField(Fields.ANY_LIVE_PARTICIPANT.toString(), canAssignAny);
+ }
+
+ /*
+ * Override: removes from view fields set by RebalancerConfig
+ */
+ @Override
+ public Map<String, String> getSimpleFields() {
+ return Maps.filterKeys(super.getSimpleFields(), filterBaseConfigFields());
+ }
+
+ /*
+ * Override: makes sure that updated simple fields include those from this class
+ */
+ @Override
+ public void setSimpleFields(Map<String, String> simpleFields) {
+ Map<String, String> copySimpleFields = new HashMap<String, String>();
+ copySimpleFields.putAll(simpleFields);
+ for (String field : _fieldsSet) {
+ String value = getStringField(field, null);
+ if (value != null) {
+ copySimpleFields.put(field, value);
+ }
+ }
+ super.setSimpleFields(copySimpleFields);
+ }
+
+ /**
+ * Get a predicate that can checks if a key is used by this config
+ * @return Guava Predicate
+ */
+ private Predicate<String> filterBaseConfigFields() {
+ return new Predicate<String>() {
+ @Override
+ public boolean apply(String key) {
+ return !_fieldsSet.contains(key);
+ }
+ };
+ }
+
+ /**
+ * Get simple fields without filtering out base fields
+ * @return simple fields
+ */
+ private Map<String, String> getRawSimpleFields() {
+ return super.getSimpleFields();
+ }
+
+ /**
+ * Get list fields without filtering out base fields
+ * @return list fields
+ */
+ private Map<String, List<String>> getRawListFields() {
+ return super.getListFields();
+ }
+
+ /**
+ * Get map fields without filtering out base fields
+ * @return map fields
+ */
+ private Map<String, Map<String, String>> getRawMapFields() {
+ return super.getMapFields();
+ }
+
+ /**
+ * Get a RebalancerConfig from a physical resource configuration
+ * @param config resource configuration
+ * @return RebalancerConfig
+ */
+ public static RebalancerConfig from(ResourceConfiguration config,
+ Map<PartitionId, UserConfig> partitionConfigs) {
+ Map<PartitionId, Partition> partitionMap = new HashMap<PartitionId, Partition>();
+ for (PartitionId partitionId : config.getPartitionIds()) {
+ if (partitionConfigs.containsKey(partitionId)) {
+ partitionMap
+ .put(partitionId, new Partition(partitionId, partitionConfigs.get(partitionId)));
+ } else {
+ partitionMap.put(partitionId, new Partition(partitionId));
+ }
+ }
+ return new RebalancerConfig(config, partitionMap);
}
/**
* Assembles a RebalancerConfig
*/
- public static class Builder {
- private final ResourceId _id;
- private final IdealState _idealState;
+ public static abstract class Builder<T extends Builder<T>> {
+ protected final ResourceId _resourceId;
+ protected final Map<PartitionId, Partition> _partitionMap;
+ protected StateModelDefId _stateModelDefId;
+ private StateModelFactoryId _stateModelFactoryId;
private boolean _anyLiveParticipant;
- private ResourceAssignment _resourceAssignment;
- private final Map<PartitionId, Partition> _partitionMap;
+ private int _replicaCount;
+ private int _maxPartitionsPerParticipant;
/**
* Configure the rebalancer for a resource
* @param resourceId the resource to rebalance
*/
public Builder(ResourceId resourceId) {
- _id = resourceId;
- _idealState = new IdealState(resourceId);
+ _resourceId = resourceId;
_anyLiveParticipant = false;
+ _replicaCount = 0;
+ _maxPartitionsPerParticipant = Integer.MAX_VALUE;
_partitionMap = new HashMap<PartitionId, Partition>();
}
/**
- * Set the rebalancer mode
- * @param mode {@link RebalanceMode}
- */
- public Builder rebalancerMode(RebalanceMode mode) {
- _idealState.setRebalanceMode(mode);
- return this;
- }
-
- /**
- * Set a user-defined rebalancer
- * @param rebalancerRef a reference to the rebalancer
- * @return Builder
- */
- public Builder rebalancer(RebalancerRef rebalancerRef) {
- _idealState.setRebalancerRef(rebalancerRef);
- return this;
- }
-
- /**
* Set the state model definition
* @param stateModelDefId state model identifier
* @return Builder
*/
- public Builder stateModelDef(StateModelDefId stateModelDefId) {
- _idealState.setStateModelDefId(stateModelDefId);
- return this;
- }
-
- /**
- * Set the full assignment of partitions to nodes and corresponding states
- * @param resourceAssignment resource assignment
- * @return Builder
- */
- public Builder resourceAssignment(ResourceAssignment resourceAssignment) {
- _resourceAssignment = resourceAssignment;
- return this;
+ public T stateModelDef(StateModelDefId stateModelDefId) {
+ _stateModelDefId = stateModelDefId;
+ return self();
}
/**
@@ -287,9 +366,9 @@ public class RebalancerConfig extends NamespacedConfig {
* @param replicaCount number of replicas
* @return Builder
*/
- public Builder replicaCount(int replicaCount) {
- _idealState.setReplicas(Integer.toString(replicaCount));
- return this;
+ public T replicaCount(int replicaCount) {
+ _replicaCount = replicaCount;
+ return self();
}
/**
@@ -297,9 +376,9 @@ public class RebalancerConfig extends NamespacedConfig {
* @param maxPartitions
* @return Builder
*/
- public Builder maxPartitionsPerParticipant(int maxPartitions) {
- _idealState.setMaxPartitionsPerInstance(maxPartitions);
- return this;
+ public T maxPartitionsPerParticipant(int maxPartitions) {
+ _maxPartitionsPerParticipant = maxPartitions;
+ return self();
}
/**
@@ -307,19 +386,19 @@ public class RebalancerConfig extends NamespacedConfig {
* @param stateModelFactoryId
* @return Builder
*/
- public Builder stateModelFactoryId(StateModelFactoryId stateModelFactoryId) {
- _idealState.setStateModelFactoryId(stateModelFactoryId);
- return this;
+ public T stateModelFactoryId(StateModelFactoryId stateModelFactoryId) {
+ _stateModelFactoryId = stateModelFactoryId;
+ return self();
}
/**
* Set whether any live participant should be used in rebalancing
* @param useAnyParticipant true if any live participant can be used, false otherwise
- * @return
+ * @return Builder
*/
- public Builder anyLiveParticipant(boolean useAnyParticipant) {
- _anyLiveParticipant = true;
- return this;
+ public T anyLiveParticipant(boolean useAnyParticipant) {
+ _anyLiveParticipant = useAnyParticipant;
+ return self();
}
/**
@@ -327,9 +406,9 @@ public class RebalancerConfig extends NamespacedConfig {
* @param partition fully-qualified partition
* @return Builder
*/
- public Builder addPartition(Partition partition) {
+ public T addPartition(Partition partition) {
_partitionMap.put(partition.getId(), partition);
- return this;
+ return self();
}
/**
@@ -337,11 +416,11 @@ public class RebalancerConfig extends NamespacedConfig {
* @param partitions
* @return Builder
*/
- public Builder addPartitions(Collection<Partition> partitions) {
+ public T addPartitions(Collection<Partition> partitions) {
for (Partition partition : partitions) {
addPartition(partition);
}
- return this;
+ return self();
}
/**
@@ -351,28 +430,35 @@ public class RebalancerConfig extends NamespacedConfig {
* @param partitionCount number of partitions to add
* @return Builder
*/
- public Builder addPartitions(int partitionCount) {
+ public T addPartitions(int partitionCount) {
for (int i = 0; i < partitionCount; i++) {
- addPartition(new Partition(Id.partition(_id, Integer.toString(i)), null));
+ addPartition(new Partition(Id.partition(_resourceId, Integer.toString(i)), null));
}
- return this;
+ return self();
}
/**
- * Assemble a RebalancerConfig
- * @return a fully defined rebalancer configuration
+ * Update a RebalancerConfig with built fields
*/
- public RebalancerConfig build() {
- // add a single partition if one hasn't been added yet since 1 partition is default
- if (_partitionMap.isEmpty()) {
- addPartitions(1);
- }
- if (_anyLiveParticipant) {
- return new RebalancerConfig(_partitionMap, _idealState, _resourceAssignment,
- Integer.parseInt(_idealState.getReplicas()));
- } else {
- return new RebalancerConfig(_partitionMap, _idealState, _resourceAssignment, -1);
+ protected void update(RebalancerConfig rebalancerConfig) {
+ rebalancerConfig.setReplicaCount(_replicaCount);
+ rebalancerConfig.setCanAssignAnyLiveParticipant(_anyLiveParticipant);
+ rebalancerConfig.setMaxPartitionsPerParticipant(_maxPartitionsPerParticipant);
+ if (_stateModelFactoryId != null) {
+ rebalancerConfig.setStateModelFactoryId(_stateModelFactoryId);
}
}
+
+ /**
+ * Get a reference to the actual builder class
+ * @return Builder
+ */
+ protected abstract T self();
+
+ /**
+ * Get a fully-initialized RebalancerConfig instance
+ * @return RebalancerConfig based on what was built
+ */
+ public abstract RebalancerConfig build();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/main/java/org/apache/helix/api/RebalancerRef.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/RebalancerRef.java b/helix-core/src/main/java/org/apache/helix/api/RebalancerRef.java
index 5c7ce56..f610585 100644
--- a/helix-core/src/main/java/org/apache/helix/api/RebalancerRef.java
+++ b/helix-core/src/main/java/org/apache/helix/api/RebalancerRef.java
@@ -19,7 +19,7 @@ package org.apache.helix.api;
* under the License.
*/
-import org.apache.helix.controller.rebalancer.NewRebalancer;
+import org.apache.helix.controller.rebalancer.NewUserDefinedRebalancer;
import org.apache.helix.util.HelixUtil;
import org.apache.log4j.Logger;
@@ -35,9 +35,10 @@ public class RebalancerRef {
/**
* @return
*/
- public NewRebalancer getRebalancer() {
+ public NewUserDefinedRebalancer getRebalancer() {
try {
- return (NewRebalancer) (HelixUtil.loadClass(getClass(), _rebalancerClassName).newInstance());
+ return (NewUserDefinedRebalancer) (HelixUtil.loadClass(getClass(), _rebalancerClassName)
+ .newInstance());
} catch (Exception e) {
LOG.warn("Exception while invoking custom rebalancer class:" + _rebalancerClassName, e);
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/main/java/org/apache/helix/api/Resource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Resource.java b/helix-core/src/main/java/org/apache/helix/api/Resource.java
index 2ee7a4f..c598550 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Resource.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Resource.java
@@ -24,8 +24,10 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import org.apache.helix.HelixConstants.StateModelToken;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
import org.apache.helix.model.Message;
import org.apache.helix.model.ResourceAssignment;
@@ -45,7 +47,7 @@ public class Resource {
*/
public Resource(ResourceId id, IdealState idealState, ResourceAssignment resourceAssignment,
ExternalView externalView, UserConfig userConfig,
- Map<PartitionId, UserConfig> partitionUserConfigs, int liveParticipantCount) {
+ Map<PartitionId, UserConfig> partitionUserConfigs) {
Map<PartitionId, Partition> partitionMap = new HashMap<PartitionId, Partition>();
new HashMap<PartitionId, Map<String, String>>();
Set<PartitionId> partitionSet = idealState.getPartitionSet();
@@ -65,10 +67,57 @@ public class Resource {
}
- SchedulerTaskConfig schedulerTaskConfig = schedulerTaskConfig(idealState);
+ String replicas = idealState.getReplicas();
+ boolean anyLiveParticipant = false;
+ int replicaCount = 0;
+ if (replicas.equals(StateModelToken.ANY_LIVEINSTANCE.toString())) {
+ anyLiveParticipant = true;
+ } else {
+ replicaCount = Integer.parseInt(replicas);
+ }
- RebalancerConfig rebalancerConfig =
- new RebalancerConfig(partitionMap, idealState, resourceAssignment, liveParticipantCount);
+ // Build a RebalancerConfig specific to the mode
+ RebalancerConfig rebalancerConfig = null;
+ if (idealState.getRebalanceMode() == RebalanceMode.FULL_AUTO) {
+ rebalancerConfig =
+ new FullAutoRebalancerConfig.Builder(id).addPartitions(partitionMap.values())
+ .anyLiveParticipant(anyLiveParticipant).replicaCount(replicaCount)
+ .maxPartitionsPerParticipant(idealState.getMaxPartitionsPerInstance())
+ .stateModelDef(idealState.getStateModelDefId())
+ .stateModelFactoryId(idealState.getStateModelFactoryId()).build();
+ } else if (idealState.getRebalanceMode() == RebalanceMode.SEMI_AUTO) {
+ SemiAutoRebalancerConfig semiAutoConfig =
+ new SemiAutoRebalancerConfig.Builder(id).addPartitions(partitionMap.values())
+ .anyLiveParticipant(anyLiveParticipant).replicaCount(replicaCount)
+ .maxPartitionsPerParticipant(idealState.getMaxPartitionsPerInstance())
+ .stateModelDef(idealState.getStateModelDefId())
+ .stateModelFactoryId(idealState.getStateModelFactoryId()).build();
+ for (PartitionId partitionId : partitionMap.keySet()) {
+ semiAutoConfig.setPreferenceList(partitionId, idealState.getPreferenceList(partitionId));
+ }
+ rebalancerConfig = semiAutoConfig;
+ } else if (idealState.getRebalanceMode() == RebalanceMode.CUSTOMIZED) {
+ CustomRebalancerConfig customConfig =
+ new CustomRebalancerConfig.Builder(id).addPartitions(partitionMap.values())
+ .anyLiveParticipant(anyLiveParticipant).replicaCount(replicaCount)
+ .maxPartitionsPerParticipant(idealState.getMaxPartitionsPerInstance())
+ .stateModelDef(idealState.getStateModelDefId())
+ .stateModelFactoryId(idealState.getStateModelFactoryId()).build();
+ for (PartitionId partitionId : partitionMap.keySet()) {
+ customConfig.setPreferenceMap(partitionId, idealState.getParticipantStateMap(partitionId));
+ }
+ rebalancerConfig = customConfig;
+ } else if (idealState.getRebalanceMode() == RebalanceMode.USER_DEFINED) {
+ rebalancerConfig =
+ new UserDefinedRebalancerConfig.Builder(id).addPartitions(partitionMap.values())
+ .anyLiveParticipant(anyLiveParticipant).replicaCount(replicaCount)
+ .maxPartitionsPerParticipant(idealState.getMaxPartitionsPerInstance())
+ .stateModelDef(idealState.getStateModelDefId())
+ .stateModelFactoryId(idealState.getStateModelFactoryId())
+ .rebalancerRef(idealState.getRebalancerRef()).build();
+ }
+
+ SchedulerTaskConfig schedulerTaskConfig = schedulerTaskConfig(idealState);
_config =
new ResourceConfig(id, schedulerTaskConfig, rebalancerConfig, userConfig,
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/main/java/org/apache/helix/api/SemiAutoRebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/SemiAutoRebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/api/SemiAutoRebalancerConfig.java
new file mode 100644
index 0000000..a5284a8
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/SemiAutoRebalancerConfig.java
@@ -0,0 +1,146 @@
+package org.apache.helix.api;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Configuration properties for the SEMI_AUTO rebalancer
+ */
+public final class SemiAutoRebalancerConfig extends RebalancerConfig {
+ /**
+ * Instantiate a new config for SEMI_AUTO
+ * @param resourceId the resource to rebalance
+ * @param stateModelDefId the state model that the resource follows
+ * @param partitionMap map of partition id to partition
+ */
+ public SemiAutoRebalancerConfig(ResourceId resourceId, StateModelDefId stateModelDefId,
+ Map<PartitionId, Partition> partitionMap) {
+ super(resourceId, RebalanceMode.SEMI_AUTO, stateModelDefId, partitionMap);
+ }
+
+ /**
+ * Instantiate from a base RebalancerConfig
+ * @param config populated rebalancer config
+ */
+ private SemiAutoRebalancerConfig(RebalancerConfig config) {
+ super(config);
+ }
+
+ /**
+ * Get the preference list for a partition
+ * @param partitionId the partition to look up
+ * @return preference list ordered from most preferred to least preferred
+ */
+ public List<ParticipantId> getPreferenceList(PartitionId partitionId) {
+ List<String> rawPreferenceList = getListField(partitionId.stringify());
+ if (rawPreferenceList != null) {
+ return IdealState.preferenceListFromStringList(rawPreferenceList);
+ }
+ return Collections.emptyList();
+ }
+
+ /**
+ * Set the preference list for a partition
+ * @param partitionId the partition to set
+ * @param preferenceList preference list ordered from most preferred to least preferred
+ */
+ public void setPreferenceList(PartitionId partitionId, List<ParticipantId> preferenceList) {
+ setListField(partitionId.toString(), IdealState.stringListFromPreferenceList(preferenceList));
+ }
+
+ /**
+ * Get all the preference lists for a partition
+ * @return map of partition id to list of participants ordered from most preferred to least
+ * preferred
+ */
+ public Map<PartitionId, List<ParticipantId>> getPreferenceLists() {
+ Map<String, List<String>> rawPreferenceLists = getListFields();
+ return IdealState.preferenceListsFromStringLists(rawPreferenceLists);
+ }
+
+ /**
+ * Set all the preference lists for a partition
+ * @param preferenceLists map of partition id to list of participants ordered from most preferred
+ * to least preferred
+ */
+ public void setPreferenceLists(Map<PartitionId, List<ParticipantId>> preferenceLists) {
+ setListFields(IdealState.stringListsFromPreferenceLists(preferenceLists));
+ }
+
+ /**
+ * Get a SemiAutoRebalancerConfig from a RebalancerConfig
+ * @param config populated RebalancerConfig
+ * @return SemiAutoRebalancerConfig
+ */
+ public static SemiAutoRebalancerConfig from(RebalancerConfig config) {
+ return new SemiAutoRebalancerConfig(config);
+ }
+
+ /**
+ * Assembler for a SEMI_AUTO configuration
+ */
+ public static class Builder extends RebalancerConfig.Builder<Builder> {
+ private final Map<PartitionId, List<ParticipantId>> _preferenceLists;
+
+ /**
+ * Build for a specific resource
+ * @param resourceId the resource to rebalance
+ */
+ public Builder(ResourceId resourceId) {
+ super(resourceId);
+ _preferenceLists = new HashMap<PartitionId, List<ParticipantId>>();
+ }
+
+ /**
+ * Add a preference list of a partition
+ * @param partitionId the partition to set
+ * @param preferenceList list of participant ids, most preferred first
+ * @return Builder
+ */
+ public Builder preferenceList(PartitionId partitionId, List<ParticipantId> preferenceList) {
+ _preferenceLists.put(partitionId, preferenceList);
+ return this;
+ }
+
+ @Override
+ public SemiAutoRebalancerConfig build() {
+ if (_partitionMap.isEmpty()) {
+ addPartitions(1);
+ }
+ SemiAutoRebalancerConfig config =
+ new SemiAutoRebalancerConfig(_resourceId, _stateModelDefId, _partitionMap);
+ update(config);
+ config.setPreferenceLists(_preferenceLists);
+ return config;
+ }
+
+ @Override
+ protected Builder self() {
+ return this;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/main/java/org/apache/helix/api/UserDefinedRebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/UserDefinedRebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/api/UserDefinedRebalancerConfig.java
new file mode 100644
index 0000000..345c80b
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/UserDefinedRebalancerConfig.java
@@ -0,0 +1,113 @@
+package org.apache.helix.api;
+
+import java.util.Map;
+
+import org.apache.helix.model.IdealState.RebalanceMode;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Configuration properties for the USER_DEFINED rebalancer. If additional fields are necessary, all
+ * getters and setters for simple, list, and map fields are available.
+ */
+public final class UserDefinedRebalancerConfig extends RebalancerConfig {
+ public enum Fields {
+ REBALANCER_CLASS_NAME
+ }
+
+ /**
+ * Instantiate a new config for USER_DEFINED
+ * @param resourceId the resource to rebalance
+ * @param stateModelDefId the state model that the resource follows
+ * @param partitionMap map of partition id to partition
+ * @param rebalancerRef instantiated rebalancer reference
+ */
+ public UserDefinedRebalancerConfig(ResourceId resourceId, StateModelDefId stateModelDefId,
+ Map<PartitionId, Partition> partitionMap, RebalancerRef rebalancerRef) {
+ super(resourceId, RebalanceMode.USER_DEFINED, stateModelDefId, partitionMap);
+ setSimpleField(Fields.REBALANCER_CLASS_NAME.toString(), rebalancerRef.toString());
+ }
+
+ /**
+ * Instantiate from a base RebalancerConfig
+ * @param config populated rebalancer config
+ */
+ private UserDefinedRebalancerConfig(RebalancerConfig config) {
+ super(config);
+ }
+
+ /**
+ * Get a reference to the class used to rebalance this resource
+ * @return RebalancerRef, or null if none set
+ */
+ public RebalancerRef getRebalancerRef() {
+ String rebalancerClassName = getStringField(Fields.REBALANCER_CLASS_NAME.toString(), null);
+ if (rebalancerClassName != null) {
+ return RebalancerRef.from(rebalancerClassName);
+ }
+ return null;
+ }
+
+ /**
+ * Get a UserDefinedRebalancerConfig from a RebalancerConfig
+ * @param config populated RebalancerConfig
+ * @return UserDefinedRebalancerConfig
+ */
+ public static UserDefinedRebalancerConfig from(RebalancerConfig config) {
+ return new UserDefinedRebalancerConfig(config);
+ }
+
+ /**
+ * Assembler for a USER_DEFINED configuration
+ */
+ public static class Builder extends RebalancerConfig.Builder<Builder> {
+ private RebalancerRef _rebalancerRef;
+
+ /**
+ * Build for a specific resource
+ * @param resourceId the resource to rebalance
+ */
+ public Builder(ResourceId resourceId) {
+ super(resourceId);
+ }
+
+ public Builder rebalancerRef(RebalancerRef rebalancerRef) {
+ _rebalancerRef = rebalancerRef;
+ return this;
+ }
+
+ @Override
+ public UserDefinedRebalancerConfig build() {
+ if (_partitionMap.isEmpty()) {
+ addPartitions(1);
+ }
+ UserDefinedRebalancerConfig config =
+ new UserDefinedRebalancerConfig(_resourceId, _stateModelDefId, _partitionMap,
+ _rebalancerRef);
+ update(config);
+ return config;
+ }
+
+ @Override
+ protected Builder self() {
+ return this;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewAutoRebalancer.java
index 6e13a14..798e883 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewAutoRebalancer.java
@@ -20,6 +20,7 @@ package org.apache.helix.controller.rebalancer;
*/
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
@@ -29,17 +30,15 @@ import java.util.Set;
import org.apache.helix.ZNRecord;
import org.apache.helix.api.Cluster;
+import org.apache.helix.api.FullAutoRebalancerConfig;
import org.apache.helix.api.Id;
import org.apache.helix.api.Participant;
import org.apache.helix.api.ParticipantId;
import org.apache.helix.api.PartitionId;
-import org.apache.helix.api.RebalancerConfig;
-import org.apache.helix.api.Resource;
-import org.apache.helix.api.ResourceConfig;
import org.apache.helix.api.State;
import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
import org.apache.helix.controller.rebalancer.util.NewConstraintBasedAssignment;
-import org.apache.helix.controller.stages.NewCurrentStateOutput;
+import org.apache.helix.controller.stages.ResourceCurrentState;
import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
import org.apache.helix.controller.strategy.AutoRebalanceStrategy.DefaultPlacementScheme;
import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
@@ -61,19 +60,20 @@ import com.google.common.collect.Lists;
* The output is a preference list and a mapping based on that preference list, i.e. partition p
* has a replica on node k with state s.
*/
-public class NewAutoRebalancer implements NewRebalancer {
+public class NewAutoRebalancer implements NewRebalancer<FullAutoRebalancerConfig> {
// These should be final, but are initialized in init rather than a constructor
private AutoRebalanceStrategy _algorithm;
private static final Logger LOG = Logger.getLogger(NewAutoRebalancer.class);
@Override
- public ResourceAssignment computeResourceMapping(ResourceConfig resourceConfig, Cluster cluster,
- StateModelDefinition stateModelDef, NewCurrentStateOutput currentStateOutput) {
+ public ResourceAssignment computeResourceMapping(FullAutoRebalancerConfig config,
+ Cluster cluster, ResourceCurrentState currentState) {
+ StateModelDefinition stateModelDef =
+ cluster.getStateModelMap().get(config.getStateModelDefId());
// Compute a preference list based on the current ideal state
- List<PartitionId> partitions = new ArrayList<PartitionId>(resourceConfig.getPartitionSet());
+ List<PartitionId> partitions = new ArrayList<PartitionId>(config.getPartitionSet());
List<String> partitionNames = Lists.transform(partitions, Functions.toStringFunction());
- RebalancerConfig config = resourceConfig.getRebalancerConfig();
Map<ParticipantId, Participant> liveParticipants = cluster.getLiveParticipantMap();
Map<ParticipantId, Participant> allParticipants = cluster.getParticipantMap();
int replicas = -1;
@@ -91,7 +91,7 @@ public class NewAutoRebalancer implements NewRebalancer {
new ArrayList<ParticipantId>(cluster.getParticipantMap().keySet());
List<String> liveNodes = Lists.transform(liveParticipantList, Functions.toStringFunction());
Map<PartitionId, Map<ParticipantId, State>> currentMapping =
- currentMapping(resourceConfig, currentStateOutput, stateCountMap);
+ currentMapping(config, currentState, stateCountMap);
// If there are nodes tagged with resource, use only those nodes
Set<String> taggedNodes = new HashSet<String>();
@@ -104,7 +104,8 @@ public class NewAutoRebalancer implements NewRebalancer {
}
if (taggedNodes.size() > 0) {
if (LOG.isInfoEnabled()) {
- LOG.info("found the following instances with tag " + resourceConfig.getId() + " " + taggedNodes);
+ LOG.info("found the following instances with tag " + config.getResourceId() + " "
+ + taggedNodes);
}
liveNodes = new ArrayList<String>(taggedNodes);
}
@@ -121,8 +122,8 @@ public class NewAutoRebalancer implements NewRebalancer {
}
ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
_algorithm =
- new AutoRebalanceStrategy(resourceConfig.getId().toString(), partitionNames, stateCountMap,
- maxPartition, placementScheme);
+ new AutoRebalanceStrategy(config.getResourceId().stringify(), partitionNames,
+ stateCountMap, maxPartition, placementScheme);
ZNRecord newMapping =
_algorithm.computePartitionAssignment(liveNodes,
ResourceAssignment.stringMapsFromReplicaMaps(currentMapping), allNodes);
@@ -133,40 +134,44 @@ public class NewAutoRebalancer implements NewRebalancer {
// compute a full partition mapping for the resource
if (LOG.isDebugEnabled()) {
- LOG.debug("Processing resource:" + resourceConfig.getId());
+ LOG.debug("Processing resource:" + config.getResourceId());
}
- ResourceAssignment partitionMapping = new ResourceAssignment(resourceConfig.getId());
+ ResourceAssignment partitionMapping = new ResourceAssignment(config.getResourceId());
for (PartitionId partition : partitions) {
Set<ParticipantId> disabledParticipantsForPartition =
NewConstraintBasedAssignment.getDisabledParticipants(allParticipants, partition);
+ List<String> rawPreferenceList = newMapping.getListField(partition.stringify());
+ if (rawPreferenceList == null) {
+ rawPreferenceList = Collections.emptyList();
+ }
List<ParticipantId> preferenceList =
- Lists.transform(newMapping.getListField(partition.stringify()),
- new Function<String, ParticipantId>() {
- @Override
- public ParticipantId apply(String participantName) {
- return Id.participant(participantName);
- }
- });
+ Lists.transform(rawPreferenceList, new Function<String, ParticipantId>() {
+ @Override
+ public ParticipantId apply(String participantName) {
+ return Id.participant(participantName);
+ }
+ });
preferenceList =
NewConstraintBasedAssignment.getPreferenceList(cluster, partition, preferenceList);
Map<ParticipantId, State> bestStateForPartition =
NewConstraintBasedAssignment.computeAutoBestStateForPartition(liveParticipants,
stateModelDef, preferenceList,
- currentStateOutput.getCurrentStateMap(resourceConfig.getId(), partition),
+ currentState.getCurrentStateMap(config.getResourceId(), partition),
disabledParticipantsForPartition);
partitionMapping.addReplicaMap(partition, bestStateForPartition);
}
return partitionMapping;
}
- private Map<PartitionId, Map<ParticipantId, State>> currentMapping(ResourceConfig resourceConfig,
- NewCurrentStateOutput currentStateOutput, Map<String, Integer> stateCountMap) {
+ private Map<PartitionId, Map<ParticipantId, State>> currentMapping(
+ FullAutoRebalancerConfig config, ResourceCurrentState currentStateOutput,
+ Map<String, Integer> stateCountMap) {
Map<PartitionId, Map<ParticipantId, State>> map =
new HashMap<PartitionId, Map<ParticipantId, State>>();
- for (PartitionId partition : resourceConfig.getPartitionSet()) {
+ for (PartitionId partition : config.getPartitionSet()) {
Map<ParticipantId, State> curStateMap =
- currentStateOutput.getCurrentStateMap(resourceConfig.getId(), partition);
+ currentStateOutput.getCurrentStateMap(config.getResourceId(), partition);
map.put(partition, new HashMap<ParticipantId, State>());
for (ParticipantId node : curStateMap.keySet()) {
State state = curStateMap.get(node);
@@ -176,7 +181,7 @@ public class NewAutoRebalancer implements NewRebalancer {
}
Map<ParticipantId, State> pendingStateMap =
- currentStateOutput.getPendingStateMap(resourceConfig.getId(), partition);
+ currentStateOutput.getPendingStateMap(config.getResourceId(), partition);
for (ParticipantId node : pendingStateMap.keySet()) {
State state = pendingStateMap.get(node);
if (stateCountMap.containsKey(state)) {
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewCustomRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewCustomRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewCustomRebalancer.java
index 5725ca9..5570c36 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewCustomRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewCustomRebalancer.java
@@ -25,15 +25,13 @@ import java.util.Set;
import org.apache.helix.HelixDefinedState;
import org.apache.helix.api.Cluster;
+import org.apache.helix.api.CustomRebalancerConfig;
import org.apache.helix.api.Participant;
import org.apache.helix.api.ParticipantId;
import org.apache.helix.api.PartitionId;
-import org.apache.helix.api.RebalancerConfig;
-import org.apache.helix.api.Resource;
-import org.apache.helix.api.ResourceConfig;
import org.apache.helix.api.State;
import org.apache.helix.controller.rebalancer.util.NewConstraintBasedAssignment;
-import org.apache.helix.controller.stages.NewCurrentStateOutput;
+import org.apache.helix.controller.stages.ResourceCurrentState;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.StateModelDefinition;
import org.apache.log4j.Logger;
@@ -47,21 +45,22 @@ import org.apache.log4j.Logger;
* The output is a verified mapping based on that preference list, i.e. partition p has a replica
* on node k with state s, where s may be a dropped or error state if necessary.
*/
-public class NewCustomRebalancer implements NewRebalancer {
+public class NewCustomRebalancer implements NewRebalancer<CustomRebalancerConfig> {
private static final Logger LOG = Logger.getLogger(NewCustomRebalancer.class);
@Override
- public ResourceAssignment computeResourceMapping(ResourceConfig resourceConfig, Cluster cluster,
- StateModelDefinition stateModelDef, NewCurrentStateOutput currentStateOutput) {
+ public ResourceAssignment computeResourceMapping(CustomRebalancerConfig config, Cluster cluster,
+ ResourceCurrentState currentState) {
+ StateModelDefinition stateModelDef =
+ cluster.getStateModelMap().get(config.getStateModelDefId());
if (LOG.isDebugEnabled()) {
- LOG.debug("Processing resource:" + resourceConfig.getId());
+ LOG.debug("Processing resource:" + config.getResourceId());
}
- ResourceAssignment partitionMapping = new ResourceAssignment(resourceConfig.getId());
- RebalancerConfig config = resourceConfig.getRebalancerConfig();
- for (PartitionId partition : resourceConfig.getPartitionSet()) {
+ ResourceAssignment partitionMapping = new ResourceAssignment(config.getResourceId());
+ for (PartitionId partition : config.getPartitionSet()) {
Map<ParticipantId, State> currentStateMap =
- currentStateOutput.getCurrentStateMap(resourceConfig.getId(), partition);
+ currentState.getCurrentStateMap(config.getResourceId(), partition);
Set<ParticipantId> disabledInstancesForPartition =
NewConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(),
partition);
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewRebalancer.java
index feac955..4792877 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewRebalancer.java
@@ -20,28 +20,23 @@ package org.apache.helix.controller.rebalancer;
*/
import org.apache.helix.api.Cluster;
-import org.apache.helix.api.Resource;
-import org.apache.helix.api.ResourceConfig;
-import org.apache.helix.controller.stages.NewCurrentStateOutput;
+import org.apache.helix.api.RebalancerConfig;
+import org.apache.helix.controller.stages.ResourceCurrentState;
import org.apache.helix.model.ResourceAssignment;
-import org.apache.helix.model.StateModelDefinition;
/**
- * Allows one to come up with custom implementation of a rebalancer.<br/>
- * This will be invoked on all changes that happen in the cluster.<br/>
- * Simply return the newIdealState for a resource in this method.<br/>
+ * Arbitrary configurable rebalancer interface.
+ * @see {@link NewUserDefinedRebalancer} for an interface with a plugged-in class
*/
-public interface NewRebalancer {
+interface NewRebalancer<T extends RebalancerConfig> {
/**
* Given a resource, existing mapping, and liveness of resources, compute a new mapping of
* resources.
- * @param resource the resource for which a mapping will be computed
+ * @param rebalancerConfig resource properties used by the rebalancer
* @param cluster a snapshot of the entire cluster state
- * @param stateModelDef the state model for which to rebalance the resource
- * @param currentStateOutput a combination of the current states and pending current states
+ * @param currentState a combination of the current states and pending current states
*/
- ResourceAssignment computeResourceMapping(final ResourceConfig resourceConfig,
- final Cluster cluster, final StateModelDefinition stateModelDef,
- final NewCurrentStateOutput currentStateOutput);
+ ResourceAssignment computeResourceMapping(final T rebalancerConfig, final Cluster cluster,
+ final ResourceCurrentState currentState);
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewSemiAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewSemiAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewSemiAutoRebalancer.java
index 6a5072c..cb061ab 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewSemiAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewSemiAutoRebalancer.java
@@ -26,12 +26,10 @@ import java.util.Set;
import org.apache.helix.api.Cluster;
import org.apache.helix.api.ParticipantId;
import org.apache.helix.api.PartitionId;
-import org.apache.helix.api.RebalancerConfig;
-import org.apache.helix.api.Resource;
-import org.apache.helix.api.ResourceConfig;
+import org.apache.helix.api.SemiAutoRebalancerConfig;
import org.apache.helix.api.State;
import org.apache.helix.controller.rebalancer.util.NewConstraintBasedAssignment;
-import org.apache.helix.controller.stages.NewCurrentStateOutput;
+import org.apache.helix.controller.stages.ResourceCurrentState;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.StateModelDefinition;
import org.apache.log4j.Logger;
@@ -45,21 +43,22 @@ import org.apache.log4j.Logger;
* The output is a mapping based on that preference list, i.e. partition p has a replica on node k
* with state s.
*/
-public class NewSemiAutoRebalancer implements NewRebalancer {
+public class NewSemiAutoRebalancer implements NewRebalancer<SemiAutoRebalancerConfig> {
private static final Logger LOG = Logger.getLogger(NewSemiAutoRebalancer.class);
@Override
- public ResourceAssignment computeResourceMapping(ResourceConfig resourceConfig, Cluster cluster,
- StateModelDefinition stateModelDef, NewCurrentStateOutput currentStateOutput) {
+ public ResourceAssignment computeResourceMapping(SemiAutoRebalancerConfig config,
+ Cluster cluster, ResourceCurrentState currentState) {
+ StateModelDefinition stateModelDef =
+ cluster.getStateModelMap().get(config.getStateModelDefId());
if (LOG.isDebugEnabled()) {
- LOG.debug("Processing resource:" + resourceConfig.getId());
+ LOG.debug("Processing resource:" + config.getResourceId());
}
- ResourceAssignment partitionMapping = new ResourceAssignment(resourceConfig.getId());
- RebalancerConfig config = resourceConfig.getRebalancerConfig();
- for (PartitionId partition : resourceConfig.getPartitionSet()) {
+ ResourceAssignment partitionMapping = new ResourceAssignment(config.getResourceId());
+ for (PartitionId partition : config.getPartitionSet()) {
Map<ParticipantId, State> currentStateMap =
- currentStateOutput.getCurrentStateMap(resourceConfig.getId(), partition);
+ currentState.getCurrentStateMap(config.getResourceId(), partition);
Set<ParticipantId> disabledInstancesForPartition =
NewConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(),
partition);
[2/3] [HELIX-244] Redesign rebalancers using rebalancer-specific
configs
Posted by zz...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewUserDefinedRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewUserDefinedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewUserDefinedRebalancer.java
new file mode 100644
index 0000000..ae7bba4
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewUserDefinedRebalancer.java
@@ -0,0 +1,35 @@
+package org.apache.helix.controller.rebalancer;
+
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.UserDefinedRebalancerConfig;
+import org.apache.helix.controller.stages.ResourceCurrentState;
+import org.apache.helix.model.ResourceAssignment;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Allows one to come up with a user-defined implementation of a rebalancer.<br/>
+ * This will be invoked on all changes that happen in the cluster.<br/>
+ * Simply return the resourceMapping for a resource in this method.<br/>
+ */
+public interface NewUserDefinedRebalancer extends NewRebalancer<UserDefinedRebalancerConfig> {
+ @Override
+ ResourceAssignment computeResourceMapping(final UserDefinedRebalancerConfig rebalancerConfig,
+ final Cluster cluster, final ResourceCurrentState currentState);
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
index 9abf67c..ae0278b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
@@ -26,6 +26,5 @@ public enum AttributeName {
MESSAGES_ALL,
MESSAGES_SELECTED,
MESSAGES_THROTTLE,
- LOCAL_STATE,
- STATE_MODEL_DEFINITIONS
+ LOCAL_STATE
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
index bc88b73..7c4341b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
@@ -23,19 +23,22 @@ import java.util.Map;
import java.util.Set;
import org.apache.helix.api.Cluster;
+import org.apache.helix.api.CustomRebalancerConfig;
+import org.apache.helix.api.FullAutoRebalancerConfig;
import org.apache.helix.api.ParticipantId;
import org.apache.helix.api.PartitionId;
import org.apache.helix.api.RebalancerConfig;
-import org.apache.helix.api.Resource;
import org.apache.helix.api.ResourceConfig;
import org.apache.helix.api.ResourceId;
+import org.apache.helix.api.SemiAutoRebalancerConfig;
import org.apache.helix.api.StateModelDefId;
+import org.apache.helix.api.UserDefinedRebalancerConfig;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
import org.apache.helix.controller.rebalancer.NewAutoRebalancer;
import org.apache.helix.controller.rebalancer.NewCustomRebalancer;
-import org.apache.helix.controller.rebalancer.NewRebalancer;
import org.apache.helix.controller.rebalancer.NewSemiAutoRebalancer;
+import org.apache.helix.controller.rebalancer.NewUserDefinedRebalancer;
import org.apache.helix.controller.rebalancer.util.NewConstraintBasedAssignment;
import org.apache.helix.model.IdealState.RebalanceMode;
import org.apache.helix.model.ResourceAssignment;
@@ -56,7 +59,7 @@ public class NewBestPossibleStateCalcStage extends AbstractBaseStage {
LOG.info("START BestPossibleStateCalcStage.process()");
}
- NewCurrentStateOutput currentStateOutput =
+ ResourceCurrentState currentStateOutput =
event.getAttribute(AttributeName.CURRENT_STATE.toString());
Map<ResourceId, ResourceConfig> resourceMap =
event.getAttribute(AttributeName.RESOURCES.toString());
@@ -86,7 +89,7 @@ public class NewBestPossibleStateCalcStage extends AbstractBaseStage {
* @return assignment for the dropped resource
*/
private ResourceAssignment mapDroppedResource(Cluster cluster, ResourceId resourceId,
- NewCurrentStateOutput currentStateOutput, StateModelDefinition stateModelDef) {
+ ResourceCurrentState currentStateOutput, StateModelDefinition stateModelDef) {
ResourceAssignment partitionMapping = new ResourceAssignment(resourceId);
Set<PartitionId> mappedPartitions =
currentStateOutput.getCurrentStateMappedPartitions(resourceId);
@@ -105,12 +108,10 @@ public class NewBestPossibleStateCalcStage extends AbstractBaseStage {
return partitionMapping;
}
- // TODO check this
private NewBestPossibleStateOutput compute(Cluster cluster, ClusterEvent event,
- Map<ResourceId, ResourceConfig> resourceMap, NewCurrentStateOutput currentStateOutput) {
+ Map<ResourceId, ResourceConfig> resourceMap, ResourceCurrentState currentStateOutput) {
NewBestPossibleStateOutput output = new NewBestPossibleStateOutput();
- Map<StateModelDefId, StateModelDefinition> stateModelDefs =
- event.getAttribute(AttributeName.STATE_MODEL_DEFINITIONS.toString());
+ Map<StateModelDefId, StateModelDefinition> stateModelDefs = cluster.getStateModelMap();
for (ResourceId resourceId : resourceMap.keySet()) {
LOG.debug("Processing resource:" + resourceId);
@@ -132,27 +133,33 @@ public class NewBestPossibleStateCalcStage extends AbstractBaseStage {
ResourceConfig resourceConfig = resourceMap.get(resourceId);
RebalancerConfig rebalancerConfig = resourceConfig.getRebalancerConfig();
- NewRebalancer rebalancer = null;
- if (rebalancerConfig.getRebalancerMode() == RebalanceMode.USER_DEFINED
- && rebalancerConfig.getRebalancerRef() != null) {
- rebalancer = rebalancerConfig.getRebalancerRef().getRebalancer();
- }
- if (rebalancer == null) {
+ ResourceAssignment resourceAssignment = null;
+ if (rebalancerConfig.getRebalancerMode() == RebalanceMode.USER_DEFINED) {
+ UserDefinedRebalancerConfig config = UserDefinedRebalancerConfig.from(rebalancerConfig);
+ if (config.getRebalancerRef() != null) {
+ NewUserDefinedRebalancer rebalancer = config.getRebalancerRef().getRebalancer();
+ resourceAssignment =
+ rebalancer.computeResourceMapping(config, cluster, currentStateOutput);
+ }
+ } else {
if (rebalancerConfig.getRebalancerMode() == RebalanceMode.FULL_AUTO) {
- rebalancer = new NewAutoRebalancer();
+ FullAutoRebalancerConfig config = FullAutoRebalancerConfig.from(rebalancerConfig);
+ resourceAssignment =
+ new NewAutoRebalancer().computeResourceMapping(config, cluster, currentStateOutput);
} else if (rebalancerConfig.getRebalancerMode() == RebalanceMode.SEMI_AUTO) {
- rebalancer = new NewSemiAutoRebalancer();
- } else {
- rebalancer = new NewCustomRebalancer();
+ SemiAutoRebalancerConfig config = SemiAutoRebalancerConfig.from(rebalancerConfig);
+ resourceAssignment =
+ new NewSemiAutoRebalancer().computeResourceMapping(config, cluster,
+ currentStateOutput);
+ } else if (rebalancerConfig.getRebalancerMode() == RebalanceMode.CUSTOMIZED) {
+ CustomRebalancerConfig config = CustomRebalancerConfig.from(rebalancerConfig);
+ resourceAssignment =
+ new NewCustomRebalancer().computeResourceMapping(config, cluster, currentStateOutput);
}
}
-
- StateModelDefinition stateModelDef =
- stateModelDefs.get(rebalancerConfig.getStateModelDefId());
- ResourceAssignment resourceAssignment =
- rebalancer.computeResourceMapping(resourceConfig, cluster, stateModelDef,
- currentStateOutput);
if (resourceAssignment == null) {
+ StateModelDefinition stateModelDef =
+ stateModelDefs.get(rebalancerConfig.getStateModelDefId());
resourceAssignment =
mapDroppedResource(cluster, resourceId, currentStateOutput, stateModelDef);
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateComputationStage.java
index e0cd22f..873419c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateComputationStage.java
@@ -56,7 +56,7 @@ public class NewCurrentStateComputationStage extends AbstractBaseStage {
+ ". Requires DataCache|RESOURCE");
}
- NewCurrentStateOutput currentStateOutput = new NewCurrentStateOutput();
+ ResourceCurrentState currentStateOutput = new ResourceCurrentState();
for (Participant liveParticipant : cluster.getLiveParticipantMap().values()) {
ParticipantId participantId = liveParticipant.getId();
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateOutput.java
deleted file mode 100644
index d8bbfe3..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateOutput.java
+++ /dev/null
@@ -1,255 +0,0 @@
-package org.apache.helix.controller.stages;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.api.ParticipantId;
-import org.apache.helix.api.PartitionId;
-import org.apache.helix.api.ResourceId;
-import org.apache.helix.api.State;
-import org.apache.helix.api.StateModelDefId;
-import org.apache.helix.model.CurrentState;
-
-public class NewCurrentStateOutput {
- /**
- * map of resource-id to map of partition-id to map of participant-id to state
- * represent current-state for the participant
- */
- private final Map<ResourceId, Map<PartitionId, Map<ParticipantId, State>>> _currentStateMap;
-
- /**
- * map of resource-id to map of partition-id to map of participant-id to state
- * represent pending messages for the participant
- */
- private final Map<ResourceId, Map<PartitionId, Map<ParticipantId, State>>> _pendingStateMap;
-
- /**
- * map of resource-id to state model definition id
- */
- private final Map<ResourceId, StateModelDefId> _resourceStateModelMap;
-
- /**
- * map of resource-id to current-state config's
- */
- private final Map<ResourceId, CurrentState> _curStateMetaMap;
-
- /**
- * construct
- */
- public NewCurrentStateOutput() {
- _currentStateMap = new HashMap<ResourceId, Map<PartitionId, Map<ParticipantId, State>>>();
- _pendingStateMap = new HashMap<ResourceId, Map<PartitionId, Map<ParticipantId, State>>>();
- _resourceStateModelMap = new HashMap<ResourceId, StateModelDefId>();
- _curStateMetaMap = new HashMap<ResourceId, CurrentState>();
-
- }
-
- /**
- * @param resourceId
- * @param stateModelDefId
- */
- public void setResourceStateModelDef(ResourceId resourceId, StateModelDefId stateModelDefId) {
- _resourceStateModelMap.put(resourceId, stateModelDefId);
- }
-
- /**
- * @param resourceId
- * @return
- */
- public StateModelDefId getResourceStateModelDef(ResourceId resourceId) {
- return _resourceStateModelMap.get(resourceId);
- }
-
- /**
- * @param resourceId
- * @param bucketSize
- */
- public void setBucketSize(ResourceId resourceId, int bucketSize) {
- CurrentState curStateMeta = _curStateMetaMap.get(resourceId);
- if (curStateMeta == null) {
- curStateMeta = new CurrentState(resourceId);
- _curStateMetaMap.put(resourceId, curStateMeta);
- }
- curStateMeta.setBucketSize(bucketSize);
- }
-
- /**
- * @param resourceId
- * @return
- */
- public int getBucketSize(ResourceId resourceId) {
- int bucketSize = 0;
- CurrentState curStateMeta = _curStateMetaMap.get(resourceId);
- if (curStateMeta != null) {
- bucketSize = curStateMeta.getBucketSize();
- }
-
- return bucketSize;
- }
-
- /**
- * @param stateMap
- * @param resourceId
- * @param partitionId
- * @param participantId
- * @param state
- */
- static void setStateMap(Map<ResourceId, Map<PartitionId, Map<ParticipantId, State>>> stateMap,
- ResourceId resourceId, PartitionId partitionId, ParticipantId participantId, State state) {
- if (!stateMap.containsKey(resourceId)) {
- stateMap.put(resourceId, new HashMap<PartitionId, Map<ParticipantId, State>>());
- }
-
- if (!stateMap.get(resourceId).containsKey(partitionId)) {
- stateMap.get(resourceId).put(partitionId, new HashMap<ParticipantId, State>());
- }
- stateMap.get(resourceId).get(partitionId).put(participantId, state);
- }
-
- /**
- * @param stateMap
- * @param resourceId
- * @param partitionId
- * @param participantId
- * @return state
- */
- static State getState(Map<ResourceId, Map<PartitionId, Map<ParticipantId, State>>> stateMap,
- ResourceId resourceId, PartitionId partitionId, ParticipantId participantId) {
- Map<PartitionId, Map<ParticipantId, State>> map = stateMap.get(resourceId);
- if (map != null) {
- Map<ParticipantId, State> instanceStateMap = map.get(partitionId);
- if (instanceStateMap != null) {
- return instanceStateMap.get(participantId);
- }
- }
- return null;
-
- }
-
- /**
- * @param stateMap
- * @param resourceId
- * @param partitionId
- * @return
- */
- static Map<ParticipantId, State> getStateMap(
- Map<ResourceId, Map<PartitionId, Map<ParticipantId, State>>> stateMap, ResourceId resourceId,
- PartitionId partitionId) {
- if (stateMap.containsKey(resourceId)) {
- Map<PartitionId, Map<ParticipantId, State>> map = stateMap.get(resourceId);
- if (map.containsKey(partitionId)) {
- return map.get(partitionId);
- }
- }
- return Collections.emptyMap();
- }
-
- /**
- * @param resourceId
- * @param partitionId
- * @param participantId
- * @param state
- */
- public void setCurrentState(ResourceId resourceId, PartitionId partitionId,
- ParticipantId participantId, State state) {
- setStateMap(_currentStateMap, resourceId, partitionId, participantId, state);
- }
-
- /**
- * @param resourceId
- * @param partitionId
- * @param participantId
- * @param state
- */
- public void setPendingState(ResourceId resourceId, PartitionId partitionId,
- ParticipantId participantId, State state) {
- setStateMap(_pendingStateMap, resourceId, partitionId, participantId, state);
- }
-
- /**
- * given (resource, partition, instance), returns currentState
- * @param resourceName
- * @param partition
- * @param instanceName
- * @return
- */
- public State getCurrentState(ResourceId resourceId, PartitionId partitionId,
- ParticipantId participantId) {
- return getState(_currentStateMap, resourceId, partitionId, participantId);
- }
-
- /**
- * given (resource, partition, instance), returns toState
- * @param resourceName
- * @param partition
- * @param instanceName
- * @return
- */
- public State getPendingState(ResourceId resourceId, PartitionId partitionId,
- ParticipantId participantId) {
- return getState(_pendingStateMap, resourceId, partitionId, participantId);
- }
-
- /**
- * @param resourceId
- * @param partitionId
- * @return
- */
- public Map<ParticipantId, State> getCurrentStateMap(ResourceId resourceId, PartitionId partitionId) {
- return getStateMap(_currentStateMap, resourceId, partitionId);
- }
-
- /**
- * Get the partitions mapped in the current state
- * @param resourceId resource to look up
- * @return set of mapped partitions, or empty set if there are none
- */
- public Set<PartitionId> getCurrentStateMappedPartitions(ResourceId resourceId) {
- Map<PartitionId, Map<ParticipantId, State>> currentStateMap = _currentStateMap.get(resourceId);
- if (currentStateMap != null) {
- return currentStateMap.keySet();
- }
- return Collections.emptySet();
- }
-
- /**
- * @param resourceId
- * @param partitionId
- * @return
- */
- public Map<ParticipantId, State> getPendingStateMap(ResourceId resourceId, PartitionId partitionId) {
- return getStateMap(_pendingStateMap, resourceId, partitionId);
-
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append("current state= ").append(_currentStateMap);
- sb.append(", pending state= ").append(_pendingStateMap);
- return sb.toString();
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/main/java/org/apache/helix/controller/stages/NewExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewExternalViewComputeStage.java
index 6361dcf..8c151e3 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewExternalViewComputeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewExternalViewComputeStage.java
@@ -76,7 +76,7 @@ public class NewExternalViewComputeStage extends AbstractBaseStage {
HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder();
- NewCurrentStateOutput currentStateOutput =
+ ResourceCurrentState currentStateOutput =
event.getAttribute(AttributeName.CURRENT_STATE.toString());
List<ExternalView> newExtViews = new ArrayList<ExternalView>();
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java
index 0c9fe9a..c74d577 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java
@@ -60,11 +60,10 @@ public class NewMessageGenerationStage extends AbstractBaseStage {
public void process(ClusterEvent event) throws Exception {
HelixManager manager = event.getAttribute("helixmanager");
Cluster cluster = event.getAttribute("ClusterDataCache");
- Map<StateModelDefId, StateModelDefinition> stateModelDefMap =
- event.getAttribute(AttributeName.STATE_MODEL_DEFINITIONS.toString());
+ Map<StateModelDefId, StateModelDefinition> stateModelDefMap = cluster.getStateModelMap();
Map<ResourceId, ResourceConfig> resourceMap =
event.getAttribute(AttributeName.RESOURCES.toString());
- NewCurrentStateOutput currentStateOutput =
+ ResourceCurrentState currentStateOutput =
event.getAttribute(AttributeName.CURRENT_STATE.toString());
NewBestPossibleStateOutput bestPossibleStateOutput =
event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
@@ -137,8 +136,8 @@ public class NewMessageGenerationStage extends AbstractBaseStage {
.getSessionId();
Message message =
createMessage(manager, resourceId, partitionId, participantId, currentState,
- nextState, sessionId, new StateModelDefId(stateModelDef.getId()), resourceConfig
- .getRebalancerConfig().getStateModelFactoryId(), bucketSize);
+ nextState, sessionId, new StateModelDefId(stateModelDef.getId()),
+ resourceConfig.getRebalancerConfig().getStateModelFactoryId(), bucketSize);
// TODO refactor get/set timeout/inner-message
RebalancerConfig rebalancerConfig = resourceConfig.getRebalancerConfig();
@@ -207,7 +206,7 @@ public class NewMessageGenerationStage extends AbstractBaseStage {
message.setTgtSessionId(participantSessionId);
message.setSrcSessionId(Id.session(manager.getSessionId()));
message.setStateModelDef(stateModelDefId);
- message.setStateModelFactoryName(stateModelFactoryId.stringify());
+ message.setStateModelFactoryId(stateModelFactoryId);
message.setBucketSize(bucketSize);
return message;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java
index 0ebbe74..1ec1a50 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java
@@ -28,7 +28,6 @@ import java.util.TreeMap;
import org.apache.helix.api.Cluster;
import org.apache.helix.api.Participant;
-import org.apache.helix.api.ParticipantConfig;
import org.apache.helix.api.ParticipantId;
import org.apache.helix.api.PartitionId;
import org.apache.helix.api.RebalancerConfig;
@@ -39,10 +38,7 @@ import org.apache.helix.api.State;
import org.apache.helix.api.StateModelDefId;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
-import org.apache.helix.model.Partition;
import org.apache.helix.model.StateModelDefinition;
import org.apache.log4j.Logger;
@@ -91,11 +87,10 @@ public class NewMessageSelectionStage extends AbstractBaseStage {
@Override
public void process(ClusterEvent event) throws Exception {
Cluster cluster = event.getAttribute("ClusterDataCache");
- Map<StateModelDefId, StateModelDefinition> stateModelDefMap =
- event.getAttribute(AttributeName.STATE_MODEL_DEFINITIONS.toString());
+ Map<StateModelDefId, StateModelDefinition> stateModelDefMap = cluster.getStateModelMap();
Map<ResourceId, ResourceConfig> resourceMap =
event.getAttribute(AttributeName.RESOURCES.toString());
- NewCurrentStateOutput currentStateOutput =
+ ResourceCurrentState currentStateOutput =
event.getAttribute(AttributeName.CURRENT_STATE.toString());
NewMessageOutput messageGenOutput = event.getAttribute(AttributeName.MESSAGES_ALL.toString());
if (cluster == null || resourceMap == null || currentStateOutput == null
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/main/java/org/apache/helix/controller/stages/NewReadClusterDataStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewReadClusterDataStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewReadClusterDataStage.java
index 88b2d99..4200fba 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewReadClusterDataStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewReadClusterDataStage.java
@@ -19,19 +19,14 @@ package org.apache.helix.controller.stages;
* under the License.
*/
-import java.util.Map;
-
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.api.Cluster;
import org.apache.helix.api.ClusterAccessor;
import org.apache.helix.api.ClusterId;
import org.apache.helix.api.Id;
-import org.apache.helix.api.StateModelDefId;
-import org.apache.helix.api.StateModelDefinitionAccessor;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
import org.apache.log4j.Logger;
@@ -50,11 +45,8 @@ public class NewReadClusterDataStage extends AbstractBaseStage {
HelixDataAccessor accessor = manager.getHelixDataAccessor();
ClusterId clusterId = Id.cluster(manager.getClusterName());
ClusterAccessor clusterAccessor = new ClusterAccessor(clusterId, accessor);
- StateModelDefinitionAccessor stateModelDefAccessor = new StateModelDefinitionAccessor(accessor);
Cluster cluster = clusterAccessor.readCluster();
- Map<StateModelDefId, StateModelDefinition> stateModelDefMap =
- stateModelDefAccessor.readStateModelDefinitions();
ClusterStatusMonitor clusterStatusMonitor =
(ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor");
@@ -75,7 +67,6 @@ public class NewReadClusterDataStage extends AbstractBaseStage {
}
event.addAttribute("ClusterDataCache", cluster);
- event.addAttribute(AttributeName.STATE_MODEL_DEFINITIONS.toString(), stateModelDefMap);
long endTime = System.currentTimeMillis();
LOG.info("END ReadClusterDataStage.process(). took: " + (endTime - startTime) + " ms");
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
index 50a825b..8e3006f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
@@ -19,10 +19,12 @@ package org.apache.helix.controller.stages;
* under the License.
*/
+import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Id;
import org.apache.helix.api.Participant;
import org.apache.helix.api.Partition;
import org.apache.helix.api.PartitionId;
@@ -30,10 +32,10 @@ import org.apache.helix.api.RebalancerConfig;
import org.apache.helix.api.Resource;
import org.apache.helix.api.ResourceConfig;
import org.apache.helix.api.ResourceId;
-import org.apache.helix.api.StateModelFactoryId;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.IdealState.RebalanceMode;
import org.apache.log4j.Logger;
/**
@@ -82,17 +84,17 @@ public class NewResourceComputationStage extends AbstractBaseStage {
// don't overwrite ideal state configs
if (!resourceBuilderMap.containsKey(resourceId)) {
- RebalancerConfig.Builder rebalancerConfigBuilder =
- new RebalancerConfig.Builder(resourceId);
- rebalancerConfigBuilder.stateModelDef(currentState.getStateModelDefId());
- rebalancerConfigBuilder.stateModelFactoryId(new StateModelFactoryId(currentState
- .getStateModelFactoryName()));
+ Map<PartitionId, Partition> partitionMap = new HashMap<PartitionId, Partition>();
for (PartitionId partitionId : currentState.getPartitionStateMap().keySet()) {
- rebalancerConfigBuilder.addPartition(new Partition(partitionId));
+ partitionMap.put(partitionId, new Partition(partitionId));
}
-
+ RebalancerConfig rebalancerConfig =
+ new RebalancerConfig(resourceId, RebalanceMode.NONE,
+ currentState.getStateModelDefId(), partitionMap);
+ rebalancerConfig.setStateModelFactoryId(Id.stateModelFactory(currentState
+ .getStateModelFactoryName()));
ResourceConfig.Builder resourceBuilder = new ResourceConfig.Builder(resourceId);
- resourceBuilder.rebalancerConfig(rebalancerConfigBuilder.build());
+ resourceBuilder.rebalancerConfig(rebalancerConfig);
resourceBuilder.bucketSize(currentState.getBucketSize());
resourceBuilder.batchMessageMode(currentState.getBatchMessageMode());
resourceBuilderMap.put(resourceId, resourceBuilder);
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceCurrentState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceCurrentState.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceCurrentState.java
new file mode 100644
index 0000000..db33a4f
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceCurrentState.java
@@ -0,0 +1,255 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.api.ParticipantId;
+import org.apache.helix.api.PartitionId;
+import org.apache.helix.api.ResourceId;
+import org.apache.helix.api.State;
+import org.apache.helix.api.StateModelDefId;
+import org.apache.helix.model.CurrentState;
+
+public class ResourceCurrentState {
+ /**
+ * map of resource-id to map of partition-id to map of participant-id to state
+ * represent current-state for the participant
+ */
+ private final Map<ResourceId, Map<PartitionId, Map<ParticipantId, State>>> _currentStateMap;
+
+ /**
+ * map of resource-id to map of partition-id to map of participant-id to state
+ * represent pending messages for the participant
+ */
+ private final Map<ResourceId, Map<PartitionId, Map<ParticipantId, State>>> _pendingStateMap;
+
+ /**
+ * map of resource-id to state model definition id
+ */
+ private final Map<ResourceId, StateModelDefId> _resourceStateModelMap;
+
+ /**
+ * map of resource-id to current-state config's
+ */
+ private final Map<ResourceId, CurrentState> _curStateMetaMap;
+
+ /**
+ * construct
+ */
+ public ResourceCurrentState() {
+ _currentStateMap = new HashMap<ResourceId, Map<PartitionId, Map<ParticipantId, State>>>();
+ _pendingStateMap = new HashMap<ResourceId, Map<PartitionId, Map<ParticipantId, State>>>();
+ _resourceStateModelMap = new HashMap<ResourceId, StateModelDefId>();
+ _curStateMetaMap = new HashMap<ResourceId, CurrentState>();
+
+ }
+
+ /**
+ * @param resourceId
+ * @param stateModelDefId
+ */
+ public void setResourceStateModelDef(ResourceId resourceId, StateModelDefId stateModelDefId) {
+ _resourceStateModelMap.put(resourceId, stateModelDefId);
+ }
+
+ /**
+ * @param resourceId
+ * @return
+ */
+ public StateModelDefId getResourceStateModelDef(ResourceId resourceId) {
+ return _resourceStateModelMap.get(resourceId);
+ }
+
+ /**
+ * @param resourceId
+ * @param bucketSize
+ */
+ public void setBucketSize(ResourceId resourceId, int bucketSize) {
+ CurrentState curStateMeta = _curStateMetaMap.get(resourceId);
+ if (curStateMeta == null) {
+ curStateMeta = new CurrentState(resourceId);
+ _curStateMetaMap.put(resourceId, curStateMeta);
+ }
+ curStateMeta.setBucketSize(bucketSize);
+ }
+
+ /**
+ * @param resourceId
+ * @return
+ */
+ public int getBucketSize(ResourceId resourceId) {
+ int bucketSize = 0;
+ CurrentState curStateMeta = _curStateMetaMap.get(resourceId);
+ if (curStateMeta != null) {
+ bucketSize = curStateMeta.getBucketSize();
+ }
+
+ return bucketSize;
+ }
+
+ /**
+ * @param stateMap
+ * @param resourceId
+ * @param partitionId
+ * @param participantId
+ * @param state
+ */
+ static void setStateMap(Map<ResourceId, Map<PartitionId, Map<ParticipantId, State>>> stateMap,
+ ResourceId resourceId, PartitionId partitionId, ParticipantId participantId, State state) {
+ if (!stateMap.containsKey(resourceId)) {
+ stateMap.put(resourceId, new HashMap<PartitionId, Map<ParticipantId, State>>());
+ }
+
+ if (!stateMap.get(resourceId).containsKey(partitionId)) {
+ stateMap.get(resourceId).put(partitionId, new HashMap<ParticipantId, State>());
+ }
+ stateMap.get(resourceId).get(partitionId).put(participantId, state);
+ }
+
+ /**
+ * @param stateMap
+ * @param resourceId
+ * @param partitionId
+ * @param participantId
+ * @return state
+ */
+ static State getState(Map<ResourceId, Map<PartitionId, Map<ParticipantId, State>>> stateMap,
+ ResourceId resourceId, PartitionId partitionId, ParticipantId participantId) {
+ Map<PartitionId, Map<ParticipantId, State>> map = stateMap.get(resourceId);
+ if (map != null) {
+ Map<ParticipantId, State> instanceStateMap = map.get(partitionId);
+ if (instanceStateMap != null) {
+ return instanceStateMap.get(participantId);
+ }
+ }
+ return null;
+
+ }
+
+ /**
+ * @param stateMap
+ * @param resourceId
+ * @param partitionId
+ * @return
+ */
+ static Map<ParticipantId, State> getStateMap(
+ Map<ResourceId, Map<PartitionId, Map<ParticipantId, State>>> stateMap, ResourceId resourceId,
+ PartitionId partitionId) {
+ if (stateMap.containsKey(resourceId)) {
+ Map<PartitionId, Map<ParticipantId, State>> map = stateMap.get(resourceId);
+ if (map.containsKey(partitionId)) {
+ return map.get(partitionId);
+ }
+ }
+ return Collections.emptyMap();
+ }
+
+ /**
+ * @param resourceId
+ * @param partitionId
+ * @param participantId
+ * @param state
+ */
+ public void setCurrentState(ResourceId resourceId, PartitionId partitionId,
+ ParticipantId participantId, State state) {
+ setStateMap(_currentStateMap, resourceId, partitionId, participantId, state);
+ }
+
+ /**
+ * @param resourceId
+ * @param partitionId
+ * @param participantId
+ * @param state
+ */
+ public void setPendingState(ResourceId resourceId, PartitionId partitionId,
+ ParticipantId participantId, State state) {
+ setStateMap(_pendingStateMap, resourceId, partitionId, participantId, state);
+ }
+
+ /**
+ * given (resource, partition, instance), returns currentState
+ * @param resourceName
+ * @param partition
+ * @param instanceName
+ * @return
+ */
+ public State getCurrentState(ResourceId resourceId, PartitionId partitionId,
+ ParticipantId participantId) {
+ return getState(_currentStateMap, resourceId, partitionId, participantId);
+ }
+
+ /**
+ * given (resource, partition, instance), returns toState
+ * @param resourceName
+ * @param partition
+ * @param instanceName
+ * @return
+ */
+ public State getPendingState(ResourceId resourceId, PartitionId partitionId,
+ ParticipantId participantId) {
+ return getState(_pendingStateMap, resourceId, partitionId, participantId);
+ }
+
+ /**
+ * @param resourceId
+ * @param partitionId
+ * @return
+ */
+ public Map<ParticipantId, State> getCurrentStateMap(ResourceId resourceId, PartitionId partitionId) {
+ return getStateMap(_currentStateMap, resourceId, partitionId);
+ }
+
+ /**
+ * Get the partitions mapped in the current state
+ * @param resourceId resource to look up
+ * @return set of mapped partitions, or empty set if there are none
+ */
+ public Set<PartitionId> getCurrentStateMappedPartitions(ResourceId resourceId) {
+ Map<PartitionId, Map<ParticipantId, State>> currentStateMap = _currentStateMap.get(resourceId);
+ if (currentStateMap != null) {
+ return currentStateMap.keySet();
+ }
+ return Collections.emptySet();
+ }
+
+ /**
+ * @param resourceId
+ * @param partitionId
+ * @return
+ */
+ public Map<ParticipantId, State> getPendingStateMap(ResourceId resourceId, PartitionId partitionId) {
+ return getStateMap(_pendingStateMap, resourceId, partitionId);
+
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("current state= ").append(_currentStateMap);
+ sb.append(", pending state= ").append(_pendingStateMap);
+ return sb.toString();
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
index 72046bf..a522352 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
@@ -27,10 +27,10 @@ import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
-import java.util.Map.Entry;
import org.apache.helix.HelixManager;
import org.apache.helix.ZNRecord;
@@ -96,23 +96,27 @@ public class AutoRebalanceStrategy {
*/
public ZNRecord computePartitionAssignment(final List<String> liveNodes,
final Map<String, Map<String, String>> currentMapping, final List<String> allNodes) {
+ List<String> sortedLiveNodes = new ArrayList<String>(liveNodes);
+ Collections.sort(sortedLiveNodes);
+ List<String> sortedAllNodes = new ArrayList<String>(allNodes);
+ Collections.sort(sortedAllNodes);
int numReplicas = countStateReplicas();
ZNRecord znRecord = new ZNRecord(_resourceName);
- if (liveNodes.size() == 0) {
+ if (sortedLiveNodes.size() == 0) {
return znRecord;
}
- int distRemainder = (numReplicas * _partitions.size()) % liveNodes.size();
- int distFloor = (numReplicas * _partitions.size()) / liveNodes.size();
+ int distRemainder = (numReplicas * _partitions.size()) % sortedLiveNodes.size();
+ int distFloor = (numReplicas * _partitions.size()) / sortedLiveNodes.size();
_nodeMap = new HashMap<String, Node>();
_liveNodesList = new ArrayList<Node>();
- for (String id : allNodes) {
+ for (String id : sortedAllNodes) {
Node node = new Node(id);
node.capacity = 0;
node.hasCeilingCapacity = false;
_nodeMap.put(id, node);
}
- for (int i = 0; i < liveNodes.size(); i++) {
+ for (int i = 0; i < sortedLiveNodes.size(); i++) {
boolean usingCeiling = false;
int targetSize = (_maximumPerNode > 0) ? Math.min(distFloor, _maximumPerNode) : distFloor;
if (distRemainder > 0 && targetSize < _maximumPerNode) {
@@ -120,7 +124,7 @@ public class AutoRebalanceStrategy {
distRemainder = distRemainder - 1;
usingCeiling = true;
}
- Node node = _nodeMap.get(liveNodes.get(i));
+ Node node = _nodeMap.get(sortedLiveNodes.get(i));
node.isAlive = true;
node.capacity = targetSize;
node.hasCeilingCapacity = usingCeiling;
@@ -131,7 +135,7 @@ public class AutoRebalanceStrategy {
_stateMap = generateStateMap();
// compute the preferred mapping if all nodes were up
- _preferredAssignment = computePreferredPlacement(allNodes);
+ _preferredAssignment = computePreferredPlacement(sortedAllNodes);
// logger.info("preferred mapping:"+ preferredAssignment);
// from current mapping derive the ones in preferred location
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/main/java/org/apache/helix/model/ClusterConfiguration.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfiguration.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfiguration.java
index c7fef8d..5bd1e86 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfiguration.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfiguration.java
@@ -53,7 +53,7 @@ public class ClusterConfiguration extends HelixProperty {
public static ClusterConfiguration from(UserConfig userConfig) {
ClusterConfiguration clusterConfiguration =
new ClusterConfiguration(Id.cluster(userConfig.getId()));
- clusterConfiguration.addUserConfig(userConfig);
+ clusterConfiguration.addNamespacedConfig(userConfig);
return clusterConfiguration;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/main/java/org/apache/helix/model/Message.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/Message.java b/helix-core/src/main/java/org/apache/helix/model/Message.java
index 68672e3..189c239 100644
--- a/helix-core/src/main/java/org/apache/helix/model/Message.java
+++ b/helix-core/src/main/java/org/apache/helix/model/Message.java
@@ -40,6 +40,7 @@ import org.apache.helix.api.ResourceId;
import org.apache.helix.api.SessionId;
import org.apache.helix.api.State;
import org.apache.helix.api.StateModelDefId;
+import org.apache.helix.api.StateModelFactoryId;
import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
import com.google.common.collect.ImmutableList;
@@ -574,6 +575,16 @@ public class Message extends HelixProperty {
_record.setSimpleField(Attributes.STATE_MODEL_FACTORY_NAME.toString(), factoryName);
}
+ /**
+ * Set the state model factory associated with this message
+ * @param factoryName the name of the factory
+ */
+ public void setStateModelFactoryId(StateModelFactoryId factoryId) {
+ if (factoryId != null) {
+ setStateModelFactoryName(factoryId.stringify());
+ }
+ }
+
// TODO: remove this. impl in HelixProperty
@Override
public int getBucketSize() {
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/main/java/org/apache/helix/model/PartitionConfiguration.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/PartitionConfiguration.java b/helix-core/src/main/java/org/apache/helix/model/PartitionConfiguration.java
index 44d6ac4..d5b9627 100644
--- a/helix-core/src/main/java/org/apache/helix/model/PartitionConfiguration.java
+++ b/helix-core/src/main/java/org/apache/helix/model/PartitionConfiguration.java
@@ -53,7 +53,7 @@ public class PartitionConfiguration extends HelixProperty {
public static PartitionConfiguration from(UserConfig userConfig) {
PartitionConfiguration partitionConfiguration =
new PartitionConfiguration(Id.partition(userConfig.getId()));
- partitionConfiguration.addUserConfig(userConfig);
+ partitionConfiguration.addNamespacedConfig(userConfig);
return partitionConfiguration;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java b/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java
index 20c05a4..307ab0f 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java
@@ -1,10 +1,19 @@
package org.apache.helix.model;
+import java.util.ArrayList;
+import java.util.List;
+
import org.apache.helix.HelixProperty;
import org.apache.helix.ZNRecord;
import org.apache.helix.api.Id;
+import org.apache.helix.api.NamespacedConfig;
+import org.apache.helix.api.PartitionId;
+import org.apache.helix.api.RebalancerConfig;
import org.apache.helix.api.ResourceId;
-import org.apache.helix.api.UserConfig;
+
+import com.google.common.base.Function;
+import com.google.common.base.Functions;
+import com.google.common.collect.Lists;
/*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -29,6 +38,10 @@ import org.apache.helix.api.UserConfig;
* Persisted configuration properties for a resource
*/
public class ResourceConfiguration extends HelixProperty {
+ public enum Fields {
+ PARTITION_LIST
+ }
+
/**
* Instantiate for an id
* @param id resource id
@@ -38,6 +51,14 @@ public class ResourceConfiguration extends HelixProperty {
}
/**
+ * Get the resource that is rebalanced
+ * @return resource id
+ */
+ public ResourceId getResourceId() {
+ return Id.resource(getId());
+ }
+
+ /**
* Instantiate from a record
* @param record configuration properties
*/
@@ -46,14 +67,49 @@ public class ResourceConfiguration extends HelixProperty {
}
/**
- * Create a new ResourceConfiguration from a UserConfig
- * @param userConfig user-defined configuration properties
+ * Set the partitions for this resource
+ * @param partitionIds list of partition ids
+ */
+ public void setPartitionIds(List<PartitionId> partitionIds) {
+ _record.setListField(Fields.PARTITION_LIST.toString(),
+ Lists.transform(partitionIds, Functions.toStringFunction()));
+ }
+
+ /**
+ * Get the partitions for this resource
+ * @return list of partition ids
+ */
+ public List<PartitionId> getPartitionIds() {
+ List<String> partitionNames = _record.getListField(Fields.PARTITION_LIST.toString());
+ if (partitionNames != null) {
+ return Lists.transform(partitionNames, new Function<String, PartitionId>() {
+ @Override
+ public PartitionId apply(String partitionName) {
+ return Id.partition(partitionName);
+ }
+ });
+ }
+ return null;
+ }
+
+ /**
+ * Add a rebalancer config to this resource
+ * @param config populated rebalancer config
+ */
+ public void addRebalancerConfig(RebalancerConfig config) {
+ addNamespacedConfig(config);
+ setPartitionIds(new ArrayList<PartitionId>(config.getPartitionSet()));
+ }
+
+ /**
+ * Create a new ResourceConfiguration from a NamespacedConfig
+ * @param namespacedConfig namespaced configuration properties
* @return ResourceConfiguration
*/
- public static ResourceConfiguration from(UserConfig userConfig) {
+ public static ResourceConfiguration from(NamespacedConfig namespacedConfig) {
ResourceConfiguration resourceConfiguration =
- new ResourceConfiguration(Id.resource(userConfig.getId()));
- resourceConfiguration.addUserConfig(userConfig);
+ new ResourceConfiguration(Id.resource(namespacedConfig.getId()));
+ resourceConfiguration.addNamespacedConfig(namespacedConfig);
return resourceConfiguration;
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
index 1defc9f..c407941 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
@@ -266,18 +266,10 @@ public class ClusterStateVerifier {
}
}
- Map<String, StateModelDefinition> stateModelDefs =
- accessor.getChildValuesMap(keyBuilder.stateModelDefs());
- Map<StateModelDefId, StateModelDefinition> convertedDefs =
- new HashMap<StateModelDefId, StateModelDefinition>();
- for (String defName : stateModelDefs.keySet()) {
- convertedDefs.put(Id.stateModelDef(defName), stateModelDefs.get(defName));
- }
ClusterAccessor clusterAccessor = new ClusterAccessor(Id.cluster(clusterName), accessor);
Cluster cluster = clusterAccessor.readCluster();
// calculate best possible state
- NewBestPossibleStateOutput bestPossOutput =
- ClusterStateVerifier.calcBestPossState(cluster, convertedDefs);
+ NewBestPossibleStateOutput bestPossOutput = ClusterStateVerifier.calcBestPossState(cluster);
// set error states
if (errStates != null) {
@@ -443,11 +435,9 @@ public class ClusterStateVerifier {
* @throws Exception
*/
- static NewBestPossibleStateOutput calcBestPossState(Cluster cluster,
- Map<StateModelDefId, StateModelDefinition> convertedDefs) throws Exception {
+ static NewBestPossibleStateOutput calcBestPossState(Cluster cluster) throws Exception {
ClusterEvent event = new ClusterEvent("sampleEvent");
event.addAttribute("ClusterDataCache", cluster);
- event.addAttribute(AttributeName.STATE_MODEL_DEFINITIONS.toString(), convertedDefs);
NewResourceComputationStage rcState = new NewResourceComputationStage();
NewCurrentStateComputationStage csStage = new NewCurrentStateComputationStage();
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/test/java/org/apache/helix/api/TestNamespacedConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/api/TestNamespacedConfig.java b/helix-core/src/test/java/org/apache/helix/api/TestNamespacedConfig.java
new file mode 100644
index 0000000..380d99e
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/api/TestNamespacedConfig.java
@@ -0,0 +1,129 @@
+package org.apache.helix.api;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.InstanceConfig.InstanceConfigProperty;
+import org.apache.helix.model.ResourceConfiguration;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * A user config is a namespaced subset in the physical model and a separate entity in the logical
+ * model. These tests ensure that that separation is honored.
+ */
+public class TestNamespacedConfig {
+ /**
+ * Ensure that user configs are separated from helix configs in properties that hold both
+ */
+ @Test
+ public void testUserConfigUpdates() {
+ final String testKey = "testKey";
+ final String prefixedKey = UserConfig.class.getSimpleName() + "_testKey";
+ final String testSimpleValue = "testValue";
+ final List<String> testListValue = ImmutableList.of("testValue");
+ final Map<String, String> testMapValue = ImmutableMap.of("testInnerKey", "testValue");
+
+ // first, add Helix configuration to an InstanceConfig
+ ParticipantId participantId = Id.participant("testParticipant");
+ InstanceConfig instanceConfig = new InstanceConfig(participantId);
+ instanceConfig.setHostName("localhost");
+
+ // now, add user configuration
+ UserConfig userConfig = new UserConfig(participantId);
+ userConfig.setSimpleField(testKey, testSimpleValue);
+ userConfig.setListField(testKey, testListValue);
+ userConfig.setMapField(testKey, testMapValue);
+
+ // add the user configuration to the Helix configuration
+ instanceConfig.addNamespacedConfig(userConfig);
+
+ // get the user configuration back from the property
+ UserConfig retrievedConfig = UserConfig.from(instanceConfig);
+
+ // check that the property still has the host name
+ Assert.assertTrue(instanceConfig.getHostName().equals("localhost"));
+
+ // check that the retrieved config does not contain the host name
+ Assert.assertEquals(retrievedConfig.getStringField(
+ InstanceConfigProperty.HELIX_HOST.toString(), "not localhost"), "not localhost");
+
+ // check that both the retrieved config and the original config have the added properties
+ Assert.assertEquals(userConfig.getSimpleField(testKey), testSimpleValue);
+ Assert.assertEquals(userConfig.getListField(testKey), testListValue);
+ Assert.assertEquals(userConfig.getMapField(testKey), testMapValue);
+ Assert.assertEquals(retrievedConfig.getSimpleField(testKey), testSimpleValue);
+ Assert.assertEquals(retrievedConfig.getListField(testKey), testListValue);
+ Assert.assertEquals(retrievedConfig.getMapField(testKey), testMapValue);
+
+ // test that the property has the user config, but prefixed
+ Assert.assertEquals(instanceConfig.getRecord().getSimpleField(prefixedKey), testSimpleValue);
+ Assert.assertEquals(instanceConfig.getRecord().getListField(prefixedKey), testListValue);
+ Assert.assertEquals(instanceConfig.getRecord().getMapField(prefixedKey), testMapValue);
+ }
+
+ @Test
+ public void testConfiguredResource() {
+ // Set up the namespaced configs
+ String userKey = "userKey";
+ String userValue = "userValue";
+ ResourceId resourceId = Id.resource("testResource");
+ UserConfig userConfig = new UserConfig(resourceId);
+ userConfig.setSimpleField(userKey, userValue);
+ PartitionId partitionId = Id.partition(resourceId, "0");
+ Partition partition = new Partition(partitionId);
+ Map<ParticipantId, State> preferenceMap = new HashMap<ParticipantId, State>();
+ ParticipantId participantId = Id.participant("participant");
+ preferenceMap.put(participantId, State.from("ONLINE"));
+ CustomRebalancerConfig rebalancerConfig =
+ new CustomRebalancerConfig.Builder(resourceId).replicaCount(1).addPartition(partition)
+ .stateModelDef(Id.stateModelDef("OnlineOffline"))
+ .preferenceMap(partitionId, preferenceMap).build();
+
+ // copy in the configs
+ ResourceConfiguration config = new ResourceConfiguration(resourceId);
+ config.addNamespacedConfig(userConfig);
+ config.addRebalancerConfig(rebalancerConfig);
+
+ // recreate the configs and check the fields
+ UserConfig retrievedUserConfig = UserConfig.from(config);
+ Assert.assertEquals(retrievedUserConfig.getSimpleField(userKey), userValue);
+ Map<PartitionId, UserConfig> partitionConfigs = Collections.emptyMap();
+ RebalancerConfig retrievedRebalancerConfig = RebalancerConfig.from(config, partitionConfigs);
+ Assert.assertEquals(retrievedRebalancerConfig.getReplicaCount(),
+ rebalancerConfig.getReplicaCount());
+ Assert.assertEquals(retrievedRebalancerConfig.getStateModelDefId(),
+ rebalancerConfig.getStateModelDefId());
+ Assert.assertTrue(retrievedRebalancerConfig.getPartitionMap().containsKey(partitionId));
+ Assert.assertEquals(retrievedRebalancerConfig.getPartitionSet().size(), rebalancerConfig
+ .getPartitionSet().size());
+ CustomRebalancerConfig customConfig = CustomRebalancerConfig.from(retrievedRebalancerConfig);
+ Assert.assertEquals(customConfig.getPreferenceMap(partitionId).get(participantId),
+ State.from("ONLINE"));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java b/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
index 0561814..643875c 100644
--- a/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
+++ b/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
@@ -20,7 +20,6 @@ package org.apache.helix.api;
*/
import java.util.Date;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -28,14 +27,12 @@ import org.apache.helix.HelixDataAccessor;
import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
import org.apache.helix.ZkUnitTestBase;
-import org.apache.helix.controller.rebalancer.NewAutoRebalancer;
-import org.apache.helix.controller.rebalancer.NewCustomRebalancer;
import org.apache.helix.controller.rebalancer.NewSemiAutoRebalancer;
import org.apache.helix.controller.stages.AttributeName;
import org.apache.helix.controller.stages.ClusterEvent;
import org.apache.helix.controller.stages.NewBestPossibleStateCalcStage;
import org.apache.helix.controller.stages.NewBestPossibleStateOutput;
-import org.apache.helix.controller.stages.NewCurrentStateOutput;
+import org.apache.helix.controller.stages.ResourceCurrentState;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.mock.controller.ClusterController;
@@ -43,10 +40,8 @@ import org.apache.helix.mock.participant.MockParticipant;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.IdealState.RebalanceMode;
import org.apache.helix.model.ResourceAssignment;
-import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
-import org.apache.helix.tools.StateModelConfigGenerator;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -117,7 +112,7 @@ public class TestNewStages extends ZkUnitTestBase {
ClusterAccessor clusterAccessor = new ClusterAccessor(_clusterId, _dataAccessor);
Cluster cluster = clusterAccessor.readCluster();
ClusterEvent event = new ClusterEvent(testName);
- event.addAttribute(AttributeName.CURRENT_STATE.toString(), new NewCurrentStateOutput());
+ event.addAttribute(AttributeName.CURRENT_STATE.toString(), new ResourceCurrentState());
Map<ResourceId, ResourceConfig> resourceConfigMap =
Maps.transformValues(cluster.getResourceMap(), new Function<Resource, ResourceConfig>() {
@Override
@@ -127,11 +122,6 @@ public class TestNewStages extends ZkUnitTestBase {
});
event.addAttribute(AttributeName.RESOURCES.toString(), resourceConfigMap);
event.addAttribute("ClusterDataCache", cluster);
- Map<StateModelDefId, StateModelDefinition> stateModelMap =
- new HashMap<StateModelDefId, StateModelDefinition>();
- stateModelMap.put(Id.stateModelDef("MasterSlave"), new StateModelDefinition(
- StateModelConfigGenerator.generateConfigForMasterSlave()));
- event.addAttribute(AttributeName.STATE_MODEL_DEFINITIONS.toString(), stateModelMap);
// Run the stage
try {
@@ -166,57 +156,25 @@ public class TestNewStages extends ZkUnitTestBase {
ResourceId resourceId = new ResourceId("TestDB0");
Resource resource = cluster.getResource(resourceId);
- StateModelDefinition masterSlave =
- new StateModelDefinition(StateModelConfigGenerator.generateConfigForMasterSlave());
- NewCurrentStateOutput currentStateOutput = new NewCurrentStateOutput();
- ResourceAssignment fullAutoResult =
- new NewAutoRebalancer().computeResourceMapping(resource.getConfig(), cluster, masterSlave,
- currentStateOutput);
- verifyFullAutoRebalance(resource, fullAutoResult);
+ ResourceCurrentState currentStateOutput = new ResourceCurrentState();
+ SemiAutoRebalancerConfig semiAutoConfig =
+ SemiAutoRebalancerConfig.from(resource.getRebalancerConfig());
ResourceAssignment semiAutoResult =
- new NewSemiAutoRebalancer().computeResourceMapping(resource.getConfig(), cluster,
- masterSlave,
+ new NewSemiAutoRebalancer().computeResourceMapping(semiAutoConfig, cluster,
currentStateOutput);
verifySemiAutoRebalance(resource, semiAutoResult);
- ResourceAssignment customResult =
- new NewCustomRebalancer().computeResourceMapping(resource.getConfig(), cluster,
- masterSlave,
- currentStateOutput);
- verifyCustomRebalance(resource, customResult);
System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
}
/**
- * Check that a full auto rebalance is run, and at least one replica per partition is mapped
- * @param resource the resource to verify
- * @param assignment the assignment to verify
- */
- private void verifyFullAutoRebalance(Resource resource, ResourceAssignment assignment) {
- Assert.assertEquals(assignment.getMappedPartitions().size(), resource.getPartitionSet().size());
- for (PartitionId partitionId : assignment.getMappedPartitions()) {
- Map<ParticipantId, State> replicaMap = assignment.getReplicaMap(partitionId);
- Assert.assertTrue(replicaMap.size() <= r);
- Assert.assertTrue(replicaMap.size() > 0);
- boolean hasMaster = false;
- for (State state : replicaMap.values()) {
- if (state.equals(State.from("MASTER"))) {
- Assert.assertFalse(hasMaster);
- hasMaster = true;
- }
- }
- Assert.assertTrue(hasMaster);
- }
- }
-
- /**
* Check that a semi auto rebalance is run, and all partitions are mapped by preference list
* @param resource the resource to verify
* @param assignment the assignment to verify
*/
private void verifySemiAutoRebalance(Resource resource, ResourceAssignment assignment) {
Assert.assertEquals(assignment.getMappedPartitions().size(), resource.getPartitionSet().size());
- RebalancerConfig config = resource.getRebalancerConfig();
+ SemiAutoRebalancerConfig config = SemiAutoRebalancerConfig.from(resource.getRebalancerConfig());
for (PartitionId partitionId : assignment.getMappedPartitions()) {
List<ParticipantId> preferenceList = config.getPreferenceList(partitionId);
Map<ParticipantId, State> replicaMap = assignment.getReplicaMap(partitionId);
@@ -235,26 +193,6 @@ public class TestNewStages extends ZkUnitTestBase {
}
}
- /**
- * For vanilla customized rebalancing, the resource assignment should match the preference map
- * @param resource the resource to verify
- * @param assignment the assignment to verify
- */
- private void verifyCustomRebalance(Resource resource, ResourceAssignment assignment) {
- Assert.assertEquals(assignment.getMappedPartitions().size(), resource.getPartitionSet().size());
- RebalancerConfig config = resource.getRebalancerConfig();
- for (PartitionId partitionId : assignment.getMappedPartitions()) {
- Map<ParticipantId, State> preferenceMap = config.getPreferenceMap(partitionId);
- Map<ParticipantId, State> replicaMap = assignment.getReplicaMap(partitionId);
- Assert.assertEquals(replicaMap.size(), preferenceMap.size());
- Assert.assertEquals(replicaMap.size(), r);
- for (ParticipantId participant : preferenceMap.keySet()) {
- Assert.assertTrue(replicaMap.containsKey(participant));
- Assert.assertEquals(replicaMap.get(participant), preferenceMap.get(participant));
- }
- }
- }
-
@BeforeClass
public void beforeClass() throws Exception {
// set up a running class
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/test/java/org/apache/helix/api/TestUserConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/api/TestUserConfig.java b/helix-core/src/test/java/org/apache/helix/api/TestUserConfig.java
deleted file mode 100644
index 36cbf61..0000000
--- a/helix-core/src/test/java/org/apache/helix/api/TestUserConfig.java
+++ /dev/null
@@ -1,86 +0,0 @@
-package org.apache.helix.api;
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.helix.model.InstanceConfig;
-import org.apache.helix.model.InstanceConfig.InstanceConfigProperty;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * A user config is a namespaced subset in the physical model and a separate entity in the logical
- * model. These tests ensure that that separation is honored.
- */
-public class TestUserConfig {
- /**
- * Ensure that user configs are separated from helix configs in properties that hold both
- */
- @Test
- public void testUserConfigUpdates() {
- final String testKey = "testKey";
- final String prefixedKey = UserConfig.class.getSimpleName() + "_testKey";
- final String testSimpleValue = "testValue";
- final List<String> testListValue = ImmutableList.of("testValue");
- final Map<String, String> testMapValue = ImmutableMap.of("testInnerKey", "testValue");
-
- // first, add Helix configuration to an InstanceConfig
- ParticipantId participantId = Id.participant("testParticipant");
- InstanceConfig instanceConfig = new InstanceConfig(participantId);
- instanceConfig.setHostName("localhost");
-
- // now, add user configuration
- UserConfig userConfig = new UserConfig(participantId);
- userConfig.setSimpleField(testKey, testSimpleValue);
- userConfig.setListField(testKey, testListValue);
- userConfig.setMapField(testKey, testMapValue);
-
- // add the user configuration to the Helix configuration
- instanceConfig.addUserConfig(userConfig);
-
- // get the user configuration back from the property
- UserConfig retrievedConfig = UserConfig.from(instanceConfig);
-
- // check that the property still has the host name
- Assert.assertTrue(instanceConfig.getHostName().equals("localhost"));
-
- // check that the retrieved config does not contain the host name
- Assert.assertEquals(retrievedConfig.getStringField(
- InstanceConfigProperty.HELIX_HOST.toString(), "not localhost"), "not localhost");
-
- // check that both the retrieved config and the original config have the added properties
- Assert.assertEquals(userConfig.getSimpleField(testKey), testSimpleValue);
- Assert.assertEquals(userConfig.getListField(testKey), testListValue);
- Assert.assertEquals(userConfig.getMapField(testKey), testMapValue);
- Assert.assertEquals(retrievedConfig.getSimpleField(testKey), testSimpleValue);
- Assert.assertEquals(retrievedConfig.getListField(testKey), testListValue);
- Assert.assertEquals(retrievedConfig.getMapField(testKey), testMapValue);
-
- // test that the property has the user config, but prefixed
- Assert.assertEquals(instanceConfig.getRecord().getSimpleField(prefixedKey), testSimpleValue);
- Assert.assertEquals(instanceConfig.getRecord().getListField(prefixedKey), testListValue);
- Assert.assertEquals(instanceConfig.getRecord().getMapField(prefixedKey), testMapValue);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java b/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
index f8494bd..b680844 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
@@ -20,6 +20,7 @@ package org.apache.helix.controller.stages;
*/
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
@@ -34,10 +35,11 @@ import org.apache.helix.ZNRecord;
import org.apache.helix.api.Id;
import org.apache.helix.api.Partition;
import org.apache.helix.api.PartitionId;
-import org.apache.helix.api.RebalancerConfig;
+import org.apache.helix.api.Resource;
import org.apache.helix.api.ResourceConfig;
import org.apache.helix.api.ResourceId;
import org.apache.helix.api.StateModelDefId;
+import org.apache.helix.api.UserConfig;
import org.apache.helix.controller.pipeline.Stage;
import org.apache.helix.controller.pipeline.StageContext;
import org.apache.helix.model.IdealState;
@@ -170,10 +172,11 @@ public class BaseStageTest {
for (PartitionId partitionId : idealState.getPartitionSet()) {
partitionMap.put(partitionId, new Partition(partitionId));
}
- RebalancerConfig rebalancerConfig = new RebalancerConfig(partitionMap, idealState, null, 0);
- ResourceConfig resourceConfig =
- new ResourceConfig.Builder(resourceId).rebalancerConfig(rebalancerConfig).build();
- resourceMap.put(resourceId, resourceConfig);
+ Map<PartitionId, UserConfig> partitionConfigMap = Collections.emptyMap();
+ Resource resource =
+ new Resource(resourceId, idealState, null, null, new UserConfig(resourceId),
+ partitionConfigMap);
+ resourceMap.put(resourceId, resource.getConfig());
}
return resourceMap;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java
index 7dd2d45..8873a61 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java
@@ -31,10 +31,8 @@ import org.apache.helix.api.ParticipantId;
import org.apache.helix.api.ResourceConfig;
import org.apache.helix.api.ResourceId;
import org.apache.helix.api.State;
-import org.apache.helix.api.StateModelDefId;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.IdealStateModeProperty;
-import org.apache.helix.model.StateModelDefinition;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;
@@ -56,13 +54,12 @@ public class TestBestPossibleCalcStageCompatibility extends BaseStageTest {
List<IdealState> idealStates =
setupIdealStateDeprecated(5, resources, 10, 1, IdealStateModeProperty.AUTO);
setupLiveInstances(5);
- Map<StateModelDefId, StateModelDefinition> stateModelDefs = setupStateModel();
+ setupStateModel();
Map<ResourceId, ResourceConfig> resourceMap = getResourceMap(idealStates);
- NewCurrentStateOutput currentStateOutput = new NewCurrentStateOutput();
+ ResourceCurrentState currentStateOutput = new ResourceCurrentState();
event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
event.addAttribute(AttributeName.CURRENT_STATE.toString(), currentStateOutput);
- event.addAttribute(AttributeName.STATE_MODEL_DEFINITIONS.toString(), stateModelDefs);
NewReadClusterDataStage stage1 = new NewReadClusterDataStage();
runStage(event, stage1);
@@ -95,13 +92,12 @@ public class TestBestPossibleCalcStageCompatibility extends BaseStageTest {
List<IdealState> idealStates =
setupIdealStateDeprecated(5, resources, 10, 1, IdealStateModeProperty.CUSTOMIZED);
setupLiveInstances(5);
- Map<StateModelDefId, StateModelDefinition> stateModelDefs = setupStateModel();
+ setupStateModel();
Map<ResourceId, ResourceConfig> resourceMap = getResourceMap(idealStates);
- NewCurrentStateOutput currentStateOutput = new NewCurrentStateOutput();
+ ResourceCurrentState currentStateOutput = new ResourceCurrentState();
event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
event.addAttribute(AttributeName.CURRENT_STATE.toString(), currentStateOutput);
- event.addAttribute(AttributeName.STATE_MODEL_DEFINITIONS.toString(), stateModelDefs);
NewReadClusterDataStage stage1 = new NewReadClusterDataStage();
runStage(event, stage1);
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java
index 234c441..9f7ea82 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java
@@ -28,10 +28,8 @@ import org.apache.helix.api.ParticipantId;
import org.apache.helix.api.ResourceConfig;
import org.apache.helix.api.ResourceId;
import org.apache.helix.api.State;
-import org.apache.helix.api.StateModelDefId;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.RebalanceMode;
-import org.apache.helix.model.StateModelDefinition;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;
@@ -47,13 +45,12 @@ public class TestBestPossibleStateCalcStage extends BaseStageTest {
};
List<IdealState> idealStates = setupIdealState(5, resources, 10, 1, RebalanceMode.SEMI_AUTO);
setupLiveInstances(5);
- Map<StateModelDefId, StateModelDefinition> stateModelDefs = setupStateModel();
+ setupStateModel();
Map<ResourceId, ResourceConfig> resourceMap = getResourceMap(idealStates);
- NewCurrentStateOutput currentStateOutput = new NewCurrentStateOutput();
+ ResourceCurrentState currentStateOutput = new ResourceCurrentState();
event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
event.addAttribute(AttributeName.CURRENT_STATE.toString(), currentStateOutput);
- event.addAttribute(AttributeName.STATE_MODEL_DEFINITIONS.toString(), stateModelDefs);
NewReadClusterDataStage stage1 = new NewReadClusterDataStage();
runStage(event, stage1);
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
index 489537f..837eee7 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
@@ -48,7 +48,7 @@ public class TestCurrentStateComputationStage extends BaseStageTest {
NewCurrentStateComputationStage stage = new NewCurrentStateComputationStage();
runStage(event, new NewReadClusterDataStage());
runStage(event, stage);
- NewCurrentStateOutput output = event.getAttribute(AttributeName.CURRENT_STATE.toString());
+ ResourceCurrentState output = event.getAttribute(AttributeName.CURRENT_STATE.toString());
AssertJUnit.assertEquals(
output.getCurrentStateMap(Id.resource("testResourceName"),
Id.partition("testResourceName_0")).size(), 0);
@@ -69,7 +69,7 @@ public class TestCurrentStateComputationStage extends BaseStageTest {
NewCurrentStateComputationStage stage = new NewCurrentStateComputationStage();
runStage(event, new NewReadClusterDataStage());
runStage(event, stage);
- NewCurrentStateOutput output1 = event.getAttribute(AttributeName.CURRENT_STATE.toString());
+ ResourceCurrentState output1 = event.getAttribute(AttributeName.CURRENT_STATE.toString());
AssertJUnit.assertEquals(
output1.getCurrentStateMap(Id.resource("testResourceName"),
Id.partition("testResourceName_0")).size(), 0);
@@ -88,7 +88,7 @@ public class TestCurrentStateComputationStage extends BaseStageTest {
runStage(event, new NewReadClusterDataStage());
runStage(event, stage);
- NewCurrentStateOutput output2 = event.getAttribute(AttributeName.CURRENT_STATE.toString());
+ ResourceCurrentState output2 = event.getAttribute(AttributeName.CURRENT_STATE.toString());
State pendingState =
output2.getPendingState(Id.resource("testResourceName"),
Id.partition("testResourceName_1"), Id.participant("localhost_3"));
@@ -113,7 +113,7 @@ public class TestCurrentStateComputationStage extends BaseStageTest {
stateWithDeadSession);
runStage(event, new NewReadClusterDataStage());
runStage(event, stage);
- NewCurrentStateOutput output3 = event.getAttribute(AttributeName.CURRENT_STATE.toString());
+ ResourceCurrentState output3 = event.getAttribute(AttributeName.CURRENT_STATE.toString());
State currentState =
output3.getCurrentState(Id.resource("testResourceName"),
Id.partition("testResourceName_1"), Id.participant("localhost_3"));
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
index d8e7955..1b0a0b4 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
@@ -30,12 +30,11 @@ import org.apache.helix.ZNRecord;
import org.apache.helix.api.Cluster;
import org.apache.helix.api.ParticipantId;
import org.apache.helix.api.PartitionId;
-import org.apache.helix.api.Resource;
-import org.apache.helix.api.ResourceConfig;
import org.apache.helix.api.State;
-import org.apache.helix.controller.rebalancer.NewRebalancer;
+import org.apache.helix.api.UserDefinedRebalancerConfig;
+import org.apache.helix.controller.rebalancer.NewUserDefinedRebalancer;
import org.apache.helix.controller.stages.ClusterDataCache;
-import org.apache.helix.controller.stages.NewCurrentStateOutput;
+import org.apache.helix.controller.stages.ResourceCurrentState;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.manager.zk.ZkClient;
@@ -55,20 +54,22 @@ public class TestCustomizedIdealStateRebalancer extends
String db2 = TEST_DB + "2";
static boolean testRebalancerInvoked = false;
- public static class TestRebalancer implements NewRebalancer {
+ public static class TestRebalancer implements NewUserDefinedRebalancer {
/**
* Very basic mapping that evenly assigns one replica of each partition to live nodes, each of
* which is in the highest-priority state.
*/
@Override
- public ResourceAssignment computeResourceMapping(ResourceConfig resourceConfig, Cluster cluster,
- StateModelDefinition stateModelDef, NewCurrentStateOutput currentStateOutput) {
+ public ResourceAssignment computeResourceMapping(UserDefinedRebalancerConfig config,
+ Cluster cluster, ResourceCurrentState currentState) {
+ StateModelDefinition stateModelDef =
+ cluster.getStateModelMap().get(config.getStateModelDefId());
List<ParticipantId> liveParticipants =
new ArrayList<ParticipantId>(cluster.getLiveParticipantMap().keySet());
- ResourceAssignment resourceMapping = new ResourceAssignment(resourceConfig.getId());
+ ResourceAssignment resourceMapping = new ResourceAssignment(config.getResourceId());
int i = 0;
- for (PartitionId partitionId : resourceConfig.getPartitionSet()) {
+ for (PartitionId partitionId : config.getPartitionSet()) {
int nodeIndex = i % liveParticipants.size();
Map<ParticipantId, State> replicaMap = new HashMap<ParticipantId, State>();
replicaMap.put(liveParticipants.get(nodeIndex), stateModelDef.getStatesPriorityList()
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-examples/src/main/java/org/apache/helix/examples/NewModelExample.java
----------------------------------------------------------------------
diff --git a/helix-examples/src/main/java/org/apache/helix/examples/NewModelExample.java b/helix-examples/src/main/java/org/apache/helix/examples/NewModelExample.java
index 642a8a3..e08394c 100644
--- a/helix-examples/src/main/java/org/apache/helix/examples/NewModelExample.java
+++ b/helix-examples/src/main/java/org/apache/helix/examples/NewModelExample.java
@@ -8,10 +8,10 @@ import org.apache.helix.ZNRecord;
import org.apache.helix.api.ClusterAccessor;
import org.apache.helix.api.ClusterConfig;
import org.apache.helix.api.ClusterId;
+import org.apache.helix.api.FullAutoRebalancerConfig;
import org.apache.helix.api.Id;
import org.apache.helix.api.ParticipantConfig;
import org.apache.helix.api.ParticipantId;
-import org.apache.helix.api.RebalancerConfig;
import org.apache.helix.api.ResourceConfig;
import org.apache.helix.api.ResourceId;
import org.apache.helix.api.State;
@@ -22,7 +22,6 @@ import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.manager.zk.ZkClient;
-import org.apache.helix.model.IdealState.RebalanceMode;
import org.apache.helix.model.StateModelDefinition;
import org.apache.log4j.Logger;
@@ -90,9 +89,9 @@ public class NewModelExample {
private static ResourceConfig getResource(StateModelDefinition stateModelDef) {
ResourceId resourceId = Id.resource("exampleResource");
- RebalancerConfig.Builder rebalanceConfigBuilder =
- new RebalancerConfig.Builder(resourceId).rebalancerMode(RebalanceMode.FULL_AUTO)
- .replicaCount(3).addPartitions(5).stateModelDef(stateModelDef.getStateModelDefId());
+ FullAutoRebalancerConfig.Builder rebalanceConfigBuilder =
+ new FullAutoRebalancerConfig.Builder(resourceId).replicaCount(3).addPartitions(5)
+ .stateModelDef(stateModelDef.getStateModelDefId());
ResourceConfig.Builder resourceBuilder =
new ResourceConfig.Builder(resourceId).rebalancerConfig(rebalanceConfigBuilder.build());
return resourceBuilder.build();