You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/08/04 23:25:06 UTC
git commit: [HELIX-492] Task should have its own rebalance mode
Repository: helix
Updated Branches:
refs/heads/helix-0.6.x 47199d507 -> bb15ca8f0
[HELIX-492] Task should have its own rebalance mode
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/bb15ca8f
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/bb15ca8f
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/bb15ca8f
Branch: refs/heads/helix-0.6.x
Commit: bb15ca8f06aa1c4d98bacc8547535a797cce0328
Parents: 47199d5
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Fri Aug 1 13:00:26 2014 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Mon Aug 4 14:24:45 2014 -0700
----------------------------------------------------------------------
.../stages/BestPossibleStateCalcStage.java | 1 +
.../apache/helix/manager/zk/ZKHelixAdmin.java | 2 ++
.../java/org/apache/helix/model/IdealState.java | 27 +++++++++++++++-----
.../java/org/apache/helix/task/TaskDriver.java | 9 ++++---
4 files changed, 29 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/bb15ca8f/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 706fd41..9a9767e 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -123,6 +123,7 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
mappingCalculator = customRebalancer;
break;
case USER_DEFINED:
+ case TASK:
String rebalancerClassName = idealState.getRebalancerClassName();
logger
.info("resource " + resourceName + " use idealStateRebalancer " + rebalancerClassName);
http://git-wip-us.apache.org/repos/asf/helix/blob/bb15ca8f/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index aa2d617..89fdab7 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -253,6 +253,8 @@ public class ZKHelixAdmin implements HelixAdmin {
.getPreferenceList(partitionName) == null)
|| (idealState.getRebalanceMode() == RebalanceMode.USER_DEFINED && idealState
.getPreferenceList(partitionName) == null)
+ || (idealState.getRebalanceMode() == RebalanceMode.TASK && idealState
+ .getPreferenceList(partitionName) == null)
|| (idealState.getRebalanceMode() == RebalanceMode.CUSTOMIZED && idealState
.getInstanceStateMap(partitionName) == null)) {
logger.warn("Cluster: " + clusterName + ", resource: " + resourceName + ", partition: "
http://git-wip-us.apache.org/repos/asf/helix/blob/bb15ca8f/helix-core/src/main/java/org/apache/helix/model/IdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index a209cd9..bc31e1e 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -31,6 +31,9 @@ import org.apache.helix.HelixConstants;
import org.apache.helix.HelixProperty;
import org.apache.helix.ZNRecord;
import org.apache.helix.controller.rebalancer.Rebalancer;
+import org.apache.helix.task.FixedTargetTaskRebalancer;
+import org.apache.helix.task.GenericTaskRebalancer;
+import org.apache.helix.task.TaskRebalancer;
import org.apache.log4j.Logger;
/**
@@ -71,13 +74,15 @@ public class IdealState extends HelixProperty {
/**
* The mode used for rebalance. FULL_AUTO does both node location calculation and state
* assignment, SEMI_AUTO only does the latter, and CUSTOMIZED does neither. USER_DEFINED
- * uses a Rebalancer implementation plugged in by the user.
+ * uses a Rebalancer implementation plugged in by the user. TASK designates that a
+ * {@link TaskRebalancer} instance should be used to rebalance this resource.
*/
public enum RebalanceMode {
FULL_AUTO,
SEMI_AUTO,
CUSTOMIZED,
USER_DEFINED,
+ TASK,
NONE
}
@@ -207,10 +212,11 @@ public class IdealState extends HelixProperty {
*/
public Set<String> getPartitionSet() {
if (getRebalanceMode() == RebalanceMode.SEMI_AUTO
- || getRebalanceMode() == RebalanceMode.FULL_AUTO) {
+ || getRebalanceMode() == RebalanceMode.FULL_AUTO
+ || getRebalanceMode() == RebalanceMode.USER_DEFINED
+ || getRebalanceMode() == RebalanceMode.TASK) {
return _record.getListFields().keySet();
- } else if (getRebalanceMode() == RebalanceMode.CUSTOMIZED
- || getRebalanceMode() == RebalanceMode.USER_DEFINED) {
+ } else if (getRebalanceMode() == RebalanceMode.CUSTOMIZED) {
return _record.getMapFields().keySet();
} else {
logger.error("Invalid ideal state mode:" + getResourceName());
@@ -235,7 +241,8 @@ public class IdealState extends HelixProperty {
public Set<String> getInstanceSet(String partitionName) {
if (getRebalanceMode() == RebalanceMode.SEMI_AUTO
|| getRebalanceMode() == RebalanceMode.FULL_AUTO
- || getRebalanceMode() == RebalanceMode.USER_DEFINED) {
+ || getRebalanceMode() == RebalanceMode.USER_DEFINED
+ || getRebalanceMode() == RebalanceMode.TASK) {
List<String> prefList = _record.getListField(partitionName);
if (prefList != null) {
return new TreeSet<String>(prefList);
@@ -457,8 +464,14 @@ public class IdealState extends HelixProperty {
property = RebalanceMode.CUSTOMIZED;
break;
default:
- if (getRebalancerClassName() != null) {
- property = RebalanceMode.USER_DEFINED;
+ String rebalancerName = getRebalancerClassName();
+ if (rebalancerName != null) {
+ if (rebalancerName.equals(FixedTargetTaskRebalancer.class.getName())
+ || rebalancerName.equals(GenericTaskRebalancer.class.getName())) {
+ property = RebalanceMode.TASK;
+ } else {
+ property = RebalanceMode.USER_DEFINED;
+ }
} else {
property = RebalanceMode.SEMI_AUTO;
}
http://git-wip-us.apache.org/repos/asf/helix/blob/bb15ca8f/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index e4871b5..ab504cd 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -20,8 +20,10 @@ package org.apache.helix.task;
*/
import java.io.File;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -190,14 +192,15 @@ public class TaskDriver {
// Push out new ideal state based on number of target partitions
CustomModeISBuilder builder = new CustomModeISBuilder(jobResource);
- builder.setRebalancerMode(IdealState.RebalanceMode.USER_DEFINED);
+ builder.setRebalancerMode(IdealState.RebalanceMode.TASK);
builder.setNumReplica(1);
builder.setNumPartitions(numPartitions);
builder.setStateModel(TaskConstants.STATE_MODEL_NAME);
+ IdealState is = builder.build();
for (int i = 0; i < numPartitions; i++) {
- builder.add(jobResource + "_" + i);
+ is.getRecord().setListField(jobResource + "_" + i, new ArrayList<String>());
+ is.getRecord().setMapField(jobResource + "_" + i, new HashMap<String, String>());
}
- IdealState is = builder.build();
if (taskConfigMap != null && !taskConfigMap.isEmpty()) {
is.setRebalancerClassName(GenericTaskRebalancer.class.getName());
} else {