You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by lx...@apache.org on 2016/09/22 23:11:06 UTC

[1/3] helix git commit: [HELIX-635] GenericTaskAssignmentCalculator rebalance with consistent hashing

Repository: helix
Updated Branches:
  refs/heads/helix-0.6.x 7147ec874 -> 9fc6c540b


[HELIX-635] GenericTaskAssignmentCalculator rebalance with consistent hashing

1. Implement the consistent-hashing mapping calculation
2. Remove the separate reassignment logic and fold retry-based reassignment into the consistent-hashing placement
3. Add tests for GenericTaskAssignmentCalculator


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/0a18726f
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/0a18726f
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/0a18726f

Branch: refs/heads/helix-0.6.x
Commit: 0a18726fcad7b8a0fe5e77d7a2c9848b86461ccc
Parents: 7147ec8
Author: Junkai Xue <jx...@linkedin.com>
Authored: Tue Sep 13 15:28:57 2016 -0700
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Tue Sep 13 15:59:24 2016 -0700

----------------------------------------------------------------------
 .../strategy/CrushRebalanceStrategy.java        |   2 +-
 .../crushMapping/CRUSHPlacementAlgorithm.java   |   1 +
 .../strategy/crushMapping/JenkinsHash.java      | 140 -----------
 .../task/GenericTaskAssignmentCalculator.java   | 238 +++++++------------
 .../java/org/apache/helix/util/JenkinsHash.java | 140 +++++++++++
 .../TestGenericTaskAssignmentCalculator.java    | 171 +++++++++++++
 .../task/TestIndependentTaskRebalancer.java     |   8 +-
 7 files changed, 405 insertions(+), 295 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/0a18726f/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java
index a8fe107..b91d26c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java
@@ -24,7 +24,7 @@ import com.google.common.base.Predicates;
 import org.apache.helix.HelixException;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.controller.rebalancer.strategy.crushMapping.CRUSHPlacementAlgorithm;
-import org.apache.helix.controller.rebalancer.strategy.crushMapping.JenkinsHash;
+import org.apache.helix.util.JenkinsHash;
 import org.apache.helix.controller.rebalancer.topology.Node;
 import org.apache.helix.controller.rebalancer.topology.Topology;
 import org.apache.helix.controller.stages.ClusterDataCache;

http://git-wip-us.apache.org/repos/asf/helix/blob/0a18726f/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CRUSHPlacementAlgorithm.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CRUSHPlacementAlgorithm.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CRUSHPlacementAlgorithm.java
index 870656c..b7c1c68 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CRUSHPlacementAlgorithm.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CRUSHPlacementAlgorithm.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.helix.controller.rebalancer.topology.Node;
+import org.apache.helix.util.JenkinsHash;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/helix/blob/0a18726f/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/JenkinsHash.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/JenkinsHash.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/JenkinsHash.java
deleted file mode 100644
index 66566f8..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/JenkinsHash.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/**
- * Copyright 2013 Twitter, Inc.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.helix.controller.rebalancer.strategy.crushMapping;
-
-public class JenkinsHash {
-  // max value to limit it to 4 bytes
-  private static final long MAX_VALUE = 0xFFFFFFFFL;
-  private static final long CRUSH_HASH_SEED = 1315423911L;
-
-  /**
-   * Convert a byte into a long value without making it negative.
-   */
-  private static long byteToLong(byte b) {
-    long val = b & 0x7F;
-    if ((b & 0x80) != 0) {
-      val += 128;
-    }
-    return val;
-  }
-
-  /**
-   * Do addition and turn into 4 bytes.
-   */
-  private static long add(long val, long add) {
-    return (val + add) & MAX_VALUE;
-  }
-
-  /**
-   * Do subtraction and turn into 4 bytes.
-   */
-  private static long subtract(long val, long subtract) {
-    return (val - subtract) & MAX_VALUE;
-  }
-
-  /**
-   * Left shift val by shift bits and turn in 4 bytes.
-   */
-  private static long xor(long val, long xor) {
-    return (val ^ xor) & MAX_VALUE;
-  }
-
-  /**
-   * Left shift val by shift bits.  Cut down to 4 bytes.
-   */
-  private static long leftShift(long val, int shift) {
-    return (val << shift) & MAX_VALUE;
-  }
-
-  /**
-   * Convert 4 bytes from the buffer at offset into a long value.
-   */
-  private static long fourByteToLong(byte[] bytes, int offset) {
-    return (byteToLong(bytes[offset + 0])
-        + (byteToLong(bytes[offset + 1]) << 8)
-        + (byteToLong(bytes[offset + 2]) << 16)
-        + (byteToLong(bytes[offset + 3]) << 24));
-  }
-
-  /**
-   * Mix up the values in the hash function.
-   */
-  private static Triple hashMix(Triple t) {
-    long a = t.a; long b = t.b; long c = t.c;
-    a = subtract(a, b); a = subtract(a, c); a = xor(a, c >> 13);
-    b = subtract(b, c); b = subtract(b, a); b = xor(b, leftShift(a, 8));
-    c = subtract(c, a); c = subtract(c, b); c = xor(c, (b >> 13));
-    a = subtract(a, b); a = subtract(a, c); a = xor(a, (c >> 12));
-    b = subtract(b, c); b = subtract(b, a); b = xor(b, leftShift(a, 16));
-    c = subtract(c, a); c = subtract(c, b); c = xor(c, (b >> 5));
-    a = subtract(a, b); a = subtract(a, c); a = xor(a, (c >> 3));
-    b = subtract(b, c); b = subtract(b, a); b = xor(b, leftShift(a, 10));
-    c = subtract(c, a); c = subtract(c, b); c = xor(c, (b >> 15));
-    return new Triple(a, b, c);
-  }
-
-  private static class Triple {
-    long a;
-    long b;
-    long c;
-
-    public Triple(long a, long b, long c) {
-      this.a = a; this.b = b; this.c = c;
-    }
-  }
-
-  public long hash(long a) {
-    long hash = xor(CRUSH_HASH_SEED, a);
-    long b = a;
-    long x = 231232L;
-    long y = 1232L;
-    Triple val = hashMix(new Triple(b, x, hash));
-    b = val.a; x = val.b; hash = val.c;
-    val = hashMix(new Triple(y, a, hash));
-    hash = val.c;
-    return hash;
-  }
-
-  public long hash(long a, long b) {
-    long hash = xor(xor(CRUSH_HASH_SEED, a), b);
-    long x = 231232L;
-    long y = 1232L;
-    Triple val = hashMix(new Triple(a, b, hash));
-    a = val.a; b = val.b; hash = val.c;
-    val = hashMix(new Triple(x, a, hash));
-    x = val.a; a = val.b; hash = val.c;
-    val = hashMix(new Triple(b, y, hash));
-    hash = val.c;
-    return hash;
-  }
-
-  public long hash(long a, long b, long c) {
-    long hash = xor(xor(xor(CRUSH_HASH_SEED, a), b), c);
-    long x = 231232L;
-    long y = 1232L;
-    Triple val = hashMix(new Triple(a, b, hash));
-    a = val.a; b = val.b; hash = val.c;
-    val = hashMix(new Triple(c, x, hash));
-    c = val.a; x = val.b; hash = val.c;
-    val = hashMix(new Triple(y, a, hash));
-    y = val.a; a = val.b; hash = val.c;
-    val = hashMix(new Triple(b, x, hash));
-    b = val.a; x = val.b; hash = val.c;
-    val = hashMix(new Triple(y, c, hash));
-    hash = val.c;
-    return hash;
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/0a18726f/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
index ac96768..fbc7af3 100644
--- a/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
+++ b/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
@@ -19,30 +19,26 @@ package org.apache.helix.task;
  * under the License.
  */
 
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.SortedMap;
 import java.util.SortedSet;
+import java.util.TreeMap;
 import java.util.TreeSet;
 
-import org.apache.helix.ZNRecord;
+import org.apache.helix.HelixException;
 import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.controller.stages.CurrentStateOutput;
-import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
-import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.util.JenkinsHash;
 import org.apache.log4j.Logger;
 
-import com.google.common.base.Function;
-import com.google.common.collect.BiMap;
-import com.google.common.collect.HashBiMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -54,9 +50,6 @@ import com.google.common.collect.Sets;
 public class GenericTaskAssignmentCalculator extends TaskAssignmentCalculator {
   private static final Logger LOG = Logger.getLogger(GenericTaskAssignmentCalculator.class);
 
-  /** Reassignment policy for this algorithm */
-  private RetryPolicy _retryPolicy = new DefaultRetryReassigner();
-
   @Override
   public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
       WorkflowConfig workflowCfg, WorkflowContext workflowCtx, ClusterDataCache cache) {
@@ -96,14 +89,7 @@ public class GenericTaskAssignmentCalculator extends TaskAssignmentCalculator {
     // Transform from partition id to fully qualified partition name
     List<Integer> partitionNums = Lists.newArrayList(partitionSet);
     Collections.sort(partitionNums);
-    final String resourceId = prevAssignment.getResourceName();
-    List<String> partitions =
-        new ArrayList<String>(Lists.transform(partitionNums, new Function<Integer, String>() {
-          @Override
-          public String apply(Integer partitionNum) {
-            return resourceId + "_" + partitionNum;
-          }
-        }));
+    String resourceId = prevAssignment.getResourceName();
 
     // Compute the current assignment
     Map<String, Map<String, String>> currentMapping = Maps.newHashMap();
@@ -122,156 +108,108 @@ public class GenericTaskAssignmentCalculator extends TaskAssignmentCalculator {
     }
 
     // Get the assignment keyed on partition
-    RebalanceStrategy strategy = new AutoRebalanceStrategy(resourceId, partitions, states);
-    List<String> allNodes =
-        Lists.newArrayList(getEligibleInstances(jobCfg, currStateOutput, instances, cache));
-    Collections.sort(allNodes);
-    ZNRecord record =
-        strategy.computePartitionAssignment(allNodes, allNodes, currentMapping, cache);
-    Map<String, List<String>> preferenceLists = record.getListFields();
-
-    // Convert to an assignment keyed on participant
-    Map<String, SortedSet<Integer>> taskAssignment = Maps.newHashMap();
-    for (Map.Entry<String, List<String>> e : preferenceLists.entrySet()) {
-      String partitionName = e.getKey();
-      partitionName = String.valueOf(TaskUtil.getPartitionId(partitionName));
-      List<String> preferenceList = e.getValue();
-      for (String participantName : preferenceList) {
-        if (!taskAssignment.containsKey(participantName)) {
-          taskAssignment.put(participantName, new TreeSet<Integer>());
-        }
-        taskAssignment.get(participantName).add(Integer.valueOf(partitionName));
-      }
+    if (jobCfg.getTargetResource() != null) {
+      LOG.error(
+          "Target resource is not null, should call FixedTaskAssignmentCalculator, target resource : "
+              + jobCfg.getTargetResource());
+      return new HashMap<String, SortedSet<Integer>>();
     }
 
-    // Finally, adjust the assignment if tasks have been failing
-    taskAssignment = _retryPolicy.reassign(jobCfg, jobContext, allNodes, taskAssignment);
+    List<String> allNodes = Lists.newArrayList(instances);
+    ConsistentHashingPlacement placement = new ConsistentHashingPlacement(allNodes);
+    Map<String, SortedSet<Integer>> taskAssignment =
+        placement.computeMapping(jobCfg, jobContext, partitionNums, resourceId);
+
     return taskAssignment;
   }
 
-  /**
-   * Filter a list of instances based on targeted resource policies
-   * @param jobCfg the job configuration
-   * @param currStateOutput the current state of all instances in the cluster
-   * @param instances valid instances
-   * @param cache current snapshot of the cluster
-   * @return a set of instances that can be assigned to
-   */
-  private Set<String> getEligibleInstances(JobConfig jobCfg, CurrentStateOutput currStateOutput,
-      Iterable<String> instances, ClusterDataCache cache) {
-    // No target resource means any instance is available
-    Set<String> allInstances = Sets.newHashSet(instances);
-    String targetResource = jobCfg.getTargetResource();
-    if (targetResource == null) {
-      return allInstances;
-    }
+  private class ConsistentHashingPlacement {
+    private JenkinsHash _hashFunction;
+    private ConsistentHashSelector _selector;
+    private int _numInstances;
 
-    // Bad ideal state means don't assign
-    IdealState idealState = cache.getIdealState(targetResource);
-    if (idealState == null) {
-      return Collections.emptySet();
+    public ConsistentHashingPlacement(List<String> potentialInstances) {
+      _hashFunction = new JenkinsHash();
+      _selector = new ConsistentHashSelector(potentialInstances);
+      _numInstances = potentialInstances.size();
     }
 
-    // Get the partitions on the target resource to use
-    Set<String> partitions = idealState.getPartitionSet();
-    List<String> targetPartitions = jobCfg.getTargetPartitions();
-    if (targetPartitions != null && !targetPartitions.isEmpty()) {
-      partitions.retainAll(targetPartitions);
-    }
+    public Map<String, SortedSet<Integer>> computeMapping(JobConfig jobConfig,
+        JobContext jobContext, List<Integer> partitions, String resourceId) {
+      if (_numInstances == 0) {
+        return new HashMap<String, SortedSet<Integer>>();
+      }
+
+      Map<String, SortedSet<Integer>> taskAssignment = Maps.newHashMap();
+
+      for (int partition : partitions) {
+        long hashedValue = new String(resourceId + "_" + partition).hashCode();
+        int shiftTimes;
+        int numAttempts = jobContext.getPartitionNumAttempts(partition);
+        int maxAttempts = jobConfig.getMaxAttemptsPerTask();
 
-    // Based on state matches, add eligible instances
-    Set<String> eligibleInstances = Sets.newHashSet();
-    Set<String> targetStates = jobCfg.getTargetPartitionStates();
-    for (String partition : partitions) {
-      Map<String, String> stateMap =
-          currStateOutput.getCurrentStateMap(targetResource, new Partition(partition));
-      Map<String, String> pendingStateMap =
-          currStateOutput.getPendingStateMap(targetResource, new Partition(partition));
-      for (Map.Entry<String, String> e : stateMap.entrySet()) {
-        String instanceName = e.getKey();
-        String state = e.getValue();
-        String pending = pendingStateMap.get(instanceName);
-        if (pending != null) {
-          continue;
+        if (jobConfig.getMaxAttemptsPerTask() < _numInstances) {
+          shiftTimes = numAttempts == -1 ? 0 : numAttempts;
+        } else {
+          shiftTimes = (maxAttempts == 0)
+              ? 0
+              : jobContext.getPartitionNumAttempts(partition) / (maxAttempts / _numInstances);
+        }
+        // Hash the value based on the shifting time. The default shift time will be 0.
+        for (int i = 0; i <= shiftTimes; i++) {
+          hashedValue = _hashFunction.hash(hashedValue);
         }
-        if (targetStates == null || targetStates.isEmpty() || targetStates.contains(state)) {
-          eligibleInstances.add(instanceName);
+        String selectedInstance = select(hashedValue);
+        if (selectedInstance != null) {
+          if (!taskAssignment.containsKey(selectedInstance)) {
+            taskAssignment.put(selectedInstance, new TreeSet<Integer>());
+          }
+          taskAssignment.get(selectedInstance).add(partition);
         }
       }
+      return taskAssignment;
     }
-    allInstances.retainAll(eligibleInstances);
-    return allInstances;
-  }
 
-  public interface RetryPolicy {
-    /**
-     * Adjust the assignment to allow for reassignment if a task keeps failing where it's currently
-     * assigned
-     * @param jobCfg the job configuration
-     * @param jobCtx the job context
-     * @param instances instances that can serve tasks
-     * @param origAssignment the unmodified assignment
-     * @return the adjusted assignment
-     */
-    Map<String, SortedSet<Integer>> reassign(JobConfig jobCfg, JobContext jobCtx,
-        Collection<String> instances, Map<String, SortedSet<Integer>> origAssignment);
-  }
+    private String select(long data) throws HelixException {
+      return _selector.get(data);
+    }
+
+    private class ConsistentHashSelector {
+      private final static int DEFAULT_TOKENS_PER_INSTANCE = 1000;
+      private final SortedMap<Long, String> circle = new TreeMap<Long, String>();
+      protected int instanceSize = 0;
 
-  private static class DefaultRetryReassigner implements RetryPolicy {
-    @Override
-    public Map<String, SortedSet<Integer>> reassign(JobConfig jobCfg, JobContext jobCtx,
-        Collection<String> instances, Map<String, SortedSet<Integer>> origAssignment) {
-      // Compute an increasing integer ID for each instance
-      BiMap<String, Integer> instanceMap = HashBiMap.create(instances.size());
-      int instanceIndex = 0;
-      for (String instance : instances) {
-        instanceMap.put(instance, instanceIndex++);
+      public ConsistentHashSelector(List<String> instances) {
+        for (String instance : instances) {
+          long tokenCount = DEFAULT_TOKENS_PER_INSTANCE;
+          add(instance, tokenCount);
+          instanceSize++;
+        }
       }
 
-      // Move partitions
-      Map<String, SortedSet<Integer>> newAssignment = Maps.newHashMap();
-      for (Map.Entry<String, SortedSet<Integer>> e : origAssignment.entrySet()) {
-        String instance = e.getKey();
-        SortedSet<Integer> partitions = e.getValue();
-        Integer instanceId = instanceMap.get(instance);
-        if (instanceId != null) {
-          for (int p : partitions) {
-            // Determine for each partition if there have been failures with the current assignment
-            // strategy, and if so, force a shift in assignment for that partition only
-            int shiftValue = getNumInstancesToShift(jobCfg, jobCtx, instances, p);
-            int newInstanceId = (instanceId + shiftValue) % instances.size();
-            String newInstance = instanceMap.inverse().get(newInstanceId);
-            if (newInstance == null) {
-              newInstance = instance;
-            }
-            if (!newAssignment.containsKey(newInstance)) {
-              newAssignment.put(newInstance, new TreeSet<Integer>());
-            }
-            newAssignment.get(newInstance).add(p);
-          }
-        } else {
-          // In case something goes wrong, just keep the previous assignment
-          newAssignment.put(instance, partitions);
+      public void add(String instance, long numberOfReplicas) {
+        for (int i = 0; i < numberOfReplicas; i++) {
+          circle.put(_hashFunction.hash(instance.hashCode(), i), instance);
+        }
+      }
+
+      public void remove(String instance, long numberOfReplicas) {
+        for (int i = 0; i < numberOfReplicas; i++) {
+          circle.remove(_hashFunction.hash(instance.hashCode(), i));
         }
       }
-      return newAssignment;
-    }
 
-    /**
-     * In case tasks fail, we may not want to schedule them in the same place. This method allows us
-     * to compute a shifting value so that we can systematically choose other instances to try
-     * @param jobCfg the job configuration
-     * @param jobCtx the job context
-     * @param instances instances that can be chosen
-     * @param p the partition to look up
-     * @return the shifting value
-     */
-    private int getNumInstancesToShift(JobConfig jobCfg, JobContext jobCtx,
-        Collection<String> instances, int p) {
-      int numAttempts = jobCtx.getPartitionNumAttempts(p);
-      int maxNumAttempts = jobCfg.getMaxAttemptsPerTask();
-      int numInstances = Math.min(instances.size(), jobCfg.getMaxForcedReassignmentsPerTask() + 1);
-      return numAttempts / (maxNumAttempts / numInstances);
+      public String get(long data) {
+        if (circle.isEmpty()) {
+          return null;
+        }
+        long hash = _hashFunction.hash(data);
+        if (!circle.containsKey(hash)) {
+          SortedMap<Long, String> tailMap = circle.tailMap(hash);
+          hash = tailMap.isEmpty() ? circle.firstKey() : tailMap.firstKey();
+        }
+        return circle.get(hash);
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/0a18726f/helix-core/src/main/java/org/apache/helix/util/JenkinsHash.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/util/JenkinsHash.java b/helix-core/src/main/java/org/apache/helix/util/JenkinsHash.java
new file mode 100644
index 0000000..3ccd1f4
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/util/JenkinsHash.java
@@ -0,0 +1,140 @@
+/**
+ * Copyright 2013 Twitter, Inc.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.helix.util;
+
+public class JenkinsHash {
+  // max value to limit it to 4 bytes
+  private static final long MAX_VALUE = 0xFFFFFFFFL;
+  private static final long CRUSH_HASH_SEED = 1315423911L;
+
+  /**
+   * Convert a byte into a long value without making it negative.
+   */
+  private static long byteToLong(byte b) {
+    long val = b & 0x7F;
+    if ((b & 0x80) != 0) {
+      val += 128;
+    }
+    return val;
+  }
+
+  /**
+   * Do addition and turn into 4 bytes.
+   */
+  private static long add(long val, long add) {
+    return (val + add) & MAX_VALUE;
+  }
+
+  /**
+   * Do subtraction and turn into 4 bytes.
+   */
+  private static long subtract(long val, long subtract) {
+    return (val - subtract) & MAX_VALUE;
+  }
+
+  /**
+   * Left shift val by shift bits and turn in 4 bytes.
+   */
+  private static long xor(long val, long xor) {
+    return (val ^ xor) & MAX_VALUE;
+  }
+
+  /**
+   * Left shift val by shift bits.  Cut down to 4 bytes.
+   */
+  private static long leftShift(long val, int shift) {
+    return (val << shift) & MAX_VALUE;
+  }
+
+  /**
+   * Convert 4 bytes from the buffer at offset into a long value.
+   */
+  private static long fourByteToLong(byte[] bytes, int offset) {
+    return (byteToLong(bytes[offset + 0])
+        + (byteToLong(bytes[offset + 1]) << 8)
+        + (byteToLong(bytes[offset + 2]) << 16)
+        + (byteToLong(bytes[offset + 3]) << 24));
+  }
+
+  /**
+   * Mix up the values in the hash function.
+   */
+  private static Triple hashMix(Triple t) {
+    long a = t.a; long b = t.b; long c = t.c;
+    a = subtract(a, b); a = subtract(a, c); a = xor(a, c >> 13);
+    b = subtract(b, c); b = subtract(b, a); b = xor(b, leftShift(a, 8));
+    c = subtract(c, a); c = subtract(c, b); c = xor(c, (b >> 13));
+    a = subtract(a, b); a = subtract(a, c); a = xor(a, (c >> 12));
+    b = subtract(b, c); b = subtract(b, a); b = xor(b, leftShift(a, 16));
+    c = subtract(c, a); c = subtract(c, b); c = xor(c, (b >> 5));
+    a = subtract(a, b); a = subtract(a, c); a = xor(a, (c >> 3));
+    b = subtract(b, c); b = subtract(b, a); b = xor(b, leftShift(a, 10));
+    c = subtract(c, a); c = subtract(c, b); c = xor(c, (b >> 15));
+    return new Triple(a, b, c);
+  }
+
+  private static class Triple {
+    long a;
+    long b;
+    long c;
+
+    public Triple(long a, long b, long c) {
+      this.a = a; this.b = b; this.c = c;
+    }
+  }
+
+  public long hash(long a) {
+    long hash = xor(CRUSH_HASH_SEED, a);
+    long b = a;
+    long x = 231232L;
+    long y = 1232L;
+    Triple val = hashMix(new Triple(b, x, hash));
+    b = val.a; x = val.b; hash = val.c;
+    val = hashMix(new Triple(y, a, hash));
+    hash = val.c;
+    return hash;
+  }
+
+  public long hash(long a, long b) {
+    long hash = xor(xor(CRUSH_HASH_SEED, a), b);
+    long x = 231232L;
+    long y = 1232L;
+    Triple val = hashMix(new Triple(a, b, hash));
+    a = val.a; b = val.b; hash = val.c;
+    val = hashMix(new Triple(x, a, hash));
+    x = val.a; a = val.b; hash = val.c;
+    val = hashMix(new Triple(b, y, hash));
+    hash = val.c;
+    return hash;
+  }
+
+  public long hash(long a, long b, long c) {
+    long hash = xor(xor(xor(CRUSH_HASH_SEED, a), b), c);
+    long x = 231232L;
+    long y = 1232L;
+    Triple val = hashMix(new Triple(a, b, hash));
+    a = val.a; b = val.b; hash = val.c;
+    val = hashMix(new Triple(c, x, hash));
+    c = val.a; x = val.b; hash = val.c;
+    val = hashMix(new Triple(y, a, hash));
+    y = val.a; a = val.b; hash = val.c;
+    val = hashMix(new Triple(b, x, hash));
+    b = val.a; x = val.b; hash = val.c;
+    val = hashMix(new Triple(y, c, hash));
+    hash = val.c;
+    return hash;
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/0a18726f/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericTaskAssignmentCalculator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericTaskAssignmentCalculator.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericTaskAssignmentCalculator.java
new file mode 100644
index 0000000..0410db2
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericTaskAssignmentCalculator.java
@@ -0,0 +1,171 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskConfig;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskFactory;
+import org.apache.helix.task.TaskResult;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskStateModelFactory;
+import org.apache.helix.task.Workflow;
+import org.apache.helix.tools.ClusterSetup;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+import org.testng.collections.Sets;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+public class TestGenericTaskAssignmentCalculator extends TaskTestBase {
+  // Per-instance task tally. Task threads on every mock participant write it while the test thread
+  // reads it, so it is a concurrent map; compound read-modify-write updates also lock on the map.
+  private final Map<String, Integer> _runCounts = Maps.newConcurrentMap();
+  private TaskConfig _taskConfig;
+  private Map<String, String> _jobCommandMap;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    String namespace = "/" + CLUSTER_NAME;
+    if (_gZkClient.exists(namespace)) {
+      _gZkClient.deleteRecursive(namespace);
+    }
+
+    // Setup cluster and instances
+    ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
+    setupTool.addCluster(CLUSTER_NAME, true);
+    for (int i = 0; i < _numNodes; i++) {
+      String storageNodeName = PARTICIPANT_PREFIX + "_" + (_startPort + i);
+      setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+    }
+
+    // start dummy participants
+    for (int i = 0; i < _numNodes; i++) {
+      final String instanceName = PARTICIPANT_PREFIX + "_" + (_startPort + i);
+
+      // Set task callbacks
+      Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
+
+      taskFactoryReg.put("TaskOne", new TaskFactory() {
+        @Override public Task createNewTask(TaskCallbackContext context) {
+          return new TaskOne(context, instanceName);
+        }
+      });
+
+      _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+
+      // Register a Task state model factory.
+      StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
+      stateMachine.registerStateModelFactory("Task",
+          new TaskStateModelFactory(_participants[i], taskFactoryReg));
+      _participants[i].syncStart();
+    }
+
+    // Start controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+
+    // Start an admin connection
+    _manager = HelixManagerFactory
+        .getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
+    _manager.connect();
+    _driver = new TaskDriver(_manager);
+
+    Map<String, String> taskConfigMap = Maps.newHashMap();
+    _taskConfig = new TaskConfig("TaskOne", taskConfigMap, false);
+    _jobCommandMap = Maps.newHashMap();
+  }
+
+  /**
+   * Submits 25 jobs with one task each and expects tasks to be created on 5 distinct instances.
+   */
+  @Test
+  public void testMultipleJobAssignment() throws InterruptedException {
+    // Drop tallies left by other test methods so the assertion reflects only this workflow.
+    _runCounts.clear();
+    String workflowName = TestHelper.getTestMethodName();
+    Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName);
+    List<TaskConfig> taskConfigs = Lists.newArrayListWithCapacity(1);
+    taskConfigs.add(_taskConfig);
+    JobConfig.Builder jobBuilder =
+        new JobConfig.Builder().setCommand("DummyCommand").addTaskConfigs(taskConfigs)
+            .setJobCommandConfigMap(_jobCommandMap);
+
+    for (int i = 0; i < 25; i++) {
+      workflowBuilder.addJob("JOB" + i, jobBuilder);
+    }
+
+    _driver.start(workflowBuilder.build());
+    _driver.pollForWorkflowState(workflowName, TaskState.COMPLETED);
+
+    // NOTE(review): 5 presumably equals TaskTestBase's _numNodes, i.e. every participant got work.
+    Assert.assertEquals(_runCounts.size(), 5);
+  }
+
+  /**
+   * Submits one job with 50 tasks and expects tasks to be created on 5 distinct instances.
+   */
+  @Test
+  public void testMultipleTaskAssignment() throws InterruptedException {
+    // Drop tallies left by other test methods so the assertion reflects only this workflow.
+    _runCounts.clear();
+    String workflowName = TestHelper.getTestMethodName();
+    Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName);
+
+    final int numTasks = 50;
+    List<TaskConfig> taskConfigs = Lists.newArrayListWithCapacity(numTasks);
+    for (int i = 0; i < numTasks; i++) {
+      Map<String, String> taskConfigMap = Maps.newHashMap();
+      taskConfigs.add(new TaskConfig("TaskOne", taskConfigMap, false));
+    }
+    JobConfig.Builder jobBuilder =
+        new JobConfig.Builder().setCommand("DummyCommand").setJobCommandConfigMap(_jobCommandMap)
+            .addTaskConfigs(taskConfigs);
+    workflowBuilder.addJob("JOB", jobBuilder);
+    _driver.start(workflowBuilder.build());
+    _driver.pollForWorkflowState(workflowName, TaskState.COMPLETED);
+
+    Assert.assertEquals(_runCounts.size(), 5);
+  }
+
+  /** Test task that records, per instance, that it was created and how many times it ran. */
+  private class TaskOne extends MockTask {
+    private final String _instanceName;
+
+    public TaskOne(TaskCallbackContext context, String instanceName) {
+      super(context);
+
+      // Register the instance as soon as a task is created on it; the tests count distinct
+      // instances via _runCounts.size(). Tasks are created on several participants at once.
+      synchronized (_runCounts) {
+        if (!_runCounts.containsKey(instanceName)) {
+          _runCounts.put(instanceName, 0);
+        }
+      }
+      _instanceName = instanceName;
+    }
+
+    @Override
+    public TaskResult run() {
+      // Tasks on different participants run concurrently, so make the increment atomic. A missing
+      // entry (e.g. cleared by a later test) starts from zero instead of failing on unboxing.
+      synchronized (_runCounts) {
+        Integer previous = _runCounts.get(_instanceName);
+        _runCounts.put(_instanceName, previous == null ? 1 : previous + 1);
+      }
+      return new TaskResult(TaskResult.Status.COMPLETED, "");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/0a18726f/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
index 49b4bf4..c4d588c 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
@@ -215,12 +215,12 @@ public class TestIndependentTaskRebalancer extends TaskTestBase {
   }
 
   @Test public void testReassignment() throws Exception {
-    final int NUM_INSTANCES = 2;
+    final int NUM_INSTANCES = 5;
     String jobName = TestHelper.getTestMethodName();
     Workflow.Builder workflowBuilder = new Workflow.Builder(jobName);
     List<TaskConfig> taskConfigs = Lists.newArrayListWithCapacity(2);
-    Map<String, String> taskConfigMap = Maps.newHashMap(
-        ImmutableMap.of("fail", "" + true, "failInstance", PARTICIPANT_PREFIX + '_' + _startPort));
+    Map<String, String> taskConfigMap = Maps.newHashMap(ImmutableMap
+        .of("fail", "" + true, "failInstance", PARTICIPANT_PREFIX + '_' + (_startPort + 1)));
     TaskConfig taskConfig1 = new TaskConfig("TaskOne", taskConfigMap, false);
     taskConfigs.add(taskConfig1);
     Map<String, String> jobCommandMap = Maps.newHashMap();
@@ -242,7 +242,7 @@ public class TestIndependentTaskRebalancer extends TaskTestBase {
 
     // Ensure that this was tried on two different instances, the first of which exhausted the
     // attempts number, and the other passes on the first try
-    Assert.assertEquals(_runCounts.size(), NUM_INSTANCES);
+    Assert.assertEquals(_runCounts.size(), 2);
     Assert.assertTrue(
         _runCounts.values().contains(JobConfig.DEFAULT_MAX_ATTEMPTS_PER_TASK / NUM_INSTANCES));
     Assert.assertTrue(_runCounts.values().contains(1));


[2/3] helix git commit: Refactor TaskAssignmentCalculator API

Posted by lx...@apache.org.
Refactor TaskAssignmentCalculator API

Refactor the TaskAssignmentCalculator API to take a map of resource name to IdealState instead of the whole ClusterDataCache, which is large and mostly unused by the calculators.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/7bb2a9db
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/7bb2a9db
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/7bb2a9db

Branch: refs/heads/helix-0.6.x
Commit: 7bb2a9db2396a00bb9a721634a2432240679c657
Parents: 0a18726
Author: Junkai Xue <jx...@linkedin.com>
Authored: Tue Sep 13 16:00:08 2016 -0700
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Tue Sep 13 16:00:08 2016 -0700

----------------------------------------------------------------------
 .../controller/stages/ClusterDataCache.java     | 36 ++++++++++++++------
 .../FixedTargetTaskAssignmentCalculator.java    | 35 +++++++------------
 .../helix/task/FixedTargetTaskRebalancer.java   |  4 +--
 .../task/GenericTaskAssignmentCalculator.java   |  6 ++--
 .../helix/task/GenericTaskRebalancer.java       |  4 +--
 .../org/apache/helix/task/JobRebalancer.java    | 11 +++---
 .../helix/task/TaskAssignmentCalculator.java    | 10 +++---
 7 files changed, 56 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/7bb2a9db/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index dacf98d..c8ca941 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -453,21 +453,35 @@ public class ClusterDataCache {
   }
 
   /**
-   * Return all the nodes that are enabled and tagged same as the job.
-   * @param allInstances List of instances to filter with instance tag
-   * @param instanceTag The instance group tag
-   * @return A new set contains instance name and that are marked enabled and have same
-   *         tag with job. The original set will not be changed during the filtering
+   * Return all the live instances that are enabled.
+   * @return a new set containing the names of live instances that are marked enabled
    */
-  public Set<String> getAllEnabledInstanceWithTag(final Set<String> allInstances,
-      String instanceTag) {
+  public Set<String> getAllEnabledLiveInstances() {
+    // A null tag turns off instance-group filtering, leaving only the live and enabled checks.
+    final String anyInstanceGroup = null;
+    return getAllEnabledInstances(anyInstanceGroup);
+  }
+
+  /**
+   * Collects the live, enabled instances that belong to the given instance group.
+   * @param instanceTag instance group tag to filter on; {@code null} means no group filter, in
+   *          which case every live, enabled instance is returned
+   * @return a new set of the matching live instance names
+   */
+  public Set<String> getAllEnabledLiveInstancesWithTag(String instanceTag) {
+    Set<String> matchingInstances = getAllEnabledInstances(instanceTag);
+    return matchingInstances;
+  }
+
+  private Set<String> getAllEnabledInstances(String instanceTag) {
     Set<String> enabledTagInstances = new HashSet<String>();
-    for (String instance : allInstances) {
+    for (String instance : _liveInstanceMap.keySet()) {
       InstanceConfig instanceConfig = _instanceConfigMap.get(instance);
 
-      if (instanceConfig != null && instanceConfig.getInstanceEnabled() && instanceConfig
-          .containsTag(instanceTag)) {
-        enabledTagInstances.add(instance);
+      // Check instance is enabled
+      if (instanceConfig != null && instanceConfig.getInstanceEnabled()) {
+        // Check whether it has instance group or not
+        // If it has instance group, check whether it belongs to that group or not
+        if (instanceTag == null || instanceConfig.containsTag(instanceTag)) {
+          enabledTagInstances.add(instance);
+        }
       }
     }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/7bb2a9db/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
index 09db616..0768b51 100644
--- a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
+++ b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
@@ -48,36 +48,37 @@ import org.apache.log4j.Logger;
 public class FixedTargetTaskAssignmentCalculator extends TaskAssignmentCalculator {
   private static final Logger LOG = Logger.getLogger(FixedTargetTaskAssignmentCalculator.class);
 
-  @Override
-  public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
-      WorkflowConfig workflowCfg, WorkflowContext workflowCtx, ClusterDataCache cache) {
-    return getAllTaskPartitions(getTgtIdealState(jobCfg, cache), jobCfg, jobCtx);
+  @Override public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
+      WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
+      Map<String, IdealState> idealStateMap) {
+    // Resolve the job's target resource in idealStateMap (null if absent) and delegate to the
+    // IdealState-based overload.
+    return getAllTaskPartitions(getTgtIdealState(jobCfg, idealStateMap), jobCfg, jobCtx);
   }
 
   @Override
   public Map<String, SortedSet<Integer>> getTaskAssignment(CurrentStateOutput currStateOutput,
       ResourceAssignment prevAssignment, Collection<String> instances, JobConfig jobCfg,
       JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
-      Set<Integer> partitionSet, ClusterDataCache cache) {
-    IdealState tgtIs = getTgtIdealState(jobCfg, cache);
+      Set<Integer> partitionSet, Map<String, IdealState> idealStateMap) {
+    IdealState tgtIs = getTgtIdealState(jobCfg, idealStateMap);
     if (tgtIs == null) {
       LOG.warn("Missing target resource for the scheduled job!");
       return Collections.emptyMap();
     }
     Set<String> tgtStates = jobCfg.getTargetPartitionStates();
+    // NOTE(review): this calculator no longer consults InstanceConfig, so disabled instances are
+    // not filtered here; `instances` must already exclude them. JobRebalancer passes
+    // getAllEnabledLiveInstances()/getAllEnabledLiveInstancesWithTag(); confirm other callers.
     return getTgtPartitionAssignment(currStateOutput, instances, tgtIs, tgtStates, partitionSet,
-        jobContext, cache);
+        jobContext);
   }
 
   /**
    * Gets the ideal state of the target resource of this job
    * @param jobCfg job config containing target resource id
-   * @param cache snapshot of the cluster containing the task and target resource
+   * @param idealStateMap map from resource name to that resource's ideal state
    * @return target resource ideal state, or null
    */
-  private static IdealState getTgtIdealState(JobConfig jobCfg, ClusterDataCache cache) {
+  private static IdealState getTgtIdealState(JobConfig jobCfg,
+      Map<String, IdealState> idealStateMap) {
     String tgtResourceId = jobCfg.getTargetResource();
-    return cache.getIdealState(tgtResourceId);
+    return idealStateMap.get(tgtResourceId);
   }
 
   /**
@@ -131,7 +132,7 @@ public class FixedTargetTaskAssignmentCalculator extends TaskAssignmentCalculato
    */
   private static Map<String, SortedSet<Integer>> getTgtPartitionAssignment(
       CurrentStateOutput currStateOutput, Iterable<String> instances, IdealState tgtIs,
-      Set<String> tgtStates, Set<Integer> includeSet, JobContext jobCtx, ClusterDataCache cache) {
+      Set<String> tgtStates, Set<Integer> includeSet, JobContext jobCtx) {
     Map<String, SortedSet<Integer>> result = new HashMap<String, SortedSet<Integer>>();
     for (String instance : instances) {
       result.put(instance, new TreeSet<Integer>());
@@ -153,18 +154,6 @@ public class FixedTargetTaskAssignmentCalculator extends TaskAssignmentCalculato
             continue;
           }
 
-          InstanceConfig instanceConfig = cache.getInstanceConfigMap().get(instance);
-
-          if (instanceConfig == null) {
-            LOG.error("Instance config not found for instance : " + instance);
-            continue;
-          }
-
-          if (!instanceConfig.getInstanceEnabled()) {
-            LOG.debug("Instance has been disabled, ignore instance : " + instance);
-            continue;
-          }
-
           String s =
               currStateOutput.getCurrentState(tgtIs.getResourceName(), new Partition(pName),
                   instance);

http://git-wip-us.apache.org/repos/asf/helix/blob/7bb2a9db/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java
index 569fe03..1589c1a 100644
--- a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java
@@ -43,7 +43,7 @@ import org.apache.helix.model.ResourceAssignment;
   @Override public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
       WorkflowConfig workflowCfg, WorkflowContext workflowCtx, ClusterDataCache cache) {
+    // TaskAssignmentCalculator takes only the ideal-state map, so pass that instead of the cache.
     return taskAssignmentCalculator
-        .getAllTaskPartitions(jobCfg, jobCtx, workflowCfg, workflowCtx, cache);
+        .getAllTaskPartitions(jobCfg, jobCtx, workflowCfg, workflowCtx, cache.getIdealStates());
   }
 
   @Override public Map<String, SortedSet<Integer>> getTaskAssignment(
@@ -53,6 +53,6 @@ import org.apache.helix.model.ResourceAssignment;
       ClusterDataCache cache) {
     return taskAssignmentCalculator
         .getTaskAssignment(currStateOutput, prevAssignment, instances, jobCfg, jobContext,
-            workflowCfg, workflowCtx, partitionSet, cache);
+            workflowCfg, workflowCtx, partitionSet, cache.getIdealStates());
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/7bb2a9db/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
index fbc7af3..58ba670 100644
--- a/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
+++ b/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
@@ -34,6 +34,7 @@ import java.util.TreeSet;
 import org.apache.helix.HelixException;
 import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.util.JenkinsHash;
@@ -52,7 +53,8 @@ public class GenericTaskAssignmentCalculator extends TaskAssignmentCalculator {
 
   @Override
   public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
-      WorkflowConfig workflowCfg, WorkflowContext workflowCtx, ClusterDataCache cache) {
+      WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
+      Map<String, IdealState> idealStateMap) {
     Map<String, TaskConfig> taskMap = jobCfg.getTaskConfigMap();
     Map<String, Integer> taskIdMap = jobCtx.getTaskIdPartitionMap();
     for (TaskConfig taskCfg : taskMap.values()) {
@@ -69,7 +71,7 @@ public class GenericTaskAssignmentCalculator extends TaskAssignmentCalculator {
   public Map<String, SortedSet<Integer>> getTaskAssignment(CurrentStateOutput currStateOutput,
       ResourceAssignment prevAssignment, Collection<String> instances, JobConfig jobCfg,
       final JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
-      Set<Integer> partitionSet, ClusterDataCache cache) {
+      Set<Integer> partitionSet, Map<String, IdealState> idealStateMap) {
     // Gather input to the full auto rebalancing algorithm
     LinkedHashMap<String, Integer> states = new LinkedHashMap<String, Integer>();
     states.put("ONLINE", 1);

http://git-wip-us.apache.org/repos/asf/helix/blob/7bb2a9db/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java
index 6a005b9..1720fbb 100644
--- a/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java
@@ -42,7 +42,7 @@ public class GenericTaskRebalancer extends DeprecatedTaskRebalancer {
   public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
       WorkflowConfig workflowCfg, WorkflowContext workflowCtx, ClusterDataCache cache) {
+    // TaskAssignmentCalculator takes only the ideal-state map, so pass that instead of the cache.
     return taskAssignmentCalculator
-        .getAllTaskPartitions(jobCfg, jobCtx, workflowCfg, workflowCtx, cache);
+        .getAllTaskPartitions(jobCfg, jobCtx, workflowCfg, workflowCtx, cache.getIdealStates());
   }
 
   @Override
@@ -52,6 +52,6 @@ public class GenericTaskRebalancer extends DeprecatedTaskRebalancer {
       Set<Integer> partitionSet, ClusterDataCache cache) {
     return taskAssignmentCalculator
         .getTaskAssignment(currStateOutput, prevAssignment, instances, jobCfg, jobContext,
-            workflowCfg, workflowCtx, partitionSet, cache);
+            workflowCfg, workflowCtx, partitionSet, cache.getIdealStates());
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/7bb2a9db/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
index 378ad95..cf7f5e6 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
@@ -131,9 +131,8 @@ public class JobRebalancer extends TaskRebalancer {
     // Fetch the previous resource assignment from the property store. This is required because of
     // HELIX-230.
     Set<String> liveInstances = jobCfg.getInstanceGroupTag() == null
-        ? clusterData.getLiveInstances().keySet()
-        : clusterData.getAllEnabledInstanceWithTag(clusterData.getLiveInstances().keySet(),
-            jobCfg.getInstanceGroupTag());
+        ? clusterData.getAllEnabledLiveInstances()
+        : clusterData.getAllEnabledLiveInstancesWithTag(jobCfg.getInstanceGroupTag());
 
     if (liveInstances.isEmpty()) {
       LOG.error("No available instance found for job!");
@@ -222,8 +221,8 @@ public class JobRebalancer extends TaskRebalancer {
 
     // Process all the current assignments of tasks.
     TaskAssignmentCalculator taskAssignmentCal = getAssignmentCalulator(jobCfg);
-    Set<Integer> allPartitions =
-        taskAssignmentCal.getAllTaskPartitions(jobCfg, jobCtx, workflowConfig, workflowCtx, cache);
+    Set<Integer> allPartitions = taskAssignmentCal
+        .getAllTaskPartitions(jobCfg, jobCtx, workflowConfig, workflowCtx, cache.getIdealStates());
 
     if (allPartitions == null || allPartitions.isEmpty()) {
       // Empty target partitions, mark the job as FAILED.
@@ -424,7 +423,7 @@ public class JobRebalancer extends TaskRebalancer {
       // Get instance->[partition, ...] mappings for the target resource.
       Map<String, SortedSet<Integer>> tgtPartitionAssignments = taskAssignmentCal
           .getTaskAssignment(currStateOutput, prevAssignment, liveInstances, jobCfg, jobCtx,
-              workflowConfig, workflowCtx, allPartitions, cache);
+              workflowConfig, workflowCtx, allPartitions, cache.getIdealStates());
       for (Map.Entry<String, SortedSet<Integer>> entry : taskAssignments.entrySet()) {
         String instance = entry.getKey();
         if (!tgtPartitionAssignments.containsKey(instance) || excludedInstances

http://git-wip-us.apache.org/repos/asf/helix/blob/7bb2a9db/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java
index a3ed5ab..a6a9ed3 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java
@@ -2,6 +2,7 @@ package org.apache.helix.task;
 
 import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.IdealState;
 import org.apache.helix.model.ResourceAssignment;
 
 import java.util.Collection;
@@ -17,11 +18,12 @@ public abstract class TaskAssignmentCalculator {
    * @param jobCtx the task context
    * @param workflowCfg the workflow configuration
    * @param workflowCtx the workflow context
-   * @param cache cluster snapshot
+   * @param idealStateMap map from resource name to that resource's ideal state
    * @return set of partition numbers
    */
   public abstract Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
-      WorkflowConfig workflowCfg, WorkflowContext workflowCtx, ClusterDataCache cache);
+      WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
+      // e.g. JobRebalancer passes ClusterDataCache#getIdealStates() here.
+      Map<String, IdealState> idealStateMap);
 
   /**
    * Compute an assignment of tasks to instances
@@ -34,12 +36,12 @@ public abstract class TaskAssignmentCalculator {
    * @param workflowCfg the workflow configuration
    * @param workflowCtx the workflow context
    * @param partitionSet the partitions to assign
-   * @param cache cluster snapshot
+   * @param idealStateMap map from resource name to that resource's ideal state
    * @return map of instances to set of partition numbers
    */
   public abstract Map<String, SortedSet<Integer>> getTaskAssignment(
       CurrentStateOutput currStateOutput, ResourceAssignment prevAssignment,
       Collection<String> instances, JobConfig jobCfg, JobContext jobContext,
+      // NOTE(review): FixedTargetTaskAssignmentCalculator does not re-check instance enablement, so
+      // `instances` should contain only enabled instances (JobRebalancer passes enabled live ones).
       WorkflowConfig workflowCfg, WorkflowContext workflowCtx, Set<Integer> partitionSet,
-      ClusterDataCache cache);
+      Map<String, IdealState> idealStateMap);
 }


[3/3] helix git commit: Job Config and logic refactoring

Posted by lx...@apache.org.
Job Config and logic refactoring

1. Support initializing a job with a number of identical tasks that run the job command
2. Deprecate the unused MaxForcedReassignmentsPerTask field and stop reading it
3. Refactor the failure-handling logic.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/9fc6c540
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/9fc6c540
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/9fc6c540

Branch: refs/heads/helix-0.6.x
Commit: 9fc6c540bbcb4d7c71f0b7fe89e2acbc5955e859
Parents: 7bb2a9d
Author: Junkai Xue <jx...@linkedin.com>
Authored: Tue Sep 13 16:01:39 2016 -0700
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Wed Sep 21 10:43:35 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/helix/task/JobConfig.java   | 52 +++++++++++++++-----
 .../org/apache/helix/task/JobRebalancer.java    | 19 +------
 .../java/org/apache/helix/task/TaskConfig.java  | 50 +++++++++----------
 .../org/apache/helix/task/beans/JobBean.java    |  2 +-
 .../org/apache/helix/task/beans/TaskBean.java   |  1 +
 .../TestGenericTaskAssignmentCalculator.java    |  4 +-
 .../task/TestIndependentTaskRebalancer.java     | 45 +++--------------
 .../integration/task/TestUserContentStore.java  |  6 +--
 8 files changed, 81 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/9fc6c540/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
index 7a4e2d3..a966f35 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
@@ -83,6 +83,7 @@ public class JobConfig {
      * The maximum number of times the task rebalancer may attempt to execute a task.
      */
     MaxAttemptsPerTask,
+    @Deprecated
     /**
      * The maximum number of times Helix will intentionally move a failing task
      */
@@ -134,6 +135,7 @@ public class JobConfig {
   public static final int DEFAULT_MAX_FORCED_REASSIGNMENTS_PER_TASK = 0;
   public static final boolean DEFAULT_DISABLE_EXTERNALVIEW = false;
   public static final boolean DEFAULT_IGNORE_DEPENDENT_JOB_FAILURE = false;
+  public static final int DEFAULT_NUMBER_OF_TASKS = 0;
 
   private final String _workflow;
   private final String _targetResource;
@@ -218,10 +220,6 @@ public class JobConfig {
     return _maxAttemptsPerTask;
   }
 
-  public int getMaxForcedReassignmentsPerTask() {
-    return _maxForcedReassignmentsPerTask;
-  }
-
   public int getFailureThreshold() {
     return _failureThreshold;
   }
@@ -308,6 +306,8 @@ public class JobConfig {
    * A builder for {@link JobConfig}. Validates the configurations.
    */
   public static class Builder {
+    private static final String NUMBER_OF_TASKS = "NumberOfTasks";
+
     private String _workflow;
     private String _targetResource;
     private String _jobType;
@@ -325,10 +325,18 @@ public class JobConfig {
     private long _retryDelay = DEFAULT_TASK_RETRY_DELAY;
     private boolean _disableExternalView = DEFAULT_DISABLE_EXTERNALVIEW;
     private boolean _ignoreDependentJobFailure = DEFAULT_IGNORE_DEPENDENT_JOB_FAILURE;
+    private int _numberOfTasks = DEFAULT_NUMBER_OF_TASKS;
 
     public JobConfig build() {
       validate();
 
+      // Generate generic tasks only for untargeted jobs that have no explicit task configs.
+      if (_targetResource == null && _taskConfigMap.isEmpty()) {
+        for (int i = 0; i < _numberOfTasks; i++) {
+          TaskConfig taskConfig = new TaskConfig(null, null);
+          _taskConfigMap.put(taskConfig.getId(), taskConfig);
+        }
+      }
       return new JobConfig(_workflow, _targetResource, _targetPartitions, _targetPartitionStates,
           _command, _commandConfig, _timeoutPerTask, _numConcurrentTasksPerInstance,
           _maxAttemptsPerTask, _maxForcedReassignmentsPerTask, _failureThreshold, _retryDelay,
@@ -376,10 +384,6 @@ public class JobConfig {
         b.setMaxAttemptsPerTask(
             Integer.parseInt(cfg.get(JobConfigProperty.MaxAttemptsPerTask.name())));
       }
-      if (cfg.containsKey(JobConfigProperty.MaxForcedReassignmentsPerTask.name())) {
-        b.setMaxForcedReassignmentsPerTask(
-            Integer.parseInt(cfg.get(JobConfigProperty.MaxForcedReassignmentsPerTask.name())));
-      }
       if (cfg.containsKey(JobConfigProperty.FailureThreshold.name())) {
         b.setFailureThreshold(
             Integer.parseInt(cfg.get(JobConfigProperty.FailureThreshold.name())));
@@ -429,6 +433,11 @@ public class JobConfig {
       return this;
     }
 
+    public Builder setNumberOfTasks(int v) {
+      _numberOfTasks = v;
+      return this;
+    }
+
     public Builder setJobCommandConfigMap(Map<String, String> v) {
       _commandConfig = v;
       return this;
@@ -449,6 +458,8 @@ public class JobConfig {
       return this;
     }
 
+    /** @deprecated This value is ignored by Helix. */
+    @Deprecated
     public Builder setMaxForcedReassignmentsPerTask(int v) {
       _maxForcedReassignmentsPerTask = v;
       return this;
@@ -508,9 +519,25 @@ public class JobConfig {
         throw new IllegalArgumentException(
             String.format("%s cannot be an empty set", JobConfigProperty.TargetPartitionStates));
       }
-      if (_taskConfigMap.isEmpty() && _command == null) {
-        throw new IllegalArgumentException(
-            String.format("%s cannot be null", JobConfigProperty.Command));
+      if (_taskConfigMap.isEmpty()) {
+        // Without explicit task configs, a job-level command is required...
+        if (_command == null) {
+          throw new IllegalArgumentException(
+              String.format("%s cannot be null", JobConfigProperty.Command));
+        }
+        // ...and either a target resource or a positive task count must be given.
+        if (_targetResource == null && _numberOfTasks <= 0) {
+          throw new IllegalArgumentException("Either targetResource or numberOfTask should be set");
+        }
+      }
+      // Without a job-level command, every task config must carry its own command.
+      if (_command == null) {
+        for (TaskConfig taskConfig : _taskConfigMap.values()) {
+          if (taskConfig.getCommand() == null) {
+            throw new IllegalArgumentException(
+                String.format("Task %s command cannot be null", taskConfig.getId()));
+          }
+        }
       }
       if (_timeoutPerTask < 0) {
         throw new IllegalArgumentException(String
@@ -547,12 +574,11 @@ public class JobConfig {
       Builder b = new Builder();
 
       b.setMaxAttemptsPerTask(jobBean.maxAttemptsPerTask)
-          .setMaxForcedReassignmentsPerTask(jobBean.maxForcedReassignmentsPerTask)
           .setNumConcurrentTasksPerInstance(jobBean.numConcurrentTasksPerInstance)
           .setTimeoutPerTask(jobBean.timeoutPerPartition)
           .setFailureThreshold(jobBean.failureThreshold).setTaskRetryDelay(jobBean.taskRetryDelay)
           .setDisableExternalView(jobBean.disableExternalView)
-          .setIgnoreDependentJobFailure(jobBean.ignoreDependentJobFailure);
+          .setIgnoreDependentJobFailure(jobBean.ignoreDependentJobFailure).setNumberOfTasks(jobBean.numberOfTasks);
 
       if (jobBean.jobCommandConfigMap != null) {
         b.setJobCommandConfigMap(jobBean.jobCommandConfigMap);

http://git-wip-us.apache.org/repos/asf/helix/blob/9fc6c540/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
index cf7f5e6..7676dab 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
@@ -345,24 +345,9 @@ public class JobRebalancer extends TaskRebalancer {
           // maximum number of attempts or task is in ABORTED state.
           if (jobCtx.getPartitionNumAttempts(pId) >= jobCfg.getMaxAttemptsPerTask() ||
               currState.equals(TaskPartitionState.TASK_ABORTED)) {
-            // If the user does not require this task to succeed in order for the job to succeed,
-            // then we don't have to fail the job right now
-            boolean successOptional = false;
-            String taskId = jobCtx.getTaskIdForPartition(pId);
-            if (taskId != null) {
-              TaskConfig taskConfig = jobCfg.getTaskConfig(taskId);
-              if (taskConfig != null) {
-                successOptional = taskConfig.isSuccessOptional();
-              }
-            }
-
-            // Similarly, if we have some leeway for how many tasks we can fail, then we don't have
+            // Once skipped partitions reach the failure threshold there is no leeway left, so we have
             // to fail the job immediately
-            if (skippedPartitions.size() < jobCfg.getFailureThreshold()) {
-              successOptional = true;
-            }
-
-            if (!successOptional) {
+            if (skippedPartitions.size() >= jobCfg.getFailureThreshold()) { // per-task isSuccessOptional() is no longer consulted
               markJobFailed(jobResource, jobCtx, workflowConfig, workflowCtx);
               markAllPartitionsError(jobCtx, currState, false);
               addAllPartitions(allPartitions, partitionsToDropFromIs);

http://git-wip-us.apache.org/repos/asf/helix/blob/9fc6c540/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java b/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
index b990f99..621d371 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
@@ -36,6 +36,7 @@ public class TaskConfig {
   private enum TaskConfigProperty {
     TASK_ID,
     TASK_COMMAND,
+    @Deprecated // no longer written by the TaskConfig constructors
     TASK_SUCCESS_OPTIONAL,
     TASK_TARGET_PARTITION
   }
@@ -44,18 +45,26 @@ public class TaskConfig {
 
   private final Map<String, String> _configMap;
 
+  @Deprecated // successOptional is ignored; use TaskConfig(String, Map, String, String)
+  public TaskConfig(String command, Map<String, String> configMap, boolean successOptional,
+      String id, String target) {
+    this(command, configMap, id, target);
+  }
+
+  @Deprecated // successOptional is ignored; use TaskConfig(String, Map)
+  public TaskConfig(String command, Map<String, String> configMap, boolean successOptional) {
+    this(command, configMap, null, null);
+  }
+
   /**
    * Instantiate the task config
    *
    * @param command         the command to invoke for the task
    * @param configMap       configuration to be passed as part of the invocation
-   * @param successOptional true if this task need not pass for the job to succeed, false
-   *                        otherwise
    * @param id              existing task ID
    * @param target          target partition for a task
    */
-  public TaskConfig(String command, Map<String, String> configMap, boolean successOptional,
-      String id, String target) {
+  public TaskConfig(String command, Map<String, String> configMap, String id, String target) {
     if (configMap == null) {
       configMap = Maps.newHashMap();
     }
@@ -65,8 +74,6 @@ public class TaskConfig {
     if (command != null) {
       configMap.put(TaskConfigProperty.TASK_COMMAND.name(), command);
     }
-    configMap
-        .put(TaskConfigProperty.TASK_SUCCESS_OPTIONAL.name(), Boolean.toString(successOptional));
     configMap.put(TaskConfigProperty.TASK_ID.name(), id);
     if (target != null) {
       configMap.put(TaskConfigProperty.TASK_TARGET_PARTITION.name(), target);
@@ -79,11 +86,9 @@ public class TaskConfig {
    *
    * @param command         the command to invoke for the task
    * @param configMap       configuration to be passed as part of the invocation
-   * @param successOptional true if this task need not pass for the job to succeed, false
-   *                        otherwise
    */
-  public TaskConfig(String command, Map<String, String> configMap, boolean successOptional) {
-    this(command, configMap, successOptional, null, null);
+  public TaskConfig(String command, Map<String, String> configMap) {
+    this(command, configMap, null, null);
   }
 
   /**
@@ -115,16 +120,13 @@ public class TaskConfig {
 
   /**
    * Check if this task must succeed for a job to succeed
-   *
+   * @deprecated This flag is ignored by Helix; this method always returns true.
    * @return true if success is optional, false otherwise
    */
+  @Deprecated
   public boolean isSuccessOptional() {
-    String successOptionalStr = _configMap.get(TaskConfigProperty.TASK_SUCCESS_OPTIONAL.name());
-    if (successOptionalStr == null) {
-      return false;
-    } else {
-      return Boolean.parseBoolean(successOptionalStr);
-    }
+    // The rebalancer no longer consults this flag, so a constant is returned.
+    return true; // NOTE(review): an unset flag used to read as false; confirm callers expect true
   }
 
   /**
@@ -154,7 +156,7 @@ public class TaskConfig {
     private Map<String, String> _configMap;
 
     public TaskConfig build() {
-      return new TaskConfig(_command, _configMap, _successOptional, _taskId, _targetPartition);
+      return new TaskConfig(_command, _configMap, _taskId, _targetPartition);
     }
 
     public String getTaskId() {
@@ -184,10 +186,12 @@ public class TaskConfig {
       return this;
     }
 
+    @Deprecated // build() no longer passes this value to TaskConfig
     public boolean isSuccessOptional() {
       return _successOptional;
     }
 
+    @Deprecated // build() no longer passes this value to TaskConfig
     public Builder setSuccessOptional(boolean successOptional) {
       _successOptional = successOptional;
       return this;
@@ -208,7 +212,7 @@ public class TaskConfig {
      * @return instantiated TaskConfig
      */
     public static TaskConfig from(String target) {
-      return new TaskConfig(null, null, false, null, target);
+      return new TaskConfig(null, null, null, target);
     }
 
     /**
@@ -218,7 +222,7 @@ public class TaskConfig {
      * @return instantiated TaskConfig
      */
     public static TaskConfig from(TaskBean bean) {
-      return new TaskConfig(bean.command, bean.taskConfigMap, bean.successOptional);
+      return new TaskConfig(bean.command, bean.taskConfigMap);
     }
 
     /**
@@ -232,11 +236,7 @@ public class TaskConfig {
       String taskId = rawConfigMap.get(TaskConfigProperty.TASK_ID.name());
       String command = rawConfigMap.get(TaskConfigProperty.TASK_COMMAND.name());
       String targetPartition = rawConfigMap.get(TaskConfigProperty.TASK_TARGET_PARTITION.name());
-      String successOptionalStr =
-          rawConfigMap.get(TaskConfigProperty.TASK_SUCCESS_OPTIONAL.name());
-      boolean successOptional =
-          (successOptionalStr != null) ? Boolean.valueOf(successOptionalStr) : false;
-      return new TaskConfig(command, rawConfigMap, successOptional, taskId, targetPartition);
+      return new TaskConfig(command, rawConfigMap, taskId, targetPartition);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/9fc6c540/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
index dd7ebab..9a376f8 100644
--- a/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
+++ b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
@@ -41,9 +41,9 @@ public class JobBean {
   public long timeoutPerPartition = JobConfig.DEFAULT_TIMEOUT_PER_TASK;
   public int numConcurrentTasksPerInstance = JobConfig.DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
   public int maxAttemptsPerTask = JobConfig.DEFAULT_MAX_ATTEMPTS_PER_TASK;
-  public int maxForcedReassignmentsPerTask = JobConfig.DEFAULT_MAX_FORCED_REASSIGNMENTS_PER_TASK;
   public int failureThreshold = JobConfig.DEFAULT_FAILURE_THRESHOLD;
   public long taskRetryDelay = JobConfig.DEFAULT_TASK_RETRY_DELAY;
   public boolean disableExternalView = JobConfig.DEFAULT_DISABLE_EXTERNALVIEW;
   public boolean ignoreDependentJobFailure = JobConfig.DEFAULT_IGNORE_DEPENDENT_JOB_FAILURE;
+  public int numberOfTasks = JobConfig.DEFAULT_NUMBER_OF_TASKS; // NOTE(review): old specs may still set maxForcedReassignmentsPerTask
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/9fc6c540/helix-core/src/main/java/org/apache/helix/task/beans/TaskBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/beans/TaskBean.java b/helix-core/src/main/java/org/apache/helix/task/beans/TaskBean.java
index 97ecfc0..a61556b 100644
--- a/helix-core/src/main/java/org/apache/helix/task/beans/TaskBean.java
+++ b/helix-core/src/main/java/org/apache/helix/task/beans/TaskBean.java
@@ -29,5 +29,6 @@ import java.util.Map;
 public class TaskBean {
   public String command;
   public Map<String, String> taskConfigMap;
+  @Deprecated // no longer read when a TaskConfig is built from this bean
   public boolean successOptional = false;
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/9fc6c540/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericTaskAssignmentCalculator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericTaskAssignmentCalculator.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericTaskAssignmentCalculator.java
index 0410db2..5645009 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericTaskAssignmentCalculator.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericTaskAssignmentCalculator.java
@@ -104,7 +104,7 @@ public class TestGenericTaskAssignmentCalculator extends TaskTestBase {
     _driver = new TaskDriver(_manager);
 
     Map<String, String> taskConfigMap = Maps.newHashMap();
-    _taskConfig = new TaskConfig("TaskOne", taskConfigMap, false);
+    _taskConfig = new TaskConfig("TaskOne", taskConfigMap);
     _jobCommandMap = Maps.newHashMap();
   }
 
@@ -136,7 +136,7 @@ public class TestGenericTaskAssignmentCalculator extends TaskTestBase {
     List<TaskConfig> taskConfigs = Lists.newArrayListWithCapacity(20);
     for (int i = 0; i < 50; i++) {
       Map<String, String> taskConfigMap = Maps.newHashMap();
-      taskConfigs.add(new TaskConfig("TaskOne", taskConfigMap, false));
+      taskConfigs.add(new TaskConfig("TaskOne", taskConfigMap)); // NOTE(review): list is presized for 20 but receives 50 configs
     }
     JobConfig.Builder jobBuilder =
         new JobConfig.Builder().setCommand("DummyCommand").setJobCommandConfigMap(_jobCommandMap)

http://git-wip-us.apache.org/repos/asf/helix/blob/9fc6c540/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
index c4d588c..64b9073 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
@@ -136,8 +136,8 @@ public class TestIndependentTaskRebalancer extends TaskTestBase {
     String jobName = TestHelper.getTestMethodName();
     Workflow.Builder workflowBuilder = new Workflow.Builder(jobName);
     List<TaskConfig> taskConfigs = Lists.newArrayListWithCapacity(2);
-    TaskConfig taskConfig1 = new TaskConfig("TaskOne", null, true);
-    TaskConfig taskConfig2 = new TaskConfig("TaskTwo", null, true);
+    TaskConfig taskConfig1 = new TaskConfig("TaskOne", null);
+    TaskConfig taskConfig2 = new TaskConfig("TaskTwo", null);
     taskConfigs.add(taskConfig1);
     taskConfigs.add(taskConfig2);
     Map<String, String> jobCommandMap = Maps.newHashMap();
@@ -164,8 +164,8 @@ public class TestIndependentTaskRebalancer extends TaskTestBase {
     Workflow.Builder workflowBuilder = new Workflow.Builder(jobName);
     List<TaskConfig> taskConfigs = Lists.newArrayListWithCapacity(2);
     Map<String, String> taskConfigMap = Maps.newHashMap(ImmutableMap.of("fail", "" + true));
-    TaskConfig taskConfig1 = new TaskConfig("TaskOne", taskConfigMap, false);
-    TaskConfig taskConfig2 = new TaskConfig("TaskTwo", null, false);
+    TaskConfig taskConfig1 = new TaskConfig("TaskOne", taskConfigMap);
+    TaskConfig taskConfig2 = new TaskConfig("TaskTwo", null);
     taskConfigs.add(taskConfig1);
     taskConfigs.add(taskConfig2);
     Map<String, String> jobConfigMap = Maps.newHashMap();
@@ -185,35 +185,6 @@ public class TestIndependentTaskRebalancer extends TaskTestBase {
     Assert.assertTrue(_invokedClasses.contains(TaskTwo.class.getName()));
   }
 
-  @Test public void testOptionalTaskFailure() throws Exception {
-    // Create a job with two different tasks
-    String jobName = TestHelper.getTestMethodName();
-    Workflow.Builder workflowBuilder = new Workflow.Builder(jobName);
-    List<TaskConfig> taskConfigs = Lists.newArrayListWithCapacity(2);
-    Map<String, String> taskConfigMap = Maps.newHashMap(ImmutableMap.of("fail", "" + true));
-    TaskConfig taskConfig1 = new TaskConfig("TaskOne", taskConfigMap, true);
-    TaskConfig taskConfig2 = new TaskConfig("TaskTwo", null, false);
-    taskConfigs.add(taskConfig1);
-    taskConfigs.add(taskConfig2);
-    Map<String, String> jobCommandMap = Maps.newHashMap();
-    jobCommandMap.put("Timeout", "1000");
-
-    JobConfig.Builder jobBuilder =
-        new JobConfig.Builder().setCommand("DummyCommand").addTaskConfigs(taskConfigs)
-            .setJobCommandConfigMap(jobCommandMap);
-    workflowBuilder.addJob(jobName, jobBuilder);
-
-    _driver.start(workflowBuilder.build());
-
-    // Ensure the job completes
-    _driver.pollForWorkflowState(jobName, TaskState.IN_PROGRESS);
-    _driver.pollForWorkflowState(jobName, TaskState.COMPLETED);
-
-    // Ensure that each class was invoked
-    Assert.assertTrue(_invokedClasses.contains(TaskOne.class.getName()));
-    Assert.assertTrue(_invokedClasses.contains(TaskTwo.class.getName()));
-  }
-
   @Test public void testReassignment() throws Exception {
     final int NUM_INSTANCES = 5;
     String jobName = TestHelper.getTestMethodName();
@@ -221,13 +192,13 @@ public class TestIndependentTaskRebalancer extends TaskTestBase {
     List<TaskConfig> taskConfigs = Lists.newArrayListWithCapacity(2);
     Map<String, String> taskConfigMap = Maps.newHashMap(ImmutableMap
         .of("fail", "" + true, "failInstance", PARTICIPANT_PREFIX + '_' + (_startPort + 1)));
-    TaskConfig taskConfig1 = new TaskConfig("TaskOne", taskConfigMap, false);
+    TaskConfig taskConfig1 = new TaskConfig("TaskOne", taskConfigMap);
     taskConfigs.add(taskConfig1);
     Map<String, String> jobCommandMap = Maps.newHashMap();
     jobCommandMap.put("Timeout", "1000");
 
     JobConfig.Builder jobBuilder = new JobConfig.Builder().setCommand("DummyCommand")
-        .setMaxForcedReassignmentsPerTask(NUM_INSTANCES - 1).addTaskConfigs(taskConfigs)
+        .addTaskConfigs(taskConfigs) // forced-reassignment limit no longer set; its setter is deprecated and ignored
         .setJobCommandConfigMap(jobCommandMap);
     workflowBuilder.addJob(jobName, jobBuilder);
 
@@ -254,7 +225,7 @@ public class TestIndependentTaskRebalancer extends TaskTestBase {
     Workflow.Builder workflowBuilder = new Workflow.Builder(jobName);
     List<TaskConfig> taskConfigs = Lists.newArrayListWithCapacity(1);
     Map<String, String> taskConfigMap = Maps.newHashMap();
-    TaskConfig taskConfig1 = new TaskConfig("TaskOne", taskConfigMap, false);
+    TaskConfig taskConfig1 = new TaskConfig("TaskOne", taskConfigMap);
     taskConfigs.add(taskConfig1);
     Map<String, String> jobCommandMap = Maps.newHashMap();
     jobCommandMap.put("Timeout", "1000");
@@ -289,7 +260,7 @@ public class TestIndependentTaskRebalancer extends TaskTestBase {
     Workflow.Builder workflowBuilder = new Workflow.Builder(jobName);
     List<TaskConfig> taskConfigs = Lists.newArrayListWithCapacity(1);
     Map<String, String> taskConfigMap = Maps.newHashMap();
-    TaskConfig taskConfig1 = new TaskConfig("SingleFailTask", taskConfigMap, false);
+    TaskConfig taskConfig1 = new TaskConfig("SingleFailTask", taskConfigMap);
     taskConfigs.add(taskConfig1);
     Map<String, String> jobCommandMap = Maps.newHashMap();
 

http://git-wip-us.apache.org/repos/asf/helix/blob/9fc6c540/helix-core/src/test/java/org/apache/helix/integration/task/TestUserContentStore.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestUserContentStore.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestUserContentStore.java
index b2b27ef..13cd531 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestUserContentStore.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestUserContentStore.java
@@ -123,7 +123,7 @@ public class TestUserContentStore extends TaskTestBase {
     Workflow.Builder workflowBuilder = new Workflow.Builder(jobName);
     List<TaskConfig> taskConfigs = Lists.newArrayListWithCapacity(1);
     Map<String, String> taskConfigMap = Maps.newHashMap();
-    TaskConfig taskConfig1 = new TaskConfig("ContentStoreTask", taskConfigMap, false);
+    TaskConfig taskConfig1 = new TaskConfig("ContentStoreTask", taskConfigMap);
     taskConfigs.add(taskConfig1);
     Map<String, String> jobCommandMap = Maps.newHashMap();
     jobCommandMap.put("Timeout", "1000");
@@ -148,8 +148,8 @@ public class TestUserContentStore extends TaskTestBase {
     List<TaskConfig> taskConfigs2 = Lists.newArrayListWithCapacity(1);
     Map<String, String> taskConfigMap1 = Maps.newHashMap();
     Map<String, String> taskConfigMap2 = Maps.newHashMap();
-    TaskConfig taskConfig1 = new TaskConfig("TaskOne", taskConfigMap1, false);
-    TaskConfig taskConfig2 = new TaskConfig("TaskTwo", taskConfigMap2, false);
+    TaskConfig taskConfig1 = new TaskConfig("TaskOne", taskConfigMap1);
+    TaskConfig taskConfig2 = new TaskConfig("TaskTwo", taskConfigMap2);
 
     taskConfigs1.add(taskConfig1);
     taskConfigs2.add(taskConfig2);