You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/08/04 23:25:06 UTC

git commit: [HELIX-492] Task should have its own rebalance mode

Repository: helix
Updated Branches:
  refs/heads/helix-0.6.x 47199d507 -> bb15ca8f0


[HELIX-492] Task should have its own rebalance mode


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/bb15ca8f
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/bb15ca8f
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/bb15ca8f

Branch: refs/heads/helix-0.6.x
Commit: bb15ca8f06aa1c4d98bacc8547535a797cce0328
Parents: 47199d5
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Fri Aug 1 13:00:26 2014 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Mon Aug 4 14:24:45 2014 -0700

----------------------------------------------------------------------
 .../stages/BestPossibleStateCalcStage.java      |  1 +
 .../apache/helix/manager/zk/ZKHelixAdmin.java   |  2 ++
 .../java/org/apache/helix/model/IdealState.java | 27 +++++++++++++++-----
 .../java/org/apache/helix/task/TaskDriver.java  |  9 ++++---
 4 files changed, 29 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/bb15ca8f/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 706fd41..9a9767e 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -123,6 +123,7 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
         mappingCalculator = customRebalancer;
         break;
       case USER_DEFINED:
+      case TASK:
         String rebalancerClassName = idealState.getRebalancerClassName();
         logger
             .info("resource " + resourceName + " use idealStateRebalancer " + rebalancerClassName);

http://git-wip-us.apache.org/repos/asf/helix/blob/bb15ca8f/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index aa2d617..89fdab7 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -253,6 +253,8 @@ public class ZKHelixAdmin implements HelixAdmin {
             .getPreferenceList(partitionName) == null)
             || (idealState.getRebalanceMode() == RebalanceMode.USER_DEFINED && idealState
                 .getPreferenceList(partitionName) == null)
+            || (idealState.getRebalanceMode() == RebalanceMode.TASK && idealState
+                .getPreferenceList(partitionName) == null)
             || (idealState.getRebalanceMode() == RebalanceMode.CUSTOMIZED && idealState
                 .getInstanceStateMap(partitionName) == null)) {
           logger.warn("Cluster: " + clusterName + ", resource: " + resourceName + ", partition: "

http://git-wip-us.apache.org/repos/asf/helix/blob/bb15ca8f/helix-core/src/main/java/org/apache/helix/model/IdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index a209cd9..bc31e1e 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -31,6 +31,9 @@ import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.controller.rebalancer.Rebalancer;
+import org.apache.helix.task.FixedTargetTaskRebalancer;
+import org.apache.helix.task.GenericTaskRebalancer;
+import org.apache.helix.task.TaskRebalancer;
 import org.apache.log4j.Logger;
 
 /**
@@ -71,13 +74,15 @@ public class IdealState extends HelixProperty {
   /**
    * The mode used for rebalance. FULL_AUTO does both node location calculation and state
    * assignment, SEMI_AUTO only does the latter, and CUSTOMIZED does neither. USER_DEFINED
-   * uses a Rebalancer implementation plugged in by the user.
+   * uses a Rebalancer implementation plugged in by the user. TASK designates that a
+   * {@link TaskRebalancer} instance should be used to rebalance this resource.
    */
   public enum RebalanceMode {
     FULL_AUTO,
     SEMI_AUTO,
     CUSTOMIZED,
     USER_DEFINED,
+    TASK,
     NONE
   }
 
@@ -207,10 +212,11 @@ public class IdealState extends HelixProperty {
    */
   public Set<String> getPartitionSet() {
     if (getRebalanceMode() == RebalanceMode.SEMI_AUTO
-        || getRebalanceMode() == RebalanceMode.FULL_AUTO) {
+        || getRebalanceMode() == RebalanceMode.FULL_AUTO
+        || getRebalanceMode() == RebalanceMode.USER_DEFINED
+        || getRebalanceMode() == RebalanceMode.TASK) {
       return _record.getListFields().keySet();
-    } else if (getRebalanceMode() == RebalanceMode.CUSTOMIZED
-        || getRebalanceMode() == RebalanceMode.USER_DEFINED) {
+    } else if (getRebalanceMode() == RebalanceMode.CUSTOMIZED) {
       return _record.getMapFields().keySet();
     } else {
       logger.error("Invalid ideal state mode:" + getResourceName());
@@ -235,7 +241,8 @@ public class IdealState extends HelixProperty {
   public Set<String> getInstanceSet(String partitionName) {
     if (getRebalanceMode() == RebalanceMode.SEMI_AUTO
         || getRebalanceMode() == RebalanceMode.FULL_AUTO
-        || getRebalanceMode() == RebalanceMode.USER_DEFINED) {
+        || getRebalanceMode() == RebalanceMode.USER_DEFINED
+        || getRebalanceMode() == RebalanceMode.TASK) {
       List<String> prefList = _record.getListField(partitionName);
       if (prefList != null) {
         return new TreeSet<String>(prefList);
@@ -457,8 +464,14 @@ public class IdealState extends HelixProperty {
       property = RebalanceMode.CUSTOMIZED;
       break;
     default:
-      if (getRebalancerClassName() != null) {
-        property = RebalanceMode.USER_DEFINED;
+      String rebalancerName = getRebalancerClassName();
+      if (rebalancerName != null) {
+        if (rebalancerName.equals(FixedTargetTaskRebalancer.class.getName())
+            || rebalancerName.equals(GenericTaskRebalancer.class.getName())) {
+          property = RebalanceMode.TASK;
+        } else {
+          property = RebalanceMode.USER_DEFINED;
+        }
       } else {
         property = RebalanceMode.SEMI_AUTO;
       }

http://git-wip-us.apache.org/repos/asf/helix/blob/bb15ca8f/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index e4871b5..ab504cd 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -20,8 +20,10 @@ package org.apache.helix.task;
  */
 
 import java.io.File;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -190,14 +192,15 @@ public class TaskDriver {
 
     // Push out new ideal state based on number of target partitions
     CustomModeISBuilder builder = new CustomModeISBuilder(jobResource);
-    builder.setRebalancerMode(IdealState.RebalanceMode.USER_DEFINED);
+    builder.setRebalancerMode(IdealState.RebalanceMode.TASK);
     builder.setNumReplica(1);
     builder.setNumPartitions(numPartitions);
     builder.setStateModel(TaskConstants.STATE_MODEL_NAME);
+    IdealState is = builder.build();
     for (int i = 0; i < numPartitions; i++) {
-      builder.add(jobResource + "_" + i);
+      is.getRecord().setListField(jobResource + "_" + i, new ArrayList<String>());
+      is.getRecord().setMapField(jobResource + "_" + i, new HashMap<String, String>());
     }
-    IdealState is = builder.build();
     if (taskConfigMap != null && !taskConfigMap.isEmpty()) {
       is.setRebalancerClassName(GenericTaskRebalancer.class.getName());
     } else {