You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by lx...@apache.org on 2016/09/22 23:11:06 UTC
[1/3] helix git commit: [HELIX-635] GenericTaskAssignmentCalculator
rebalance with consistent hashing
Repository: helix
Updated Branches:
refs/heads/helix-0.6.x 7147ec874 -> 9fc6c540b
[HELIX-635] GenericTaskAssignmentCalculator rebalance with consistent hashing
1. Implement consistent hashing mapping calculation
2. Remove the separate reassignment logic and apply it within the consistent hashing calculation
3. Add tests for GenericTaskAssignmentCalculator
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/0a18726f
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/0a18726f
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/0a18726f
Branch: refs/heads/helix-0.6.x
Commit: 0a18726fcad7b8a0fe5e77d7a2c9848b86461ccc
Parents: 7147ec8
Author: Junkai Xue <jx...@linkedin.com>
Authored: Tue Sep 13 15:28:57 2016 -0700
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Tue Sep 13 15:59:24 2016 -0700
----------------------------------------------------------------------
.../strategy/CrushRebalanceStrategy.java | 2 +-
.../crushMapping/CRUSHPlacementAlgorithm.java | 1 +
.../strategy/crushMapping/JenkinsHash.java | 140 -----------
.../task/GenericTaskAssignmentCalculator.java | 238 +++++++------------
.../java/org/apache/helix/util/JenkinsHash.java | 140 +++++++++++
.../TestGenericTaskAssignmentCalculator.java | 171 +++++++++++++
.../task/TestIndependentTaskRebalancer.java | 8 +-
7 files changed, 405 insertions(+), 295 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/0a18726f/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java
index a8fe107..b91d26c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java
@@ -24,7 +24,7 @@ import com.google.common.base.Predicates;
import org.apache.helix.HelixException;
import org.apache.helix.ZNRecord;
import org.apache.helix.controller.rebalancer.strategy.crushMapping.CRUSHPlacementAlgorithm;
-import org.apache.helix.controller.rebalancer.strategy.crushMapping.JenkinsHash;
+import org.apache.helix.util.JenkinsHash;
import org.apache.helix.controller.rebalancer.topology.Node;
import org.apache.helix.controller.rebalancer.topology.Topology;
import org.apache.helix.controller.stages.ClusterDataCache;
http://git-wip-us.apache.org/repos/asf/helix/blob/0a18726f/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CRUSHPlacementAlgorithm.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CRUSHPlacementAlgorithm.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CRUSHPlacementAlgorithm.java
index 870656c..b7c1c68 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CRUSHPlacementAlgorithm.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CRUSHPlacementAlgorithm.java
@@ -27,6 +27,7 @@ import java.util.Map;
import java.util.Set;
import org.apache.helix.controller.rebalancer.topology.Node;
+import org.apache.helix.util.JenkinsHash;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/helix/blob/0a18726f/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/JenkinsHash.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/JenkinsHash.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/JenkinsHash.java
deleted file mode 100644
index 66566f8..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/JenkinsHash.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/**
- * Copyright 2013 Twitter, Inc.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.helix.controller.rebalancer.strategy.crushMapping;
-
-public class JenkinsHash {
- // max value to limit it to 4 bytes
- private static final long MAX_VALUE = 0xFFFFFFFFL;
- private static final long CRUSH_HASH_SEED = 1315423911L;
-
- /**
- * Convert a byte into a long value without making it negative.
- */
- private static long byteToLong(byte b) {
- long val = b & 0x7F;
- if ((b & 0x80) != 0) {
- val += 128;
- }
- return val;
- }
-
- /**
- * Do addition and turn into 4 bytes.
- */
- private static long add(long val, long add) {
- return (val + add) & MAX_VALUE;
- }
-
- /**
- * Do subtraction and turn into 4 bytes.
- */
- private static long subtract(long val, long subtract) {
- return (val - subtract) & MAX_VALUE;
- }
-
- /**
- * Left shift val by shift bits and turn in 4 bytes.
- */
- private static long xor(long val, long xor) {
- return (val ^ xor) & MAX_VALUE;
- }
-
- /**
- * Left shift val by shift bits. Cut down to 4 bytes.
- */
- private static long leftShift(long val, int shift) {
- return (val << shift) & MAX_VALUE;
- }
-
- /**
- * Convert 4 bytes from the buffer at offset into a long value.
- */
- private static long fourByteToLong(byte[] bytes, int offset) {
- return (byteToLong(bytes[offset + 0])
- + (byteToLong(bytes[offset + 1]) << 8)
- + (byteToLong(bytes[offset + 2]) << 16)
- + (byteToLong(bytes[offset + 3]) << 24));
- }
-
- /**
- * Mix up the values in the hash function.
- */
- private static Triple hashMix(Triple t) {
- long a = t.a; long b = t.b; long c = t.c;
- a = subtract(a, b); a = subtract(a, c); a = xor(a, c >> 13);
- b = subtract(b, c); b = subtract(b, a); b = xor(b, leftShift(a, 8));
- c = subtract(c, a); c = subtract(c, b); c = xor(c, (b >> 13));
- a = subtract(a, b); a = subtract(a, c); a = xor(a, (c >> 12));
- b = subtract(b, c); b = subtract(b, a); b = xor(b, leftShift(a, 16));
- c = subtract(c, a); c = subtract(c, b); c = xor(c, (b >> 5));
- a = subtract(a, b); a = subtract(a, c); a = xor(a, (c >> 3));
- b = subtract(b, c); b = subtract(b, a); b = xor(b, leftShift(a, 10));
- c = subtract(c, a); c = subtract(c, b); c = xor(c, (b >> 15));
- return new Triple(a, b, c);
- }
-
- private static class Triple {
- long a;
- long b;
- long c;
-
- public Triple(long a, long b, long c) {
- this.a = a; this.b = b; this.c = c;
- }
- }
-
- public long hash(long a) {
- long hash = xor(CRUSH_HASH_SEED, a);
- long b = a;
- long x = 231232L;
- long y = 1232L;
- Triple val = hashMix(new Triple(b, x, hash));
- b = val.a; x = val.b; hash = val.c;
- val = hashMix(new Triple(y, a, hash));
- hash = val.c;
- return hash;
- }
-
- public long hash(long a, long b) {
- long hash = xor(xor(CRUSH_HASH_SEED, a), b);
- long x = 231232L;
- long y = 1232L;
- Triple val = hashMix(new Triple(a, b, hash));
- a = val.a; b = val.b; hash = val.c;
- val = hashMix(new Triple(x, a, hash));
- x = val.a; a = val.b; hash = val.c;
- val = hashMix(new Triple(b, y, hash));
- hash = val.c;
- return hash;
- }
-
- public long hash(long a, long b, long c) {
- long hash = xor(xor(xor(CRUSH_HASH_SEED, a), b), c);
- long x = 231232L;
- long y = 1232L;
- Triple val = hashMix(new Triple(a, b, hash));
- a = val.a; b = val.b; hash = val.c;
- val = hashMix(new Triple(c, x, hash));
- c = val.a; x = val.b; hash = val.c;
- val = hashMix(new Triple(y, a, hash));
- y = val.a; a = val.b; hash = val.c;
- val = hashMix(new Triple(b, x, hash));
- b = val.a; x = val.b; hash = val.c;
- val = hashMix(new Triple(y, c, hash));
- hash = val.c;
- return hash;
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/0a18726f/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
index ac96768..fbc7af3 100644
--- a/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
+++ b/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
@@ -19,30 +19,26 @@ package org.apache.helix.task;
* under the License.
*/
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.SortedMap;
import java.util.SortedSet;
+import java.util.TreeMap;
import java.util.TreeSet;
-import org.apache.helix.ZNRecord;
+import org.apache.helix.HelixException;
import org.apache.helix.controller.stages.ClusterDataCache;
import org.apache.helix.controller.stages.CurrentStateOutput;
-import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
-import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.Partition;
import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.util.JenkinsHash;
import org.apache.log4j.Logger;
-import com.google.common.base.Function;
-import com.google.common.collect.BiMap;
-import com.google.common.collect.HashBiMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -54,9 +50,6 @@ import com.google.common.collect.Sets;
public class GenericTaskAssignmentCalculator extends TaskAssignmentCalculator {
private static final Logger LOG = Logger.getLogger(GenericTaskAssignmentCalculator.class);
- /** Reassignment policy for this algorithm */
- private RetryPolicy _retryPolicy = new DefaultRetryReassigner();
-
@Override
public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
WorkflowConfig workflowCfg, WorkflowContext workflowCtx, ClusterDataCache cache) {
@@ -96,14 +89,7 @@ public class GenericTaskAssignmentCalculator extends TaskAssignmentCalculator {
// Transform from partition id to fully qualified partition name
List<Integer> partitionNums = Lists.newArrayList(partitionSet);
Collections.sort(partitionNums);
- final String resourceId = prevAssignment.getResourceName();
- List<String> partitions =
- new ArrayList<String>(Lists.transform(partitionNums, new Function<Integer, String>() {
- @Override
- public String apply(Integer partitionNum) {
- return resourceId + "_" + partitionNum;
- }
- }));
+ String resourceId = prevAssignment.getResourceName();
// Compute the current assignment
Map<String, Map<String, String>> currentMapping = Maps.newHashMap();
@@ -122,156 +108,108 @@ public class GenericTaskAssignmentCalculator extends TaskAssignmentCalculator {
}
// Get the assignment keyed on partition
- RebalanceStrategy strategy = new AutoRebalanceStrategy(resourceId, partitions, states);
- List<String> allNodes =
- Lists.newArrayList(getEligibleInstances(jobCfg, currStateOutput, instances, cache));
- Collections.sort(allNodes);
- ZNRecord record =
- strategy.computePartitionAssignment(allNodes, allNodes, currentMapping, cache);
- Map<String, List<String>> preferenceLists = record.getListFields();
-
- // Convert to an assignment keyed on participant
- Map<String, SortedSet<Integer>> taskAssignment = Maps.newHashMap();
- for (Map.Entry<String, List<String>> e : preferenceLists.entrySet()) {
- String partitionName = e.getKey();
- partitionName = String.valueOf(TaskUtil.getPartitionId(partitionName));
- List<String> preferenceList = e.getValue();
- for (String participantName : preferenceList) {
- if (!taskAssignment.containsKey(participantName)) {
- taskAssignment.put(participantName, new TreeSet<Integer>());
- }
- taskAssignment.get(participantName).add(Integer.valueOf(partitionName));
- }
+ if (jobCfg.getTargetResource() != null) {
+ LOG.error(
+ "Target resource is not null, should call FixedTaskAssignmentCalculator, target resource : "
+ + jobCfg.getTargetResource());
+ return new HashMap<String, SortedSet<Integer>>();
}
- // Finally, adjust the assignment if tasks have been failing
- taskAssignment = _retryPolicy.reassign(jobCfg, jobContext, allNodes, taskAssignment);
+ List<String> allNodes = Lists.newArrayList(instances);
+ ConsistentHashingPlacement placement = new ConsistentHashingPlacement(allNodes);
+ Map<String, SortedSet<Integer>> taskAssignment =
+ placement.computeMapping(jobCfg, jobContext, partitionNums, resourceId);
+
return taskAssignment;
}
- /**
- * Filter a list of instances based on targeted resource policies
- * @param jobCfg the job configuration
- * @param currStateOutput the current state of all instances in the cluster
- * @param instances valid instances
- * @param cache current snapshot of the cluster
- * @return a set of instances that can be assigned to
- */
- private Set<String> getEligibleInstances(JobConfig jobCfg, CurrentStateOutput currStateOutput,
- Iterable<String> instances, ClusterDataCache cache) {
- // No target resource means any instance is available
- Set<String> allInstances = Sets.newHashSet(instances);
- String targetResource = jobCfg.getTargetResource();
- if (targetResource == null) {
- return allInstances;
- }
+ private class ConsistentHashingPlacement {
+ private JenkinsHash _hashFunction;
+ private ConsistentHashSelector _selector;
+ private int _numInstances;
- // Bad ideal state means don't assign
- IdealState idealState = cache.getIdealState(targetResource);
- if (idealState == null) {
- return Collections.emptySet();
+ public ConsistentHashingPlacement(List<String> potentialInstances) {
+ _hashFunction = new JenkinsHash();
+ _selector = new ConsistentHashSelector(potentialInstances);
+ _numInstances = potentialInstances.size();
}
- // Get the partitions on the target resource to use
- Set<String> partitions = idealState.getPartitionSet();
- List<String> targetPartitions = jobCfg.getTargetPartitions();
- if (targetPartitions != null && !targetPartitions.isEmpty()) {
- partitions.retainAll(targetPartitions);
- }
+ public Map<String, SortedSet<Integer>> computeMapping(JobConfig jobConfig,
+ JobContext jobContext, List<Integer> partitions, String resourceId) {
+ if (_numInstances == 0) {
+ return new HashMap<String, SortedSet<Integer>>();
+ }
+
+ Map<String, SortedSet<Integer>> taskAssignment = Maps.newHashMap();
+
+ for (int partition : partitions) {
+ long hashedValue = new String(resourceId + "_" + partition).hashCode();
+ int shiftTimes;
+ int numAttempts = jobContext.getPartitionNumAttempts(partition);
+ int maxAttempts = jobConfig.getMaxAttemptsPerTask();
- // Based on state matches, add eligible instances
- Set<String> eligibleInstances = Sets.newHashSet();
- Set<String> targetStates = jobCfg.getTargetPartitionStates();
- for (String partition : partitions) {
- Map<String, String> stateMap =
- currStateOutput.getCurrentStateMap(targetResource, new Partition(partition));
- Map<String, String> pendingStateMap =
- currStateOutput.getPendingStateMap(targetResource, new Partition(partition));
- for (Map.Entry<String, String> e : stateMap.entrySet()) {
- String instanceName = e.getKey();
- String state = e.getValue();
- String pending = pendingStateMap.get(instanceName);
- if (pending != null) {
- continue;
+ if (jobConfig.getMaxAttemptsPerTask() < _numInstances) {
+ shiftTimes = numAttempts == -1 ? 0 : numAttempts;
+ } else {
+ shiftTimes = (maxAttempts == 0)
+ ? 0
+ : jobContext.getPartitionNumAttempts(partition) / (maxAttempts / _numInstances);
+ }
+ // Apply the hash (shiftTimes + 1) times; with the default of 0 shifts the value is hashed once.
+ for (int i = 0; i <= shiftTimes; i++) {
+ hashedValue = _hashFunction.hash(hashedValue);
}
- if (targetStates == null || targetStates.isEmpty() || targetStates.contains(state)) {
- eligibleInstances.add(instanceName);
+ String selectedInstance = select(hashedValue);
+ if (selectedInstance != null) {
+ if (!taskAssignment.containsKey(selectedInstance)) {
+ taskAssignment.put(selectedInstance, new TreeSet<Integer>());
+ }
+ taskAssignment.get(selectedInstance).add(partition);
}
}
+ return taskAssignment;
}
- allInstances.retainAll(eligibleInstances);
- return allInstances;
- }
- public interface RetryPolicy {
- /**
- * Adjust the assignment to allow for reassignment if a task keeps failing where it's currently
- * assigned
- * @param jobCfg the job configuration
- * @param jobCtx the job context
- * @param instances instances that can serve tasks
- * @param origAssignment the unmodified assignment
- * @return the adjusted assignment
- */
- Map<String, SortedSet<Integer>> reassign(JobConfig jobCfg, JobContext jobCtx,
- Collection<String> instances, Map<String, SortedSet<Integer>> origAssignment);
- }
+ private String select(long data) throws HelixException {
+ return _selector.get(data);
+ }
+
+ private class ConsistentHashSelector {
+ private final static int DEFAULT_TOKENS_PER_INSTANCE = 1000;
+ private final SortedMap<Long, String> circle = new TreeMap<Long, String>();
+ protected int instanceSize = 0;
- private static class DefaultRetryReassigner implements RetryPolicy {
- @Override
- public Map<String, SortedSet<Integer>> reassign(JobConfig jobCfg, JobContext jobCtx,
- Collection<String> instances, Map<String, SortedSet<Integer>> origAssignment) {
- // Compute an increasing integer ID for each instance
- BiMap<String, Integer> instanceMap = HashBiMap.create(instances.size());
- int instanceIndex = 0;
- for (String instance : instances) {
- instanceMap.put(instance, instanceIndex++);
+ public ConsistentHashSelector(List<String> instances) {
+ for (String instance : instances) {
+ long tokenCount = DEFAULT_TOKENS_PER_INSTANCE;
+ add(instance, tokenCount);
+ instanceSize++;
+ }
}
- // Move partitions
- Map<String, SortedSet<Integer>> newAssignment = Maps.newHashMap();
- for (Map.Entry<String, SortedSet<Integer>> e : origAssignment.entrySet()) {
- String instance = e.getKey();
- SortedSet<Integer> partitions = e.getValue();
- Integer instanceId = instanceMap.get(instance);
- if (instanceId != null) {
- for (int p : partitions) {
- // Determine for each partition if there have been failures with the current assignment
- // strategy, and if so, force a shift in assignment for that partition only
- int shiftValue = getNumInstancesToShift(jobCfg, jobCtx, instances, p);
- int newInstanceId = (instanceId + shiftValue) % instances.size();
- String newInstance = instanceMap.inverse().get(newInstanceId);
- if (newInstance == null) {
- newInstance = instance;
- }
- if (!newAssignment.containsKey(newInstance)) {
- newAssignment.put(newInstance, new TreeSet<Integer>());
- }
- newAssignment.get(newInstance).add(p);
- }
- } else {
- // In case something goes wrong, just keep the previous assignment
- newAssignment.put(instance, partitions);
+ public void add(String instance, long numberOfReplicas) {
+ for (int i = 0; i < numberOfReplicas; i++) {
+ circle.put(_hashFunction.hash(instance.hashCode(), i), instance);
+ }
+ }
+
+ public void remove(String instance, long numberOfReplicas) {
+ for (int i = 0; i < numberOfReplicas; i++) {
+ circle.remove(_hashFunction.hash(instance.hashCode(), i));
}
}
- return newAssignment;
- }
- /**
- * In case tasks fail, we may not want to schedule them in the same place. This method allows us
- * to compute a shifting value so that we can systematically choose other instances to try
- * @param jobCfg the job configuration
- * @param jobCtx the job context
- * @param instances instances that can be chosen
- * @param p the partition to look up
- * @return the shifting value
- */
- private int getNumInstancesToShift(JobConfig jobCfg, JobContext jobCtx,
- Collection<String> instances, int p) {
- int numAttempts = jobCtx.getPartitionNumAttempts(p);
- int maxNumAttempts = jobCfg.getMaxAttemptsPerTask();
- int numInstances = Math.min(instances.size(), jobCfg.getMaxForcedReassignmentsPerTask() + 1);
- return numAttempts / (maxNumAttempts / numInstances);
+ public String get(long data) {
+ if (circle.isEmpty()) {
+ return null;
+ }
+ long hash = _hashFunction.hash(data);
+ if (!circle.containsKey(hash)) {
+ SortedMap<Long, String> tailMap = circle.tailMap(hash);
+ hash = tailMap.isEmpty() ? circle.firstKey() : tailMap.firstKey();
+ }
+ return circle.get(hash);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/0a18726f/helix-core/src/main/java/org/apache/helix/util/JenkinsHash.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/util/JenkinsHash.java b/helix-core/src/main/java/org/apache/helix/util/JenkinsHash.java
new file mode 100644
index 0000000..3ccd1f4
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/util/JenkinsHash.java
@@ -0,0 +1,140 @@
+/**
+ * Copyright 2013 Twitter, Inc.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.helix.util;
+
+public class JenkinsHash {
+ // max value to limit it to 4 bytes
+ private static final long MAX_VALUE = 0xFFFFFFFFL;
+ private static final long CRUSH_HASH_SEED = 1315423911L;
+
+ /**
+ * Convert a byte into a long value without making it negative.
+ */
+ private static long byteToLong(byte b) {
+ long val = b & 0x7F;
+ if ((b & 0x80) != 0) {
+ val += 128;
+ }
+ return val;
+ }
+
+ /**
+ * Do addition and turn into 4 bytes.
+ */
+ private static long add(long val, long add) {
+ return (val + add) & MAX_VALUE;
+ }
+
+ /**
+ * Do subtraction and turn into 4 bytes.
+ */
+ private static long subtract(long val, long subtract) {
+ return (val - subtract) & MAX_VALUE;
+ }
+
+ /**
+ * XOR val with xor and turn into 4 bytes.
+ */
+ private static long xor(long val, long xor) {
+ return (val ^ xor) & MAX_VALUE;
+ }
+
+ /**
+ * Left shift val by shift bits. Cut down to 4 bytes.
+ */
+ private static long leftShift(long val, int shift) {
+ return (val << shift) & MAX_VALUE;
+ }
+
+ /**
+ * Convert 4 bytes from the buffer at offset into a long value.
+ */
+ private static long fourByteToLong(byte[] bytes, int offset) {
+ return (byteToLong(bytes[offset + 0])
+ + (byteToLong(bytes[offset + 1]) << 8)
+ + (byteToLong(bytes[offset + 2]) << 16)
+ + (byteToLong(bytes[offset + 3]) << 24));
+ }
+
+ /**
+ * Mix up the values in the hash function.
+ */
+ private static Triple hashMix(Triple t) {
+ long a = t.a; long b = t.b; long c = t.c;
+ a = subtract(a, b); a = subtract(a, c); a = xor(a, c >> 13);
+ b = subtract(b, c); b = subtract(b, a); b = xor(b, leftShift(a, 8));
+ c = subtract(c, a); c = subtract(c, b); c = xor(c, (b >> 13));
+ a = subtract(a, b); a = subtract(a, c); a = xor(a, (c >> 12));
+ b = subtract(b, c); b = subtract(b, a); b = xor(b, leftShift(a, 16));
+ c = subtract(c, a); c = subtract(c, b); c = xor(c, (b >> 5));
+ a = subtract(a, b); a = subtract(a, c); a = xor(a, (c >> 3));
+ b = subtract(b, c); b = subtract(b, a); b = xor(b, leftShift(a, 10));
+ c = subtract(c, a); c = subtract(c, b); c = xor(c, (b >> 15));
+ return new Triple(a, b, c);
+ }
+
+ private static class Triple {
+ long a;
+ long b;
+ long c;
+
+ public Triple(long a, long b, long c) {
+ this.a = a; this.b = b; this.c = c;
+ }
+ }
+
+ public long hash(long a) {
+ long hash = xor(CRUSH_HASH_SEED, a);
+ long b = a;
+ long x = 231232L;
+ long y = 1232L;
+ Triple val = hashMix(new Triple(b, x, hash));
+ b = val.a; x = val.b; hash = val.c;
+ val = hashMix(new Triple(y, a, hash));
+ hash = val.c;
+ return hash;
+ }
+
+ public long hash(long a, long b) {
+ long hash = xor(xor(CRUSH_HASH_SEED, a), b);
+ long x = 231232L;
+ long y = 1232L;
+ Triple val = hashMix(new Triple(a, b, hash));
+ a = val.a; b = val.b; hash = val.c;
+ val = hashMix(new Triple(x, a, hash));
+ x = val.a; a = val.b; hash = val.c;
+ val = hashMix(new Triple(b, y, hash));
+ hash = val.c;
+ return hash;
+ }
+
+ public long hash(long a, long b, long c) {
+ long hash = xor(xor(xor(CRUSH_HASH_SEED, a), b), c);
+ long x = 231232L;
+ long y = 1232L;
+ Triple val = hashMix(new Triple(a, b, hash));
+ a = val.a; b = val.b; hash = val.c;
+ val = hashMix(new Triple(c, x, hash));
+ c = val.a; x = val.b; hash = val.c;
+ val = hashMix(new Triple(y, a, hash));
+ y = val.a; a = val.b; hash = val.c;
+ val = hashMix(new Triple(b, x, hash));
+ b = val.a; x = val.b; hash = val.c;
+ val = hashMix(new Triple(y, c, hash));
+ hash = val.c;
+ return hash;
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/0a18726f/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericTaskAssignmentCalculator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericTaskAssignmentCalculator.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericTaskAssignmentCalculator.java
new file mode 100644
index 0000000..0410db2
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericTaskAssignmentCalculator.java
@@ -0,0 +1,171 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskConfig;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskFactory;
+import org.apache.helix.task.TaskResult;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskStateModelFactory;
+import org.apache.helix.task.Workflow;
+import org.apache.helix.tools.ClusterSetup;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+import org.testng.collections.Sets;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+public class TestGenericTaskAssignmentCalculator extends TaskTestBase {
+ private Set<String> _invokedClasses = Sets.newHashSet();
+ private Map<String, Integer> _runCounts = Maps.newHashMap();
+ private TaskConfig _taskConfig;
+ private Map<String, String> _jobCommandMap;
+
+ @BeforeClass
+ public void beforeClass() throws Exception {
+ String namespace = "/" + CLUSTER_NAME;
+ if (_gZkClient.exists(namespace)) {
+ _gZkClient.deleteRecursive(namespace);
+ }
+
+ // Setup cluster and instances
+ ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
+ setupTool.addCluster(CLUSTER_NAME, true);
+ for (int i = 0; i < _numNodes; i++) {
+ String storageNodeName = PARTICIPANT_PREFIX + "_" + (_startPort + i);
+ setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+ }
+
+ // start dummy participants
+ for (int i = 0; i < _numNodes; i++) {
+ final String instanceName = PARTICIPANT_PREFIX + "_" + (_startPort + i);
+
+ // Set task callbacks
+ Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
+
+ taskFactoryReg.put("TaskOne", new TaskFactory() {
+ @Override public Task createNewTask(TaskCallbackContext context) {
+ return new TaskOne(context, instanceName);
+ }
+ });
+
+ _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+
+ // Register a Task state model factory.
+ StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
+ stateMachine.registerStateModelFactory("Task",
+ new TaskStateModelFactory(_participants[i], taskFactoryReg));
+ _participants[i].syncStart();
+ }
+
+ // Start controller
+ String controllerName = CONTROLLER_PREFIX + "_0";
+ _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+ _controller.syncStart();
+
+ // Start an admin connection
+ _manager = HelixManagerFactory
+ .getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
+ _manager.connect();
+ _driver = new TaskDriver(_manager);
+
+ Map<String, String> taskConfigMap = Maps.newHashMap();
+ _taskConfig = new TaskConfig("TaskOne", taskConfigMap, false);
+ _jobCommandMap = Maps.newHashMap();
+ }
+
+ @Test
+ public void testMultipleJobAssignment() throws InterruptedException {
+ String workflowName = TestHelper.getTestMethodName();
+ Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName);
+ List<TaskConfig> taskConfigs = Lists.newArrayListWithCapacity(1);
+ taskConfigs.add(_taskConfig);
+ JobConfig.Builder jobBuilder =
+ new JobConfig.Builder().setCommand("DummyCommand").addTaskConfigs(taskConfigs)
+ .setJobCommandConfigMap(_jobCommandMap);
+
+ for (int i = 0; i < 25; i++) {
+ workflowBuilder.addJob("JOB" + i, jobBuilder);
+ }
+
+ _driver.start(workflowBuilder.build());
+ _driver.pollForWorkflowState(workflowName, TaskState.COMPLETED);
+
+ Assert.assertEquals(_runCounts.size(), 5);
+ }
+
+ @Test
+ public void testMultipleTaskAssignment() throws InterruptedException {
+ String workflowName = TestHelper.getTestMethodName();
+ Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName);
+
+ List<TaskConfig> taskConfigs = Lists.newArrayListWithCapacity(20);
+ for (int i = 0; i < 50; i++) {
+ Map<String, String> taskConfigMap = Maps.newHashMap();
+ taskConfigs.add(new TaskConfig("TaskOne", taskConfigMap, false));
+ }
+ JobConfig.Builder jobBuilder =
+ new JobConfig.Builder().setCommand("DummyCommand").setJobCommandConfigMap(_jobCommandMap)
+ .addTaskConfigs(taskConfigs);
+ workflowBuilder.addJob("JOB", jobBuilder);
+ _driver.start(workflowBuilder.build());
+ _driver.pollForWorkflowState(workflowName, TaskState.COMPLETED);
+
+ Assert.assertEquals(_runCounts.size(), 5);
+ }
+
+ private class TaskOne extends MockTask {
+ private final String _instanceName;
+
+ public TaskOne(TaskCallbackContext context, String instanceName) {
+ super(context);
+
+ // Initialize the count for this instance if not already done
+ if (!_runCounts.containsKey(instanceName)) {
+ _runCounts.put(instanceName, 0);
+ }
+ _instanceName = instanceName;
+ }
+
+ @Override
+ public TaskResult run() {
+ _invokedClasses.add(getClass().getName());
+ _runCounts.put(_instanceName, _runCounts.get(_instanceName) + 1);
+ return new TaskResult(TaskResult.Status.COMPLETED, "");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/0a18726f/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
index 49b4bf4..c4d588c 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
@@ -215,12 +215,12 @@ public class TestIndependentTaskRebalancer extends TaskTestBase {
}
@Test public void testReassignment() throws Exception {
- final int NUM_INSTANCES = 2;
+ final int NUM_INSTANCES = 5;
String jobName = TestHelper.getTestMethodName();
Workflow.Builder workflowBuilder = new Workflow.Builder(jobName);
List<TaskConfig> taskConfigs = Lists.newArrayListWithCapacity(2);
- Map<String, String> taskConfigMap = Maps.newHashMap(
- ImmutableMap.of("fail", "" + true, "failInstance", PARTICIPANT_PREFIX + '_' + _startPort));
+ Map<String, String> taskConfigMap = Maps.newHashMap(ImmutableMap
+ .of("fail", "" + true, "failInstance", PARTICIPANT_PREFIX + '_' + (_startPort + 1)));
TaskConfig taskConfig1 = new TaskConfig("TaskOne", taskConfigMap, false);
taskConfigs.add(taskConfig1);
Map<String, String> jobCommandMap = Maps.newHashMap();
@@ -242,7 +242,7 @@ public class TestIndependentTaskRebalancer extends TaskTestBase {
// Ensure that this was tried on two different instances, the first of which exhausted the
// attempts number, and the other passes on the first try
- Assert.assertEquals(_runCounts.size(), NUM_INSTANCES);
+ Assert.assertEquals(_runCounts.size(), 2);
Assert.assertTrue(
_runCounts.values().contains(JobConfig.DEFAULT_MAX_ATTEMPTS_PER_TASK / NUM_INSTANCES));
Assert.assertTrue(_runCounts.values().contains(1));
[2/3] helix git commit: Refactor TaskAssignmentCalculator API
Posted by lx...@apache.org.
Refactor TaskAssignmentCalculator API
Refactor the TaskAssignmentCalculator API to take the ideal-state map instead of the whole ClusterDataCache, which is large and mostly unused by the calculators.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/7bb2a9db
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/7bb2a9db
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/7bb2a9db
Branch: refs/heads/helix-0.6.x
Commit: 7bb2a9db2396a00bb9a721634a2432240679c657
Parents: 0a18726
Author: Junkai Xue <jx...@linkedin.com>
Authored: Tue Sep 13 16:00:08 2016 -0700
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Tue Sep 13 16:00:08 2016 -0700
----------------------------------------------------------------------
.../controller/stages/ClusterDataCache.java | 36 ++++++++++++++------
.../FixedTargetTaskAssignmentCalculator.java | 35 +++++++------------
.../helix/task/FixedTargetTaskRebalancer.java | 4 +--
.../task/GenericTaskAssignmentCalculator.java | 6 ++--
.../helix/task/GenericTaskRebalancer.java | 4 +--
.../org/apache/helix/task/JobRebalancer.java | 11 +++---
.../helix/task/TaskAssignmentCalculator.java | 10 +++---
7 files changed, 56 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/7bb2a9db/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index dacf98d..c8ca941 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -453,21 +453,35 @@ public class ClusterDataCache {
}
/**
- * Return all the nodes that are enabled and tagged same as the job.
- * @param allInstances List of instances to filter with instance tag
- * @param instanceTag The instance group tag
- * @return A new set contains instance name and that are marked enabled and have same
- * tag with job. The original set will not be changed during the filtering
+ * Return all the live nodes that are enabled
+ * @return A new set contains live instance name and that are marked enabled
*/
- public Set<String> getAllEnabledInstanceWithTag(final Set<String> allInstances,
- String instanceTag) {
+ public Set<String> getAllEnabledLiveInstances() {
+ return getAllEnabledInstances(null);
+ }
+
+ /**
+ * Return all the live nodes that are enabled and tagged same as the job.
+ * @param instanceTag The instance group tag, could be null, when no instance group specified
+ * @return A new set contains live instance name and that are marked enabled and have same
+ * tag with job, only if instance tag input is not null.
+ */
+ public Set<String> getAllEnabledLiveInstancesWithTag(String instanceTag) {
+ return getAllEnabledInstances(instanceTag);
+ }
+
+ private Set<String> getAllEnabledInstances(String instanceTag) {
Set<String> enabledTagInstances = new HashSet<String>();
- for (String instance : allInstances) {
+ for (String instance : _liveInstanceMap.keySet()) {
InstanceConfig instanceConfig = _instanceConfigMap.get(instance);
- if (instanceConfig != null && instanceConfig.getInstanceEnabled() && instanceConfig
- .containsTag(instanceTag)) {
- enabledTagInstances.add(instance);
+ // Check instance is enabled
+ if (instanceConfig != null && instanceConfig.getInstanceEnabled()) {
+ // Check whether it has instance group or not
+ // If it has instance group, check whether it belongs to that group or not
+ if (instanceTag == null || instanceConfig.containsTag(instanceTag)) {
+ enabledTagInstances.add(instance);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/7bb2a9db/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
index 09db616..0768b51 100644
--- a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
+++ b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
@@ -48,36 +48,37 @@ import org.apache.log4j.Logger;
public class FixedTargetTaskAssignmentCalculator extends TaskAssignmentCalculator {
private static final Logger LOG = Logger.getLogger(FixedTargetTaskAssignmentCalculator.class);
- @Override
- public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
- WorkflowConfig workflowCfg, WorkflowContext workflowCtx, ClusterDataCache cache) {
- return getAllTaskPartitions(getTgtIdealState(jobCfg, cache), jobCfg, jobCtx);
+ @Override public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
+ WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
+ Map<String, IdealState> idealStateMap) {
+ return getAllTaskPartitions(getTgtIdealState(jobCfg, idealStateMap), jobCfg, jobCtx);
}
@Override
public Map<String, SortedSet<Integer>> getTaskAssignment(CurrentStateOutput currStateOutput,
ResourceAssignment prevAssignment, Collection<String> instances, JobConfig jobCfg,
JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
- Set<Integer> partitionSet, ClusterDataCache cache) {
- IdealState tgtIs = getTgtIdealState(jobCfg, cache);
+ Set<Integer> partitionSet, Map<String, IdealState> idealStateMap) {
+ IdealState tgtIs = getTgtIdealState(jobCfg, idealStateMap);
if (tgtIs == null) {
LOG.warn("Missing target resource for the scheduled job!");
return Collections.emptyMap();
}
Set<String> tgtStates = jobCfg.getTargetPartitionStates();
return getTgtPartitionAssignment(currStateOutput, instances, tgtIs, tgtStates, partitionSet,
- jobContext, cache);
+ jobContext);
}
/**
* Gets the ideal state of the target resource of this job
* @param jobCfg job config containing target resource id
- * @param cache snapshot of the cluster containing the task and target resource
+ * @param idealStateMap the map of resource name map to ideal state
* @return target resource ideal state, or null
*/
- private static IdealState getTgtIdealState(JobConfig jobCfg, ClusterDataCache cache) {
+ private static IdealState getTgtIdealState(JobConfig jobCfg,
+ Map<String, IdealState> idealStateMap) {
String tgtResourceId = jobCfg.getTargetResource();
- return cache.getIdealState(tgtResourceId);
+ return idealStateMap.get(tgtResourceId);
}
/**
@@ -131,7 +132,7 @@ public class FixedTargetTaskAssignmentCalculator extends TaskAssignmentCalculato
*/
private static Map<String, SortedSet<Integer>> getTgtPartitionAssignment(
CurrentStateOutput currStateOutput, Iterable<String> instances, IdealState tgtIs,
- Set<String> tgtStates, Set<Integer> includeSet, JobContext jobCtx, ClusterDataCache cache) {
+ Set<String> tgtStates, Set<Integer> includeSet, JobContext jobCtx) {
Map<String, SortedSet<Integer>> result = new HashMap<String, SortedSet<Integer>>();
for (String instance : instances) {
result.put(instance, new TreeSet<Integer>());
@@ -153,18 +154,6 @@ public class FixedTargetTaskAssignmentCalculator extends TaskAssignmentCalculato
continue;
}
- InstanceConfig instanceConfig = cache.getInstanceConfigMap().get(instance);
-
- if (instanceConfig == null) {
- LOG.error("Instance config not found for instance : " + instance);
- continue;
- }
-
- if (!instanceConfig.getInstanceEnabled()) {
- LOG.debug("Instance has been disabled, ignore instance : " + instance);
- continue;
- }
-
String s =
currStateOutput.getCurrentState(tgtIs.getResourceName(), new Partition(pName),
instance);
http://git-wip-us.apache.org/repos/asf/helix/blob/7bb2a9db/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java
index 569fe03..1589c1a 100644
--- a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java
@@ -43,7 +43,7 @@ import org.apache.helix.model.ResourceAssignment;
@Override public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
WorkflowConfig workflowCfg, WorkflowContext workflowCtx, ClusterDataCache cache) {
return taskAssignmentCalculator
- .getAllTaskPartitions(jobCfg, jobCtx, workflowCfg, workflowCtx, cache);
+ .getAllTaskPartitions(jobCfg, jobCtx, workflowCfg, workflowCtx, cache.getIdealStates());
}
@Override public Map<String, SortedSet<Integer>> getTaskAssignment(
@@ -53,6 +53,6 @@ import org.apache.helix.model.ResourceAssignment;
ClusterDataCache cache) {
return taskAssignmentCalculator
.getTaskAssignment(currStateOutput, prevAssignment, instances, jobCfg, jobContext,
- workflowCfg, workflowCtx, partitionSet, cache);
+ workflowCfg, workflowCtx, partitionSet, cache.getIdealStates());
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/7bb2a9db/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
index fbc7af3..58ba670 100644
--- a/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
+++ b/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
@@ -34,6 +34,7 @@ import java.util.TreeSet;
import org.apache.helix.HelixException;
import org.apache.helix.controller.stages.ClusterDataCache;
import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.IdealState;
import org.apache.helix.model.Partition;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.util.JenkinsHash;
@@ -52,7 +53,8 @@ public class GenericTaskAssignmentCalculator extends TaskAssignmentCalculator {
@Override
public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
- WorkflowConfig workflowCfg, WorkflowContext workflowCtx, ClusterDataCache cache) {
+ WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
+ Map<String, IdealState> idealStateMap) {
Map<String, TaskConfig> taskMap = jobCfg.getTaskConfigMap();
Map<String, Integer> taskIdMap = jobCtx.getTaskIdPartitionMap();
for (TaskConfig taskCfg : taskMap.values()) {
@@ -69,7 +71,7 @@ public class GenericTaskAssignmentCalculator extends TaskAssignmentCalculator {
public Map<String, SortedSet<Integer>> getTaskAssignment(CurrentStateOutput currStateOutput,
ResourceAssignment prevAssignment, Collection<String> instances, JobConfig jobCfg,
final JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
- Set<Integer> partitionSet, ClusterDataCache cache) {
+ Set<Integer> partitionSet, Map<String, IdealState> idealStateMap) {
// Gather input to the full auto rebalancing algorithm
LinkedHashMap<String, Integer> states = new LinkedHashMap<String, Integer>();
states.put("ONLINE", 1);
http://git-wip-us.apache.org/repos/asf/helix/blob/7bb2a9db/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java
index 6a005b9..1720fbb 100644
--- a/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java
@@ -42,7 +42,7 @@ public class GenericTaskRebalancer extends DeprecatedTaskRebalancer {
public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
WorkflowConfig workflowCfg, WorkflowContext workflowCtx, ClusterDataCache cache) {
return taskAssignmentCalculator
- .getAllTaskPartitions(jobCfg, jobCtx, workflowCfg, workflowCtx, cache);
+ .getAllTaskPartitions(jobCfg, jobCtx, workflowCfg, workflowCtx, cache.getIdealStates());
}
@Override
@@ -52,6 +52,6 @@ public class GenericTaskRebalancer extends DeprecatedTaskRebalancer {
Set<Integer> partitionSet, ClusterDataCache cache) {
return taskAssignmentCalculator
.getTaskAssignment(currStateOutput, prevAssignment, instances, jobCfg, jobContext,
- workflowCfg, workflowCtx, partitionSet, cache);
+ workflowCfg, workflowCtx, partitionSet, cache.getIdealStates());
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/7bb2a9db/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
index 378ad95..cf7f5e6 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
@@ -131,9 +131,8 @@ public class JobRebalancer extends TaskRebalancer {
// Fetch the previous resource assignment from the property store. This is required because of
// HELIX-230.
Set<String> liveInstances = jobCfg.getInstanceGroupTag() == null
- ? clusterData.getLiveInstances().keySet()
- : clusterData.getAllEnabledInstanceWithTag(clusterData.getLiveInstances().keySet(),
- jobCfg.getInstanceGroupTag());
+ ? clusterData.getAllEnabledLiveInstances()
+ : clusterData.getAllEnabledLiveInstancesWithTag(jobCfg.getInstanceGroupTag());
if (liveInstances.isEmpty()) {
LOG.error("No available instance found for job!");
@@ -222,8 +221,8 @@ public class JobRebalancer extends TaskRebalancer {
// Process all the current assignments of tasks.
TaskAssignmentCalculator taskAssignmentCal = getAssignmentCalulator(jobCfg);
- Set<Integer> allPartitions =
- taskAssignmentCal.getAllTaskPartitions(jobCfg, jobCtx, workflowConfig, workflowCtx, cache);
+ Set<Integer> allPartitions = taskAssignmentCal
+ .getAllTaskPartitions(jobCfg, jobCtx, workflowConfig, workflowCtx, cache.getIdealStates());
if (allPartitions == null || allPartitions.isEmpty()) {
// Empty target partitions, mark the job as FAILED.
@@ -424,7 +423,7 @@ public class JobRebalancer extends TaskRebalancer {
// Get instance->[partition, ...] mappings for the target resource.
Map<String, SortedSet<Integer>> tgtPartitionAssignments = taskAssignmentCal
.getTaskAssignment(currStateOutput, prevAssignment, liveInstances, jobCfg, jobCtx,
- workflowConfig, workflowCtx, allPartitions, cache);
+ workflowConfig, workflowCtx, allPartitions, cache.getIdealStates());
for (Map.Entry<String, SortedSet<Integer>> entry : taskAssignments.entrySet()) {
String instance = entry.getKey();
if (!tgtPartitionAssignments.containsKey(instance) || excludedInstances
http://git-wip-us.apache.org/repos/asf/helix/blob/7bb2a9db/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java
index a3ed5ab..a6a9ed3 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java
@@ -2,6 +2,7 @@ package org.apache.helix.task;
import org.apache.helix.controller.stages.ClusterDataCache;
import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.IdealState;
import org.apache.helix.model.ResourceAssignment;
import java.util.Collection;
@@ -17,11 +18,12 @@ public abstract class TaskAssignmentCalculator {
* @param jobCtx the task context
* @param workflowCfg the workflow configuration
* @param workflowCtx the workflow context
- * @param cache cluster snapshot
+ * @param idealStateMap the map of resource name map to ideal state
* @return set of partition numbers
*/
public abstract Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
- WorkflowConfig workflowCfg, WorkflowContext workflowCtx, ClusterDataCache cache);
+ WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
+ Map<String, IdealState> idealStateMap);
/**
* Compute an assignment of tasks to instances
@@ -34,12 +36,12 @@ public abstract class TaskAssignmentCalculator {
* @param workflowCfg the workflow configuration
* @param workflowCtx the workflow context
* @param partitionSet the partitions to assign
- * @param cache cluster snapshot
+ * @param idealStateMap the map of resource name map to ideal state
* @return map of instances to set of partition numbers
*/
public abstract Map<String, SortedSet<Integer>> getTaskAssignment(
CurrentStateOutput currStateOutput, ResourceAssignment prevAssignment,
Collection<String> instances, JobConfig jobCfg, JobContext jobContext,
WorkflowConfig workflowCfg, WorkflowContext workflowCtx, Set<Integer> partitionSet,
- ClusterDataCache cache);
+ Map<String, IdealState> idealStateMap);
}
[3/3] helix git commit: Job Config and logic refactoring
Posted by lx...@apache.org.
Job Config and logic refactoring
1. Support identical task initialization with job command and number of tasks
2. Remove unused MaxForcedReassignmentsPerTask field
3. Refactor the job failure-handling logic.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/9fc6c540
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/9fc6c540
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/9fc6c540
Branch: refs/heads/helix-0.6.x
Commit: 9fc6c540bbcb4d7c71f0b7fe89e2acbc5955e859
Parents: 7bb2a9d
Author: Junkai Xue <jx...@linkedin.com>
Authored: Tue Sep 13 16:01:39 2016 -0700
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Wed Sep 21 10:43:35 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/helix/task/JobConfig.java | 52 +++++++++++++++-----
.../org/apache/helix/task/JobRebalancer.java | 19 +------
.../java/org/apache/helix/task/TaskConfig.java | 50 +++++++++----------
.../org/apache/helix/task/beans/JobBean.java | 2 +-
.../org/apache/helix/task/beans/TaskBean.java | 1 +
.../TestGenericTaskAssignmentCalculator.java | 4 +-
.../task/TestIndependentTaskRebalancer.java | 45 +++--------------
.../integration/task/TestUserContentStore.java | 6 +--
8 files changed, 81 insertions(+), 98 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/9fc6c540/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
index 7a4e2d3..a966f35 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
@@ -83,6 +83,7 @@ public class JobConfig {
* The maximum number of times the task rebalancer may attempt to execute a task.
*/
MaxAttemptsPerTask,
+ @Deprecated
/**
* The maximum number of times Helix will intentionally move a failing task
*/
@@ -134,6 +135,7 @@ public class JobConfig {
public static final int DEFAULT_MAX_FORCED_REASSIGNMENTS_PER_TASK = 0;
public static final boolean DEFAULT_DISABLE_EXTERNALVIEW = false;
public static final boolean DEFAULT_IGNORE_DEPENDENT_JOB_FAILURE = false;
+ public static final int DEFAULT_NUMBER_OF_TASKS = 0;
private final String _workflow;
private final String _targetResource;
@@ -218,10 +220,6 @@ public class JobConfig {
return _maxAttemptsPerTask;
}
- public int getMaxForcedReassignmentsPerTask() {
- return _maxForcedReassignmentsPerTask;
- }
-
public int getFailureThreshold() {
return _failureThreshold;
}
@@ -308,6 +306,8 @@ public class JobConfig {
* A builder for {@link JobConfig}. Validates the configurations.
*/
public static class Builder {
+ private final String NUMBER_OF_TASKS = "NumberOfTasks";
+
private String _workflow;
private String _targetResource;
private String _jobType;
@@ -325,10 +325,18 @@ public class JobConfig {
private long _retryDelay = DEFAULT_TASK_RETRY_DELAY;
private boolean _disableExternalView = DEFAULT_DISABLE_EXTERNALVIEW;
private boolean _ignoreDependentJobFailure = DEFAULT_IGNORE_DEPENDENT_JOB_FAILURE;
+ private int _numberOfTasks = DEFAULT_NUMBER_OF_TASKS;
public JobConfig build() {
validate();
+ if (_taskConfigMap.isEmpty()) {
+ for (int i = 0; i < _numberOfTasks; i++) {
+ TaskConfig taskConfig = new TaskConfig(null, null);
+ _taskConfigMap.put(taskConfig.getId(), taskConfig);
+ }
+ }
+
return new JobConfig(_workflow, _targetResource, _targetPartitions, _targetPartitionStates,
_command, _commandConfig, _timeoutPerTask, _numConcurrentTasksPerInstance,
_maxAttemptsPerTask, _maxForcedReassignmentsPerTask, _failureThreshold, _retryDelay,
@@ -376,10 +384,6 @@ public class JobConfig {
b.setMaxAttemptsPerTask(
Integer.parseInt(cfg.get(JobConfigProperty.MaxAttemptsPerTask.name())));
}
- if (cfg.containsKey(JobConfigProperty.MaxForcedReassignmentsPerTask.name())) {
- b.setMaxForcedReassignmentsPerTask(
- Integer.parseInt(cfg.get(JobConfigProperty.MaxForcedReassignmentsPerTask.name())));
- }
if (cfg.containsKey(JobConfigProperty.FailureThreshold.name())) {
b.setFailureThreshold(
Integer.parseInt(cfg.get(JobConfigProperty.FailureThreshold.name())));
@@ -429,6 +433,11 @@ public class JobConfig {
return this;
}
+ public Builder setNumberOfTasks(int v) {
+ _numberOfTasks = v;
+ return this;
+ }
+
public Builder setJobCommandConfigMap(Map<String, String> v) {
_commandConfig = v;
return this;
@@ -449,6 +458,8 @@ public class JobConfig {
return this;
}
+ // This field will be ignored by Helix
+ @Deprecated
public Builder setMaxForcedReassignmentsPerTask(int v) {
_maxForcedReassignmentsPerTask = v;
return this;
@@ -508,9 +519,25 @@ public class JobConfig {
throw new IllegalArgumentException(
String.format("%s cannot be an empty set", JobConfigProperty.TargetPartitionStates));
}
- if (_taskConfigMap.isEmpty() && _command == null) {
- throw new IllegalArgumentException(
- String.format("%s cannot be null", JobConfigProperty.Command));
+ if (_taskConfigMap.isEmpty()) {
+ // Check Job command is not null when none taskconfig specified
+ if (_command == null) {
+ throw new IllegalArgumentException(
+ String.format("%s cannot be null", JobConfigProperty.Command));
+ }
+ // Check number of task is set when Job command is not null and none taskconfig specified
+ if (_targetResource == null && _numberOfTasks == 0) {
+ throw new IllegalArgumentException("Either targetResource or numberOfTask should be set");
+ }
+ }
+ // When no job-level command is set, every task config must supply its own command
+ if (_command == null) {
+ for (TaskConfig taskConfig : _taskConfigMap.values()) {
+ if (taskConfig.getCommand() == null) {
+ throw new IllegalArgumentException(
+ String.format("Task % command cannot be null", taskConfig.getId()));
+ }
+ }
}
if (_timeoutPerTask < 0) {
throw new IllegalArgumentException(String
@@ -547,12 +574,11 @@ public class JobConfig {
Builder b = new Builder();
b.setMaxAttemptsPerTask(jobBean.maxAttemptsPerTask)
- .setMaxForcedReassignmentsPerTask(jobBean.maxForcedReassignmentsPerTask)
.setNumConcurrentTasksPerInstance(jobBean.numConcurrentTasksPerInstance)
.setTimeoutPerTask(jobBean.timeoutPerPartition)
.setFailureThreshold(jobBean.failureThreshold).setTaskRetryDelay(jobBean.taskRetryDelay)
.setDisableExternalView(jobBean.disableExternalView)
- .setIgnoreDependentJobFailure(jobBean.ignoreDependentJobFailure);
+ .setIgnoreDependentJobFailure(jobBean.ignoreDependentJobFailure).setNumberOfTasks(jobBean.numberOfTasks);
if (jobBean.jobCommandConfigMap != null) {
b.setJobCommandConfigMap(jobBean.jobCommandConfigMap);
http://git-wip-us.apache.org/repos/asf/helix/blob/9fc6c540/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
index cf7f5e6..7676dab 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
@@ -345,24 +345,9 @@ public class JobRebalancer extends TaskRebalancer {
// maximum number of attempts or task is in ABORTED state.
if (jobCtx.getPartitionNumAttempts(pId) >= jobCfg.getMaxAttemptsPerTask() ||
currState.equals(TaskPartitionState.TASK_ABORTED)) {
- // If the user does not require this task to succeed in order for the job to succeed,
- // then we don't have to fail the job right now
- boolean successOptional = false;
- String taskId = jobCtx.getTaskIdForPartition(pId);
- if (taskId != null) {
- TaskConfig taskConfig = jobCfg.getTaskConfig(taskId);
- if (taskConfig != null) {
- successOptional = taskConfig.isSuccessOptional();
- }
- }
-
- // Similarly, if we have some leeway for how many tasks we can fail, then we don't have
+ // If we have some leeway for how many tasks we can fail, then we don't have
// to fail the job immediately
- if (skippedPartitions.size() < jobCfg.getFailureThreshold()) {
- successOptional = true;
- }
-
- if (!successOptional) {
+ if (skippedPartitions.size() >= jobCfg.getFailureThreshold()) {
markJobFailed(jobResource, jobCtx, workflowConfig, workflowCtx);
markAllPartitionsError(jobCtx, currState, false);
addAllPartitions(allPartitions, partitionsToDropFromIs);
http://git-wip-us.apache.org/repos/asf/helix/blob/9fc6c540/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java b/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
index b990f99..621d371 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
@@ -36,6 +36,7 @@ public class TaskConfig {
private enum TaskConfigProperty {
TASK_ID,
TASK_COMMAND,
+ @Deprecated
TASK_SUCCESS_OPTIONAL,
TASK_TARGET_PARTITION
}
@@ -44,18 +45,26 @@ public class TaskConfig {
private final Map<String, String> _configMap;
+ @Deprecated
+ public TaskConfig(String command, Map<String, String> configMap, boolean successOptional,
+ String id, String target) {
+ this(command, configMap, id, target);
+ }
+
+ @Deprecated
+ public TaskConfig(String command, Map<String, String> configMap, boolean successOptional) {
+ this(command, configMap, null, null);
+ }
+
/**
* Instantiate the task config
*
* @param command the command to invoke for the task
* @param configMap configuration to be passed as part of the invocation
- * @param successOptional true if this task need not pass for the job to succeed, false
- * otherwise
* @param id existing task ID
* @param target target partition for a task
*/
- public TaskConfig(String command, Map<String, String> configMap, boolean successOptional,
- String id, String target) {
+ public TaskConfig(String command, Map<String, String> configMap, String id, String target) {
if (configMap == null) {
configMap = Maps.newHashMap();
}
@@ -65,8 +74,6 @@ public class TaskConfig {
if (command != null) {
configMap.put(TaskConfigProperty.TASK_COMMAND.name(), command);
}
- configMap
- .put(TaskConfigProperty.TASK_SUCCESS_OPTIONAL.name(), Boolean.toString(successOptional));
configMap.put(TaskConfigProperty.TASK_ID.name(), id);
if (target != null) {
configMap.put(TaskConfigProperty.TASK_TARGET_PARTITION.name(), target);
@@ -79,11 +86,9 @@ public class TaskConfig {
*
* @param command the command to invoke for the task
* @param configMap configuration to be passed as part of the invocation
- * @param successOptional true if this task need not pass for the job to succeed, false
- * otherwise
*/
- public TaskConfig(String command, Map<String, String> configMap, boolean successOptional) {
- this(command, configMap, successOptional, null, null);
+ public TaskConfig(String command, Map<String, String> configMap) {
+ this(command, configMap, null, null);
}
/**
@@ -115,16 +120,13 @@ public class TaskConfig {
/**
* Check if this task must succeed for a job to succeed
- *
+ * This field has been ignored by Helix
* @return true if success is optional, false otherwise
*/
+ @Deprecated
public boolean isSuccessOptional() {
- String successOptionalStr = _configMap.get(TaskConfigProperty.TASK_SUCCESS_OPTIONAL.name());
- if (successOptionalStr == null) {
- return false;
- } else {
- return Boolean.parseBoolean(successOptionalStr);
- }
+ // This option will not be used in rebalancer anymore, deprecate it.
+ return true;
}
/**
@@ -154,7 +156,7 @@ public class TaskConfig {
private Map<String, String> _configMap;
public TaskConfig build() {
- return new TaskConfig(_command, _configMap, _successOptional, _taskId, _targetPartition);
+ return new TaskConfig(_command, _configMap, _taskId, _targetPartition);
}
public String getTaskId() {
@@ -184,10 +186,12 @@ public class TaskConfig {
return this;
}
+ @Deprecated
public boolean isSuccessOptional() {
return _successOptional;
}
+ @Deprecated
public Builder setSuccessOptional(boolean successOptional) {
_successOptional = successOptional;
return this;
@@ -208,7 +212,7 @@ public class TaskConfig {
* @return instantiated TaskConfig
*/
public static TaskConfig from(String target) {
- return new TaskConfig(null, null, false, null, target);
+ return new TaskConfig(null, null, null, target);
}
/**
@@ -218,7 +222,7 @@ public class TaskConfig {
* @return instantiated TaskConfig
*/
public static TaskConfig from(TaskBean bean) {
- return new TaskConfig(bean.command, bean.taskConfigMap, bean.successOptional);
+ return new TaskConfig(bean.command, bean.taskConfigMap);
}
/**
@@ -232,11 +236,7 @@ public class TaskConfig {
String taskId = rawConfigMap.get(TaskConfigProperty.TASK_ID.name());
String command = rawConfigMap.get(TaskConfigProperty.TASK_COMMAND.name());
String targetPartition = rawConfigMap.get(TaskConfigProperty.TASK_TARGET_PARTITION.name());
- String successOptionalStr =
- rawConfigMap.get(TaskConfigProperty.TASK_SUCCESS_OPTIONAL.name());
- boolean successOptional =
- (successOptionalStr != null) ? Boolean.valueOf(successOptionalStr) : false;
- return new TaskConfig(command, rawConfigMap, successOptional, taskId, targetPartition);
+ return new TaskConfig(command, rawConfigMap, taskId, targetPartition);
}
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/9fc6c540/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
index dd7ebab..9a376f8 100644
--- a/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
+++ b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
@@ -41,9 +41,9 @@ public class JobBean {
public long timeoutPerPartition = JobConfig.DEFAULT_TIMEOUT_PER_TASK;
public int numConcurrentTasksPerInstance = JobConfig.DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
public int maxAttemptsPerTask = JobConfig.DEFAULT_MAX_ATTEMPTS_PER_TASK;
- public int maxForcedReassignmentsPerTask = JobConfig.DEFAULT_MAX_FORCED_REASSIGNMENTS_PER_TASK;
public int failureThreshold = JobConfig.DEFAULT_FAILURE_THRESHOLD;
public long taskRetryDelay = JobConfig.DEFAULT_TASK_RETRY_DELAY;
public boolean disableExternalView = JobConfig.DEFAULT_DISABLE_EXTERNALVIEW;
public boolean ignoreDependentJobFailure = JobConfig.DEFAULT_IGNORE_DEPENDENT_JOB_FAILURE;
+ public int numberOfTasks = JobConfig.DEFAULT_NUMBER_OF_TASKS;
}
http://git-wip-us.apache.org/repos/asf/helix/blob/9fc6c540/helix-core/src/main/java/org/apache/helix/task/beans/TaskBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/beans/TaskBean.java b/helix-core/src/main/java/org/apache/helix/task/beans/TaskBean.java
index 97ecfc0..a61556b 100644
--- a/helix-core/src/main/java/org/apache/helix/task/beans/TaskBean.java
+++ b/helix-core/src/main/java/org/apache/helix/task/beans/TaskBean.java
@@ -29,5 +29,6 @@ import java.util.Map;
public class TaskBean {
public String command;
public Map<String, String> taskConfigMap;
+ @Deprecated
public boolean successOptional = false;
}
http://git-wip-us.apache.org/repos/asf/helix/blob/9fc6c540/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericTaskAssignmentCalculator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericTaskAssignmentCalculator.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericTaskAssignmentCalculator.java
index 0410db2..5645009 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericTaskAssignmentCalculator.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericTaskAssignmentCalculator.java
@@ -104,7 +104,7 @@ public class TestGenericTaskAssignmentCalculator extends TaskTestBase {
_driver = new TaskDriver(_manager);
Map<String, String> taskConfigMap = Maps.newHashMap();
- _taskConfig = new TaskConfig("TaskOne", taskConfigMap, false);
+ _taskConfig = new TaskConfig("TaskOne", taskConfigMap);
_jobCommandMap = Maps.newHashMap();
}
@@ -136,7 +136,7 @@ public class TestGenericTaskAssignmentCalculator extends TaskTestBase {
List<TaskConfig> taskConfigs = Lists.newArrayListWithCapacity(20);
for (int i = 0; i < 50; i++) {
Map<String, String> taskConfigMap = Maps.newHashMap();
- taskConfigs.add(new TaskConfig("TaskOne", taskConfigMap, false));
+ taskConfigs.add(new TaskConfig("TaskOne", taskConfigMap));
}
JobConfig.Builder jobBuilder =
new JobConfig.Builder().setCommand("DummyCommand").setJobCommandConfigMap(_jobCommandMap)
http://git-wip-us.apache.org/repos/asf/helix/blob/9fc6c540/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
index c4d588c..64b9073 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
@@ -136,8 +136,8 @@ public class TestIndependentTaskRebalancer extends TaskTestBase {
String jobName = TestHelper.getTestMethodName();
Workflow.Builder workflowBuilder = new Workflow.Builder(jobName);
List<TaskConfig> taskConfigs = Lists.newArrayListWithCapacity(2);
- TaskConfig taskConfig1 = new TaskConfig("TaskOne", null, true);
- TaskConfig taskConfig2 = new TaskConfig("TaskTwo", null, true);
+ TaskConfig taskConfig1 = new TaskConfig("TaskOne", null);
+ TaskConfig taskConfig2 = new TaskConfig("TaskTwo", null);
taskConfigs.add(taskConfig1);
taskConfigs.add(taskConfig2);
Map<String, String> jobCommandMap = Maps.newHashMap();
@@ -164,8 +164,8 @@ public class TestIndependentTaskRebalancer extends TaskTestBase {
Workflow.Builder workflowBuilder = new Workflow.Builder(jobName);
List<TaskConfig> taskConfigs = Lists.newArrayListWithCapacity(2);
Map<String, String> taskConfigMap = Maps.newHashMap(ImmutableMap.of("fail", "" + true));
- TaskConfig taskConfig1 = new TaskConfig("TaskOne", taskConfigMap, false);
- TaskConfig taskConfig2 = new TaskConfig("TaskTwo", null, false);
+ TaskConfig taskConfig1 = new TaskConfig("TaskOne", taskConfigMap);
+ TaskConfig taskConfig2 = new TaskConfig("TaskTwo", null);
taskConfigs.add(taskConfig1);
taskConfigs.add(taskConfig2);
Map<String, String> jobConfigMap = Maps.newHashMap();
@@ -185,35 +185,6 @@ public class TestIndependentTaskRebalancer extends TaskTestBase {
Assert.assertTrue(_invokedClasses.contains(TaskTwo.class.getName()));
}
- @Test public void testOptionalTaskFailure() throws Exception {
- // Create a job with two different tasks
- String jobName = TestHelper.getTestMethodName();
- Workflow.Builder workflowBuilder = new Workflow.Builder(jobName);
- List<TaskConfig> taskConfigs = Lists.newArrayListWithCapacity(2);
- Map<String, String> taskConfigMap = Maps.newHashMap(ImmutableMap.of("fail", "" + true));
- TaskConfig taskConfig1 = new TaskConfig("TaskOne", taskConfigMap, true);
- TaskConfig taskConfig2 = new TaskConfig("TaskTwo", null, false);
- taskConfigs.add(taskConfig1);
- taskConfigs.add(taskConfig2);
- Map<String, String> jobCommandMap = Maps.newHashMap();
- jobCommandMap.put("Timeout", "1000");
-
- JobConfig.Builder jobBuilder =
- new JobConfig.Builder().setCommand("DummyCommand").addTaskConfigs(taskConfigs)
- .setJobCommandConfigMap(jobCommandMap);
- workflowBuilder.addJob(jobName, jobBuilder);
-
- _driver.start(workflowBuilder.build());
-
- // Ensure the job completes
- _driver.pollForWorkflowState(jobName, TaskState.IN_PROGRESS);
- _driver.pollForWorkflowState(jobName, TaskState.COMPLETED);
-
- // Ensure that each class was invoked
- Assert.assertTrue(_invokedClasses.contains(TaskOne.class.getName()));
- Assert.assertTrue(_invokedClasses.contains(TaskTwo.class.getName()));
- }
-
@Test public void testReassignment() throws Exception {
final int NUM_INSTANCES = 5;
String jobName = TestHelper.getTestMethodName();
@@ -221,13 +192,13 @@ public class TestIndependentTaskRebalancer extends TaskTestBase {
List<TaskConfig> taskConfigs = Lists.newArrayListWithCapacity(2);
Map<String, String> taskConfigMap = Maps.newHashMap(ImmutableMap
.of("fail", "" + true, "failInstance", PARTICIPANT_PREFIX + '_' + (_startPort + 1)));
- TaskConfig taskConfig1 = new TaskConfig("TaskOne", taskConfigMap, false);
+ TaskConfig taskConfig1 = new TaskConfig("TaskOne", taskConfigMap);
taskConfigs.add(taskConfig1);
Map<String, String> jobCommandMap = Maps.newHashMap();
jobCommandMap.put("Timeout", "1000");
JobConfig.Builder jobBuilder = new JobConfig.Builder().setCommand("DummyCommand")
- .setMaxForcedReassignmentsPerTask(NUM_INSTANCES - 1).addTaskConfigs(taskConfigs)
+ .addTaskConfigs(taskConfigs)
.setJobCommandConfigMap(jobCommandMap);
workflowBuilder.addJob(jobName, jobBuilder);
@@ -254,7 +225,7 @@ public class TestIndependentTaskRebalancer extends TaskTestBase {
Workflow.Builder workflowBuilder = new Workflow.Builder(jobName);
List<TaskConfig> taskConfigs = Lists.newArrayListWithCapacity(1);
Map<String, String> taskConfigMap = Maps.newHashMap();
- TaskConfig taskConfig1 = new TaskConfig("TaskOne", taskConfigMap, false);
+ TaskConfig taskConfig1 = new TaskConfig("TaskOne", taskConfigMap);
taskConfigs.add(taskConfig1);
Map<String, String> jobCommandMap = Maps.newHashMap();
jobCommandMap.put("Timeout", "1000");
@@ -289,7 +260,7 @@ public class TestIndependentTaskRebalancer extends TaskTestBase {
Workflow.Builder workflowBuilder = new Workflow.Builder(jobName);
List<TaskConfig> taskConfigs = Lists.newArrayListWithCapacity(1);
Map<String, String> taskConfigMap = Maps.newHashMap();
- TaskConfig taskConfig1 = new TaskConfig("SingleFailTask", taskConfigMap, false);
+ TaskConfig taskConfig1 = new TaskConfig("SingleFailTask", taskConfigMap);
taskConfigs.add(taskConfig1);
Map<String, String> jobCommandMap = Maps.newHashMap();
http://git-wip-us.apache.org/repos/asf/helix/blob/9fc6c540/helix-core/src/test/java/org/apache/helix/integration/task/TestUserContentStore.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestUserContentStore.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestUserContentStore.java
index b2b27ef..13cd531 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestUserContentStore.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestUserContentStore.java
@@ -123,7 +123,7 @@ public class TestUserContentStore extends TaskTestBase {
Workflow.Builder workflowBuilder = new Workflow.Builder(jobName);
List<TaskConfig> taskConfigs = Lists.newArrayListWithCapacity(1);
Map<String, String> taskConfigMap = Maps.newHashMap();
- TaskConfig taskConfig1 = new TaskConfig("ContentStoreTask", taskConfigMap, false);
+ TaskConfig taskConfig1 = new TaskConfig("ContentStoreTask", taskConfigMap);
taskConfigs.add(taskConfig1);
Map<String, String> jobCommandMap = Maps.newHashMap();
jobCommandMap.put("Timeout", "1000");
@@ -148,8 +148,8 @@ public class TestUserContentStore extends TaskTestBase {
List<TaskConfig> taskConfigs2 = Lists.newArrayListWithCapacity(1);
Map<String, String> taskConfigMap1 = Maps.newHashMap();
Map<String, String> taskConfigMap2 = Maps.newHashMap();
- TaskConfig taskConfig1 = new TaskConfig("TaskOne", taskConfigMap1, false);
- TaskConfig taskConfig2 = new TaskConfig("TaskTwo", taskConfigMap2, false);
+ TaskConfig taskConfig1 = new TaskConfig("TaskOne", taskConfigMap1);
+ TaskConfig taskConfig2 = new TaskConfig("TaskTwo", taskConfigMap2);
taskConfigs1.add(taskConfig1);
taskConfigs2.add(taskConfig2);