You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2015/12/10 08:04:55 UTC
[3/3] helix git commit: [HELIX-617] Job IdealState is generated even
if the job is not running and not removed when it is completed.
[HELIX-617] Job IdealState is generated even if the job is not running and not removed when it is completed.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/1798e793
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/1798e793
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/1798e793
Branch: refs/heads/helix-0.6.x
Commit: 1798e793522157b1b479a66c8a9ec9453d698b8f
Parents: 7bbb20b
Author: Lei Xia <lx...@linkedin.com>
Authored: Wed Dec 9 14:02:45 2015 -0800
Committer: Lei Xia <lx...@linkedin.com>
Committed: Wed Dec 9 16:39:37 2015 -0800
----------------------------------------------------------------------
.../java/org/apache/helix/model/IdealState.java | 8 +-
.../FixedTargetTaskAssignmentCalculator.java | 163 +++
.../helix/task/FixedTargetTaskRebalancer.java | 163 ---
.../task/GenericTaskAssignmentCalculator.java | 273 +++++
.../helix/task/GenericTaskRebalancer.java | 273 -----
.../org/apache/helix/task/JobRebalancer.java | 650 +++++++++++
.../helix/task/TaskAssignmentCalculator.java | 45 +
.../java/org/apache/helix/task/TaskDriver.java | 205 ++--
.../org/apache/helix/task/TaskRebalancer.java | 1045 +++---------------
.../java/org/apache/helix/task/TaskUtil.java | 216 ++--
.../java/org/apache/helix/task/Workflow.java | 6 +-
.../apache/helix/task/WorkflowRebalancer.java | 412 +++++++
.../integration/task/TestRecurringJobQueue.java | 78 +-
.../integration/task/TestTaskRebalancer.java | 2 +-
14 files changed, 1971 insertions(+), 1568 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/1798e793/helix-core/src/main/java/org/apache/helix/model/IdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index 696de7a..e7f6096 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -31,9 +31,9 @@ import org.apache.helix.HelixConstants;
import org.apache.helix.HelixProperty;
import org.apache.helix.ZNRecord;
import org.apache.helix.controller.rebalancer.Rebalancer;
-import org.apache.helix.task.FixedTargetTaskRebalancer;
-import org.apache.helix.task.GenericTaskRebalancer;
+import org.apache.helix.task.JobRebalancer;
import org.apache.helix.task.TaskRebalancer;
+import org.apache.helix.task.WorkflowRebalancer;
import org.apache.log4j.Logger;
/**
@@ -524,8 +524,8 @@ public class IdealState extends HelixProperty {
default:
String rebalancerName = getRebalancerClassName();
if (rebalancerName != null) {
- if (rebalancerName.equals(FixedTargetTaskRebalancer.class.getName())
- || rebalancerName.equals(GenericTaskRebalancer.class.getName())) {
+ if (rebalancerName.equals(JobRebalancer.class.getName())
+ || rebalancerName.equals(WorkflowRebalancer.class.getName())) {
property = RebalanceMode.TASK;
} else {
property = RebalanceMode.USER_DEFINED;
http://git-wip-us.apache.org/repos/asf/helix/blob/1798e793/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
new file mode 100644
index 0000000..8760524
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
@@ -0,0 +1,163 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.ResourceAssignment;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+/**
+ * A TaskAssignmentCalculator for when a task group must be assigned according to partitions/states on a target
+ * resource. Here, tasks are co-located according to where a resource's partitions are, as well as
+ * (if desired) only where those partitions are in a given state.
+ */
+public class FixedTargetTaskAssignmentCalculator extends TaskAssignmentCalculator {
+
+ @Override
+ public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
+ WorkflowConfig workflowCfg, WorkflowContext workflowCtx, ClusterDataCache cache) {
+ return getAllTaskPartitions(getTgtIdealState(jobCfg, cache), jobCfg, jobCtx);
+ }
+
+ @Override
+ public Map<String, SortedSet<Integer>> getTaskAssignment(CurrentStateOutput currStateOutput,
+ ResourceAssignment prevAssignment, Collection<String> instances, JobConfig jobCfg,
+ JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
+ Set<Integer> partitionSet, ClusterDataCache cache) {
+ IdealState tgtIs = getTgtIdealState(jobCfg, cache);
+ if (tgtIs == null) {
+ return Collections.emptyMap();
+ }
+ Set<String> tgtStates = jobCfg.getTargetPartitionStates();
+ return getTgtPartitionAssignment(currStateOutput, instances, tgtIs, tgtStates, partitionSet,
+ jobContext);
+ }
+
+ /**
+ * Gets the ideal state of the target resource of this job
+ * @param jobCfg job config containing target resource id
+ * @param cache snapshot of the cluster containing the task and target resource
+ * @return target resource ideal state, or null
+ */
+ private static IdealState getTgtIdealState(JobConfig jobCfg, ClusterDataCache cache) {
+ String tgtResourceId = jobCfg.getTargetResource();
+ return cache.getIdealState(tgtResourceId);
+ }
+
+ /**
+ * Returns the set of all partition ids for a job.
+ * <p/>
+ * If a set of partition ids was explicitly specified in the config, that is used. Otherwise, we
+ * use the list of all partition ids from the target resource.
+ */
+ private static Set<Integer> getAllTaskPartitions(IdealState tgtResourceIs, JobConfig jobCfg,
+ JobContext taskCtx) {
+ if (tgtResourceIs == null) {
+ return null;
+ }
+ Map<String, List<Integer>> currentTargets = taskCtx.getPartitionsByTarget();
+ SortedSet<String> targetPartitions = Sets.newTreeSet();
+ if (jobCfg.getTargetPartitions() != null) {
+ targetPartitions.addAll(jobCfg.getTargetPartitions());
+ } else {
+ targetPartitions.addAll(tgtResourceIs.getPartitionSet());
+ }
+
+ Set<Integer> taskPartitions = Sets.newTreeSet();
+ for (String pName : targetPartitions) {
+ taskPartitions.addAll(getPartitionsForTargetPartition(pName, currentTargets, taskCtx));
+ }
+ return taskPartitions;
+ }
+
+ private static List<Integer> getPartitionsForTargetPartition(String targetPartition,
+ Map<String, List<Integer>> currentTargets, JobContext jobCtx) {
+ if (!currentTargets.containsKey(targetPartition)) {
+ int nextId = jobCtx.getPartitionSet().size();
+ jobCtx.setPartitionTarget(nextId, targetPartition);
+ return Lists.newArrayList(nextId);
+ } else {
+ return currentTargets.get(targetPartition);
+ }
+ }
+
+ /**
+ * Get partition assignments for the target resource, but only for the partitions of interest.
+ * @param currStateOutput The current state of the instances in the cluster.
+ * @param instances The instances.
+ * @param tgtIs The ideal state of the target resource.
+ * @param tgtStates Only partitions in this set of states will be considered. If null, partitions
+ * do not need to
+ * be in any specific state to be considered.
+ * @param includeSet The set of partitions to consider.
+ * @return A map of instance vs set of partition ids assigned to that instance.
+ */
+ private static Map<String, SortedSet<Integer>> getTgtPartitionAssignment(
+ CurrentStateOutput currStateOutput, Iterable<String> instances, IdealState tgtIs,
+ Set<String> tgtStates, Set<Integer> includeSet, JobContext jobCtx) {
+ Map<String, SortedSet<Integer>> result = new HashMap<String, SortedSet<Integer>>();
+ for (String instance : instances) {
+ result.put(instance, new TreeSet<Integer>());
+ }
+
+ Map<String, List<Integer>> partitionsByTarget = jobCtx.getPartitionsByTarget();
+ for (String pName : tgtIs.getPartitionSet()) {
+ List<Integer> partitions = partitionsByTarget.get(pName);
+ if (partitions == null || partitions.size() < 1) {
+ continue;
+ }
+ int pId = partitions.get(0);
+ if (includeSet.contains(pId)) {
+ for (String instance : instances) {
+ Message pendingMessage =
+ currStateOutput.getPendingState(tgtIs.getResourceName(), new Partition(pName),
+ instance);
+ if (pendingMessage != null) {
+ continue;
+ }
+ String s =
+ currStateOutput.getCurrentState(tgtIs.getResourceName(), new Partition(pName),
+ instance);
+ String state = (s == null ? null : s.toString());
+ if (tgtStates == null || tgtStates.contains(state)) {
+ result.get(instance).add(pId);
+ }
+ }
+ }
+ }
+
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/1798e793/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java
deleted file mode 100644
index 4c013c0..0000000
--- a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java
+++ /dev/null
@@ -1,163 +0,0 @@
-package org.apache.helix.task;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
-
-import org.apache.helix.controller.stages.ClusterDataCache;
-import org.apache.helix.controller.stages.CurrentStateOutput;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.Message;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.ResourceAssignment;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-/**
- * A rebalancer for when a task group must be assigned according to partitions/states on a target
- * resource. Here, tasks are colocated according to where a resource's partitions are, as well as
- * (if desired) only where those partitions are in a given state.
- */
-public class FixedTargetTaskRebalancer extends TaskRebalancer {
-
- @Override
- public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
- WorkflowConfig workflowCfg, WorkflowContext workflowCtx, ClusterDataCache cache) {
- return getAllTaskPartitions(getTgtIdealState(jobCfg, cache), jobCfg, jobCtx);
- }
-
- @Override
- public Map<String, SortedSet<Integer>> getTaskAssignment(CurrentStateOutput currStateOutput,
- ResourceAssignment prevAssignment, Collection<String> instances, JobConfig jobCfg,
- JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
- Set<Integer> partitionSet, ClusterDataCache cache) {
- IdealState tgtIs = getTgtIdealState(jobCfg, cache);
- if (tgtIs == null) {
- return Collections.emptyMap();
- }
- Set<String> tgtStates = jobCfg.getTargetPartitionStates();
- return getTgtPartitionAssignment(currStateOutput, instances, tgtIs, tgtStates, partitionSet,
- jobContext);
- }
-
- /**
- * Gets the ideal state of the target resource of this job
- * @param jobCfg job config containing target resource id
- * @param cluster snapshot of the cluster containing the task and target resource
- * @return target resource ideal state, or null
- */
- private static IdealState getTgtIdealState(JobConfig jobCfg, ClusterDataCache cache) {
- String tgtResourceId = jobCfg.getTargetResource();
- return cache.getIdealState(tgtResourceId);
- }
-
- /**
- * Returns the set of all partition ids for a job.
- * <p/>
- * If a set of partition ids was explicitly specified in the config, that is used. Otherwise, we
- * use the list of all partition ids from the target resource.
- */
- private static Set<Integer> getAllTaskPartitions(IdealState tgtResourceIs, JobConfig jobCfg,
- JobContext taskCtx) {
- if (tgtResourceIs == null) {
- return null;
- }
- Map<String, List<Integer>> currentTargets = taskCtx.getPartitionsByTarget();
- SortedSet<String> targetPartitions = Sets.newTreeSet();
- if (jobCfg.getTargetPartitions() != null) {
- targetPartitions.addAll(jobCfg.getTargetPartitions());
- } else {
- targetPartitions.addAll(tgtResourceIs.getPartitionSet());
- }
-
- Set<Integer> taskPartitions = Sets.newTreeSet();
- for (String pName : targetPartitions) {
- taskPartitions.addAll(getPartitionsForTargetPartition(pName, currentTargets, taskCtx));
- }
- return taskPartitions;
- }
-
- private static List<Integer> getPartitionsForTargetPartition(String targetPartition,
- Map<String, List<Integer>> currentTargets, JobContext jobCtx) {
- if (!currentTargets.containsKey(targetPartition)) {
- int nextId = jobCtx.getPartitionSet().size();
- jobCtx.setPartitionTarget(nextId, targetPartition);
- return Lists.newArrayList(nextId);
- } else {
- return currentTargets.get(targetPartition);
- }
- }
-
- /**
- * Get partition assignments for the target resource, but only for the partitions of interest.
- * @param currStateOutput The current state of the instances in the cluster.
- * @param instances The instances.
- * @param tgtIs The ideal state of the target resource.
- * @param tgtStates Only partitions in this set of states will be considered. If null, partitions
- * do not need to
- * be in any specific state to be considered.
- * @param includeSet The set of partitions to consider.
- * @return A map of instance vs set of partition ids assigned to that instance.
- */
- private static Map<String, SortedSet<Integer>> getTgtPartitionAssignment(
- CurrentStateOutput currStateOutput, Iterable<String> instances, IdealState tgtIs,
- Set<String> tgtStates, Set<Integer> includeSet, JobContext jobCtx) {
- Map<String, SortedSet<Integer>> result = new HashMap<String, SortedSet<Integer>>();
- for (String instance : instances) {
- result.put(instance, new TreeSet<Integer>());
- }
-
- Map<String, List<Integer>> partitionsByTarget = jobCtx.getPartitionsByTarget();
- for (String pName : tgtIs.getPartitionSet()) {
- List<Integer> partitions = partitionsByTarget.get(pName);
- if (partitions == null || partitions.size() < 1) {
- continue;
- }
- int pId = partitions.get(0);
- if (includeSet.contains(pId)) {
- for (String instance : instances) {
- Message pendingMessage =
- currStateOutput.getPendingState(tgtIs.getResourceName(), new Partition(pName),
- instance);
- if (pendingMessage != null) {
- continue;
- }
- String s =
- currStateOutput.getCurrentState(tgtIs.getResourceName(), new Partition(pName),
- instance);
- String state = (s == null ? null : s.toString());
- if (tgtStates == null || tgtStates.contains(state)) {
- result.get(instance).add(pId);
- }
- }
- }
- }
-
- return result;
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/1798e793/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
new file mode 100644
index 0000000..e8d5f5d
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
@@ -0,0 +1,273 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.ResourceAssignment;
+
+import com.google.common.base.Function;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * This class does an assignment based on an automatic rebalancing strategy, rather than requiring
+ * assignment to target partitions and states of another resource
+ */
+public class GenericTaskAssignmentCalculator extends TaskAssignmentCalculator {
+ /** Reassignment policy for this algorithm */
+ private RetryPolicy _retryPolicy = new DefaultRetryReassigner();
+
+ @Override
+ public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
+ WorkflowConfig workflowCfg, WorkflowContext workflowCtx, ClusterDataCache cache) {
+ Map<String, TaskConfig> taskMap = jobCfg.getTaskConfigMap();
+ Map<String, Integer> taskIdMap = jobCtx.getTaskIdPartitionMap();
+ for (TaskConfig taskCfg : taskMap.values()) {
+ String taskId = taskCfg.getId();
+ int nextPartition = jobCtx.getPartitionSet().size();
+ if (!taskIdMap.containsKey(taskId)) {
+ jobCtx.setTaskIdForPartition(nextPartition, taskId);
+ }
+ }
+ return jobCtx.getPartitionSet();
+ }
+
+ @Override
+ public Map<String, SortedSet<Integer>> getTaskAssignment(CurrentStateOutput currStateOutput,
+ ResourceAssignment prevAssignment, Collection<String> instances, JobConfig jobCfg,
+ final JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
+ Set<Integer> partitionSet, ClusterDataCache cache) {
+ // Gather input to the full auto rebalancing algorithm
+ LinkedHashMap<String, Integer> states = new LinkedHashMap<String, Integer>();
+ states.put("ONLINE", 1);
+
+ // Only map partitions whose assignment we care about
+ final Set<TaskPartitionState> honoredStates =
+ Sets.newHashSet(TaskPartitionState.INIT, TaskPartitionState.RUNNING,
+ TaskPartitionState.STOPPED);
+ Set<Integer> filteredPartitionSet = Sets.newHashSet();
+ for (Integer p : partitionSet) {
+ TaskPartitionState state = (jobContext == null) ? null : jobContext.getPartitionState(p);
+ if (state == null || honoredStates.contains(state)) {
+ filteredPartitionSet.add(p);
+ }
+ }
+
+ // Transform from partition id to fully qualified partition name
+ List<Integer> partitionNums = Lists.newArrayList(partitionSet);
+ Collections.sort(partitionNums);
+ final String resourceId = prevAssignment.getResourceName();
+ List<String> partitions =
+ new ArrayList<String>(Lists.transform(partitionNums, new Function<Integer, String>() {
+ @Override
+ public String apply(Integer partitionNum) {
+ return resourceId + "_" + partitionNum;
+ }
+ }));
+
+ // Compute the current assignment
+ Map<String, Map<String, String>> currentMapping = Maps.newHashMap();
+ for (Partition partition : currStateOutput.getCurrentStateMappedPartitions(resourceId)) {
+ if (!filteredPartitionSet.contains(TaskUtil.getPartitionId(partition.getPartitionName()))) {
+ // not computing old partitions
+ continue;
+ }
+ Map<String, String> allPreviousDecisionMap = Maps.newHashMap();
+ if (prevAssignment != null) {
+ allPreviousDecisionMap.putAll(prevAssignment.getReplicaMap(partition));
+ }
+ allPreviousDecisionMap.putAll(currStateOutput.getCurrentStateMap(resourceId, partition));
+ allPreviousDecisionMap.putAll(currStateOutput.getPendingStateMap(resourceId, partition));
+ currentMapping.put(partition.getPartitionName(), allPreviousDecisionMap);
+ }
+
+ // Get the assignment keyed on partition
+ AutoRebalanceStrategy strategy =
+ new AutoRebalanceStrategy(resourceId, partitions, states, Integer.MAX_VALUE,
+ new AutoRebalanceStrategy.DefaultPlacementScheme());
+ List<String> allNodes =
+ Lists.newArrayList(getEligibleInstances(jobCfg, currStateOutput, instances, cache));
+ Collections.sort(allNodes);
+ ZNRecord record = strategy.computePartitionAssignment(allNodes, currentMapping, allNodes);
+ Map<String, List<String>> preferenceLists = record.getListFields();
+
+ // Convert to an assignment keyed on participant
+ Map<String, SortedSet<Integer>> taskAssignment = Maps.newHashMap();
+ for (Map.Entry<String, List<String>> e : preferenceLists.entrySet()) {
+ String partitionName = e.getKey();
+ partitionName = String.valueOf(TaskUtil.getPartitionId(partitionName));
+ List<String> preferenceList = e.getValue();
+ for (String participantName : preferenceList) {
+ if (!taskAssignment.containsKey(participantName)) {
+ taskAssignment.put(participantName, new TreeSet<Integer>());
+ }
+ taskAssignment.get(participantName).add(Integer.valueOf(partitionName));
+ }
+ }
+
+ // Finally, adjust the assignment if tasks have been failing
+ taskAssignment = _retryPolicy.reassign(jobCfg, jobContext, allNodes, taskAssignment);
+ return taskAssignment;
+ }
+
+ /**
+ * Filter a list of instances based on targeted resource policies
+ * @param jobCfg the job configuration
+ * @param currStateOutput the current state of all instances in the cluster
+ * @param instances valid instances
+ * @param cache current snapshot of the cluster
+ * @return a set of instances that can be assigned to
+ */
+ private Set<String> getEligibleInstances(JobConfig jobCfg, CurrentStateOutput currStateOutput,
+ Iterable<String> instances, ClusterDataCache cache) {
+ // No target resource means any instance is available
+ Set<String> allInstances = Sets.newHashSet(instances);
+ String targetResource = jobCfg.getTargetResource();
+ if (targetResource == null) {
+ return allInstances;
+ }
+
+ // Bad ideal state means don't assign
+ IdealState idealState = cache.getIdealState(targetResource);
+ if (idealState == null) {
+ return Collections.emptySet();
+ }
+
+ // Get the partitions on the target resource to use
+ Set<String> partitions = idealState.getPartitionSet();
+ List<String> targetPartitions = jobCfg.getTargetPartitions();
+ if (targetPartitions != null && !targetPartitions.isEmpty()) {
+ partitions.retainAll(targetPartitions);
+ }
+
+ // Based on state matches, add eligible instances
+ Set<String> eligibleInstances = Sets.newHashSet();
+ Set<String> targetStates = jobCfg.getTargetPartitionStates();
+ for (String partition : partitions) {
+ Map<String, String> stateMap =
+ currStateOutput.getCurrentStateMap(targetResource, new Partition(partition));
+ Map<String, String> pendingStateMap =
+ currStateOutput.getPendingStateMap(targetResource, new Partition(partition));
+ for (Map.Entry<String, String> e : stateMap.entrySet()) {
+ String instanceName = e.getKey();
+ String state = e.getValue();
+ String pending = pendingStateMap.get(instanceName);
+ if (pending != null) {
+ continue;
+ }
+ if (targetStates == null || targetStates.isEmpty() || targetStates.contains(state)) {
+ eligibleInstances.add(instanceName);
+ }
+ }
+ }
+ allInstances.retainAll(eligibleInstances);
+ return allInstances;
+ }
+
+ public interface RetryPolicy {
+ /**
+ * Adjust the assignment to allow for reassignment if a task keeps failing where it's currently
+ * assigned
+ * @param jobCfg the job configuration
+ * @param jobCtx the job context
+ * @param instances instances that can serve tasks
+ * @param origAssignment the unmodified assignment
+ * @return the adjusted assignment
+ */
+ Map<String, SortedSet<Integer>> reassign(JobConfig jobCfg, JobContext jobCtx,
+ Collection<String> instances, Map<String, SortedSet<Integer>> origAssignment);
+ }
+
+ private static class DefaultRetryReassigner implements RetryPolicy {
+ @Override
+ public Map<String, SortedSet<Integer>> reassign(JobConfig jobCfg, JobContext jobCtx,
+ Collection<String> instances, Map<String, SortedSet<Integer>> origAssignment) {
+ // Compute an increasing integer ID for each instance
+ BiMap<String, Integer> instanceMap = HashBiMap.create(instances.size());
+ int instanceIndex = 0;
+ for (String instance : instances) {
+ instanceMap.put(instance, instanceIndex++);
+ }
+
+ // Move partitions
+ Map<String, SortedSet<Integer>> newAssignment = Maps.newHashMap();
+ for (Map.Entry<String, SortedSet<Integer>> e : origAssignment.entrySet()) {
+ String instance = e.getKey();
+ SortedSet<Integer> partitions = e.getValue();
+ Integer instanceId = instanceMap.get(instance);
+ if (instanceId != null) {
+ for (int p : partitions) {
+ // Determine for each partition if there have been failures with the current assignment
+ // strategy, and if so, force a shift in assignment for that partition only
+ int shiftValue = getNumInstancesToShift(jobCfg, jobCtx, instances, p);
+ int newInstanceId = (instanceId + shiftValue) % instances.size();
+ String newInstance = instanceMap.inverse().get(newInstanceId);
+ if (newInstance == null) {
+ newInstance = instance;
+ }
+ if (!newAssignment.containsKey(newInstance)) {
+ newAssignment.put(newInstance, new TreeSet<Integer>());
+ }
+ newAssignment.get(newInstance).add(p);
+ }
+ } else {
+ // In case something goes wrong, just keep the previous assignment
+ newAssignment.put(instance, partitions);
+ }
+ }
+ return newAssignment;
+ }
+
+ /**
+ * In case tasks fail, we may not want to schedule them in the same place. This method allows us
+ * to compute a shifting value so that we can systematically choose other instances to try
+ * @param jobCfg the job configuration
+ * @param jobCtx the job context
+ * @param instances instances that can be chosen
+ * @param p the partition to look up
+ * @return the shifting value
+ */
+ private int getNumInstancesToShift(JobConfig jobCfg, JobContext jobCtx,
+ Collection<String> instances, int p) {
+ int numAttempts = jobCtx.getPartitionNumAttempts(p);
+ int maxNumAttempts = jobCfg.getMaxAttemptsPerTask();
+ int numInstances = Math.min(instances.size(), jobCfg.getMaxForcedReassignmentsPerTask() + 1);
+ return numAttempts / (maxNumAttempts / numInstances);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/1798e793/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java
deleted file mode 100644
index f4145c5..0000000
--- a/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java
+++ /dev/null
@@ -1,273 +0,0 @@
-package org.apache.helix.task;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
-
-import org.apache.helix.ZNRecord;
-import org.apache.helix.controller.stages.ClusterDataCache;
-import org.apache.helix.controller.stages.CurrentStateOutput;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.ResourceAssignment;
-
-import com.google.common.base.Function;
-import com.google.common.collect.BiMap;
-import com.google.common.collect.HashBiMap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-/**
- * This class does an assignment based on an automatic rebalancing strategy, rather than requiring
- * assignment to target partitions and states of another resource
- */
-public class GenericTaskRebalancer extends TaskRebalancer {
- /** Reassignment policy for this algorithm */
- private RetryPolicy _retryPolicy = new DefaultRetryReassigner();
-
- @Override
- public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
- WorkflowConfig workflowCfg, WorkflowContext workflowCtx, ClusterDataCache cache) {
- Map<String, TaskConfig> taskMap = jobCfg.getTaskConfigMap();
- Map<String, Integer> taskIdMap = jobCtx.getTaskIdPartitionMap();
- for (TaskConfig taskCfg : taskMap.values()) {
- String taskId = taskCfg.getId();
- int nextPartition = jobCtx.getPartitionSet().size();
- if (!taskIdMap.containsKey(taskId)) {
- jobCtx.setTaskIdForPartition(nextPartition, taskId);
- }
- }
- return jobCtx.getPartitionSet();
- }
-
- @Override
- public Map<String, SortedSet<Integer>> getTaskAssignment(CurrentStateOutput currStateOutput,
- ResourceAssignment prevAssignment, Collection<String> instances, JobConfig jobCfg,
- final JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
- Set<Integer> partitionSet, ClusterDataCache cache) {
- // Gather input to the full auto rebalancing algorithm
- LinkedHashMap<String, Integer> states = new LinkedHashMap<String, Integer>();
- states.put("ONLINE", 1);
-
- // Only map partitions whose assignment we care about
- final Set<TaskPartitionState> honoredStates =
- Sets.newHashSet(TaskPartitionState.INIT, TaskPartitionState.RUNNING,
- TaskPartitionState.STOPPED);
- Set<Integer> filteredPartitionSet = Sets.newHashSet();
- for (Integer p : partitionSet) {
- TaskPartitionState state = (jobContext == null) ? null : jobContext.getPartitionState(p);
- if (state == null || honoredStates.contains(state)) {
- filteredPartitionSet.add(p);
- }
- }
-
- // Transform from partition id to fully qualified partition name
- List<Integer> partitionNums = Lists.newArrayList(partitionSet);
- Collections.sort(partitionNums);
- final String resourceId = prevAssignment.getResourceName();
- List<String> partitions =
- new ArrayList<String>(Lists.transform(partitionNums, new Function<Integer, String>() {
- @Override
- public String apply(Integer partitionNum) {
- return resourceId + "_" + partitionNum;
- }
- }));
-
- // Compute the current assignment
- Map<String, Map<String, String>> currentMapping = Maps.newHashMap();
- for (Partition partition : currStateOutput.getCurrentStateMappedPartitions(resourceId)) {
- if (!filteredPartitionSet.contains(pId(partition.getPartitionName()))) {
- // not computing old partitions
- continue;
- }
- Map<String, String> allPreviousDecisionMap = Maps.newHashMap();
- if (prevAssignment != null) {
- allPreviousDecisionMap.putAll(prevAssignment.getReplicaMap(partition));
- }
- allPreviousDecisionMap.putAll(currStateOutput.getCurrentStateMap(resourceId, partition));
- allPreviousDecisionMap.putAll(currStateOutput.getPendingStateMap(resourceId, partition));
- currentMapping.put(partition.getPartitionName(), allPreviousDecisionMap);
- }
-
- // Get the assignment keyed on partition
- AutoRebalanceStrategy strategy =
- new AutoRebalanceStrategy(resourceId, partitions, states, Integer.MAX_VALUE,
- new AutoRebalanceStrategy.DefaultPlacementScheme());
- List<String> allNodes =
- Lists.newArrayList(getEligibleInstances(jobCfg, currStateOutput, instances, cache));
- Collections.sort(allNodes);
- ZNRecord record = strategy.computePartitionAssignment(allNodes, currentMapping, allNodes);
- Map<String, List<String>> preferenceLists = record.getListFields();
-
- // Convert to an assignment keyed on participant
- Map<String, SortedSet<Integer>> taskAssignment = Maps.newHashMap();
- for (Map.Entry<String, List<String>> e : preferenceLists.entrySet()) {
- String partitionName = e.getKey();
- partitionName = String.valueOf(pId(partitionName));
- List<String> preferenceList = e.getValue();
- for (String participantName : preferenceList) {
- if (!taskAssignment.containsKey(participantName)) {
- taskAssignment.put(participantName, new TreeSet<Integer>());
- }
- taskAssignment.get(participantName).add(Integer.valueOf(partitionName));
- }
- }
-
- // Finally, adjust the assignment if tasks have been failing
- taskAssignment = _retryPolicy.reassign(jobCfg, jobContext, allNodes, taskAssignment);
- return taskAssignment;
- }
-
- /**
- * Filter a list of instances based on targeted resource policies
- * @param jobCfg the job configuration
- * @param currStateOutput the current state of all instances in the cluster
- * @param instances valid instances
- * @param cache current snapshot of the cluster
- * @return a set of instances that can be assigned to
- */
- private Set<String> getEligibleInstances(JobConfig jobCfg, CurrentStateOutput currStateOutput,
- Iterable<String> instances, ClusterDataCache cache) {
- // No target resource means any instance is available
- Set<String> allInstances = Sets.newHashSet(instances);
- String targetResource = jobCfg.getTargetResource();
- if (targetResource == null) {
- return allInstances;
- }
-
- // Bad ideal state means don't assign
- IdealState idealState = cache.getIdealState(targetResource);
- if (idealState == null) {
- return Collections.emptySet();
- }
-
- // Get the partitions on the target resource to use
- Set<String> partitions = idealState.getPartitionSet();
- List<String> targetPartitions = jobCfg.getTargetPartitions();
- if (targetPartitions != null && !targetPartitions.isEmpty()) {
- partitions.retainAll(targetPartitions);
- }
-
- // Based on state matches, add eligible instances
- Set<String> eligibleInstances = Sets.newHashSet();
- Set<String> targetStates = jobCfg.getTargetPartitionStates();
- for (String partition : partitions) {
- Map<String, String> stateMap =
- currStateOutput.getCurrentStateMap(targetResource, new Partition(partition));
- Map<String, String> pendingStateMap =
- currStateOutput.getPendingStateMap(targetResource, new Partition(partition));
- for (Map.Entry<String, String> e : stateMap.entrySet()) {
- String instanceName = e.getKey();
- String state = e.getValue();
- String pending = pendingStateMap.get(instanceName);
- if (pending != null) {
- continue;
- }
- if (targetStates == null || targetStates.isEmpty() || targetStates.contains(state)) {
- eligibleInstances.add(instanceName);
- }
- }
- }
- allInstances.retainAll(eligibleInstances);
- return allInstances;
- }
-
- public interface RetryPolicy {
- /**
- * Adjust the assignment to allow for reassignment if a task keeps failing where it's currently
- * assigned
- * @param jobCfg the job configuration
- * @param jobCtx the job context
- * @param instances instances that can serve tasks
- * @param origAssignment the unmodified assignment
- * @return the adjusted assignment
- */
- Map<String, SortedSet<Integer>> reassign(JobConfig jobCfg, JobContext jobCtx,
- Collection<String> instances, Map<String, SortedSet<Integer>> origAssignment);
- }
-
- private static class DefaultRetryReassigner implements RetryPolicy {
- @Override
- public Map<String, SortedSet<Integer>> reassign(JobConfig jobCfg, JobContext jobCtx,
- Collection<String> instances, Map<String, SortedSet<Integer>> origAssignment) {
- // Compute an increasing integer ID for each instance
- BiMap<String, Integer> instanceMap = HashBiMap.create(instances.size());
- int instanceIndex = 0;
- for (String instance : instances) {
- instanceMap.put(instance, instanceIndex++);
- }
-
- // Move partitions
- Map<String, SortedSet<Integer>> newAssignment = Maps.newHashMap();
- for (Map.Entry<String, SortedSet<Integer>> e : origAssignment.entrySet()) {
- String instance = e.getKey();
- SortedSet<Integer> partitions = e.getValue();
- Integer instanceId = instanceMap.get(instance);
- if (instanceId != null) {
- for (int p : partitions) {
- // Determine for each partition if there have been failures with the current assignment
- // strategy, and if so, force a shift in assignment for that partition only
- int shiftValue = getNumInstancesToShift(jobCfg, jobCtx, instances, p);
- int newInstanceId = (instanceId + shiftValue) % instances.size();
- String newInstance = instanceMap.inverse().get(newInstanceId);
- if (newInstance == null) {
- newInstance = instance;
- }
- if (!newAssignment.containsKey(newInstance)) {
- newAssignment.put(newInstance, new TreeSet<Integer>());
- }
- newAssignment.get(newInstance).add(p);
- }
- } else {
- // In case something goes wrong, just keep the previous assignment
- newAssignment.put(instance, partitions);
- }
- }
- return newAssignment;
- }
-
- /**
- * In case tasks fail, we may not want to schedule them in the same place. This method allows us
- * to compute a shifting value so that we can systematically choose other instances to try
- * @param jobCfg the job configuration
- * @param jobCtx the job context
- * @param instances instances that can be chosen
- * @param p the partition to look up
- * @return the shifting value
- */
- private int getNumInstancesToShift(JobConfig jobCfg, JobContext jobCtx,
- Collection<String> instances, int p) {
- int numAttempts = jobCtx.getPartitionNumAttempts(p);
- int maxNumAttempts = jobCfg.getMaxAttemptsPerTask();
- int numInstances = Math.min(instances.size(), jobCfg.getMaxForcedReassignmentsPerTask() + 1);
- return numAttempts / (maxNumAttempts / numInstances);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/1798e793/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
new file mode 100644
index 0000000..0e2ab15
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
@@ -0,0 +1,650 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+import org.apache.helix.*;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.*;
+import org.apache.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+/**
+ * Custom rebalancer implementation for the {@code Job} in task model.
+ */
+public class JobRebalancer extends TaskRebalancer {
+ private static final Logger LOG = Logger.getLogger(JobRebalancer.class);
+ private static TaskAssignmentCalculator fixTaskAssignmentCal =
+ new FixedTargetTaskAssignmentCalculator();
+ private static TaskAssignmentCalculator genericTaskAssignmentCal =
+ new GenericTaskAssignmentCalculator();
+
+ private static final String PREV_RA_NODE = "PreviousResourceAssignment";
+
+ @Override
+ public ResourceAssignment computeBestPossiblePartitionState(ClusterDataCache clusterData,
+ IdealState taskIs, Resource resource, CurrentStateOutput currStateOutput) {
+ final String jobName = resource.getResourceName();
+ LOG.debug("Computer Best Partition for job: " + jobName);
+
+ // Fetch job configuration
+ JobConfig jobCfg = TaskUtil.getJobCfg(_manager, jobName);
+ if (jobCfg == null) {
+ LOG.error("Job configuration is NULL for " + jobName);
+ return buildEmptyAssignment(jobName, currStateOutput);
+ }
+ String workflowResource = jobCfg.getWorkflow();
+
+ // Fetch workflow configuration and context
+ WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(_manager, workflowResource);
+ if (workflowCfg == null) {
+ LOG.error("Workflow configuration is NULL for " + jobName);
+ return buildEmptyAssignment(jobName, currStateOutput);
+ }
+
+ WorkflowContext workflowCtx = TaskUtil.getWorkflowContext(_manager, workflowResource);
+ if (workflowCtx == null) {
+ LOG.error("Workflow context is NULL for " + jobName);
+ return buildEmptyAssignment(jobName, currStateOutput);
+ }
+
+ TargetState targetState = workflowCfg.getTargetState();
+ if (targetState != TargetState.START && targetState != TargetState.STOP) {
+ LOG.info("Target state is " + targetState.name() + " for workflow " + workflowResource
+ + ".Stop scheduling job " + jobName);
+ return buildEmptyAssignment(jobName, currStateOutput);
+ }
+
+ TaskState jobState = workflowCtx.getJobState(jobName);
+ // The job is already in a final state (completed/failed).
+ if (jobState == TaskState.FAILED || jobState == TaskState.COMPLETED) {
+ LOG.info("Job " + jobName + " is failed or already completed, clean up IS.");
+ TaskUtil.cleanupIdealStateExtView(_manager.getHelixDataAccessor(), jobName);
+ _scheduledRebalancer.removeScheduledRebalance(jobName);
+ return buildEmptyAssignment(jobName, currStateOutput);
+ }
+
+ if (!isWorkflowReadyForSchedule(workflowCfg)) {
+ LOG.info("Job is not ready to be scheduled since workflow is not ready " + jobName);
+ return buildEmptyAssignment(jobName, currStateOutput);
+ }
+
+ if (!isJobReadyToSchedule(jobName, workflowCfg, workflowCtx)) {
+ LOG.info("Job is not ready to be scheduled " + jobName);
+ return buildEmptyAssignment(jobName, currStateOutput);
+ }
+
+ // Fetch any existing context information from the property store.
+ JobContext jobCtx = TaskUtil.getJobContext(_manager, jobName);
+ if (jobCtx == null) {
+ jobCtx = new JobContext(new ZNRecord("TaskContext"));
+ jobCtx.setStartTime(System.currentTimeMillis());
+ }
+
+ // Grab the old assignment, or an empty one if it doesn't exist
+ ResourceAssignment prevAssignment = getPrevResourceAssignment(jobName);
+ if (prevAssignment == null) {
+ prevAssignment = new ResourceAssignment(jobName);
+ }
+
+ // Will contain the list of partitions that must be explicitly dropped from the ideal state that
+ // is stored in zk.
+ // Fetch the previous resource assignment from the property store. This is required because of
+ // HELIX-230.
+ Set<Integer> partitionsToDrop = new TreeSet<Integer>();
+ ResourceAssignment newAssignment =
+ computeResourceMapping(jobName, workflowCfg, jobCfg, prevAssignment, clusterData
+ .getLiveInstances().keySet(), currStateOutput, workflowCtx, jobCtx, partitionsToDrop,
+ clusterData);
+
+ if (!partitionsToDrop.isEmpty()) {
+ for (Integer pId : partitionsToDrop) {
+ taskIs.getRecord().getMapFields().remove(pName(jobName, pId));
+ }
+ HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+ PropertyKey propertyKey = accessor.keyBuilder().idealStates(jobName);
+ accessor.setProperty(propertyKey, taskIs);
+ }
+
+ // Update rebalancer context, previous ideal state.
+ TaskUtil.setJobContext(_manager, jobName, jobCtx);
+ TaskUtil.setWorkflowContext(_manager, workflowResource, workflowCtx);
+ setPrevResourceAssignment(jobName, newAssignment);
+
+ LOG.debug("Job " + jobName + " new assignment " + Arrays
+ .toString(newAssignment.getMappedPartitions().toArray()));
+
+ return newAssignment;
+ }
+
+ private Set<String> getInstancesAssignedToOtherJobs(String currentJobName,
+ WorkflowConfig workflowCfg) {
+ Set<String> ret = new HashSet<String>();
+ for (String jobName : workflowCfg.getJobDag().getAllNodes()) {
+ if (jobName.equals(currentJobName)) {
+ continue;
+ }
+ JobContext jobContext = TaskUtil.getJobContext(_manager, jobName);
+ if (jobContext == null) {
+ continue;
+ }
+ for (int partition : jobContext.getPartitionSet()) {
+ TaskPartitionState partitionState = jobContext.getPartitionState(partition);
+ if (partitionState == TaskPartitionState.INIT ||
+ partitionState == TaskPartitionState.RUNNING) {
+ ret.add(jobContext.getAssignedParticipant(partition));
+ }
+ }
+ }
+
+ return ret;
+ }
+
+ private ResourceAssignment computeResourceMapping(String jobResource,
+ WorkflowConfig workflowConfig, JobConfig jobCfg, ResourceAssignment prevAssignment,
+ Collection<String> liveInstances, CurrentStateOutput currStateOutput,
+ WorkflowContext workflowCtx, JobContext jobCtx, Set<Integer> partitionsToDropFromIs,
+ ClusterDataCache cache) {
+ TargetState jobTgtState = workflowConfig.getTargetState();
+ // Update running status in workflow context
+ if (jobTgtState == TargetState.STOP) {
+ workflowCtx.setJobState(jobResource, TaskState.STOPPED);
+ // Workflow has been stopped if all in progress jobs are stopped
+ if (isWorkflowStopped(workflowCtx, workflowConfig)) {
+ workflowCtx.setWorkflowState(TaskState.STOPPED);
+ }
+ } else {
+ workflowCtx.setJobState(jobResource, TaskState.IN_PROGRESS);
+ // Workflow is in progress if any task is in progress
+ workflowCtx.setWorkflowState(TaskState.IN_PROGRESS);
+ }
+
+ // Used to keep track of tasks that have already been assigned to instances.
+ Set<Integer> assignedPartitions = new HashSet<Integer>();
+
+ // Used to keep track of tasks that have failed, but whose failure is acceptable
+ Set<Integer> skippedPartitions = new HashSet<Integer>();
+
+ // Keeps a mapping of (partition) -> (instance, state)
+ Map<Integer, PartitionAssignment> paMap = new TreeMap<Integer, PartitionAssignment>();
+
+ Set<String> excludedInstances = getInstancesAssignedToOtherJobs(jobResource, workflowConfig);
+
+ // Process all the current assignments of tasks.
+ TaskAssignmentCalculator taskAssignmentCal = getAssignmentCalulator(jobCfg);
+ Set<Integer> allPartitions =
+ taskAssignmentCal.getAllTaskPartitions(jobCfg, jobCtx, workflowConfig, workflowCtx, cache);
+ Map<String, SortedSet<Integer>> taskAssignments =
+ getTaskPartitionAssignments(liveInstances, prevAssignment, allPartitions);
+ long currentTime = System.currentTimeMillis();
+ for (String instance : taskAssignments.keySet()) {
+ if (excludedInstances.contains(instance)) {
+ continue;
+ }
+
+ Set<Integer> pSet = taskAssignments.get(instance);
+ // Used to keep track of partitions that are in one of the final states: COMPLETED, TIMED_OUT,
+ // TASK_ERROR, ERROR.
+ Set<Integer> donePartitions = new TreeSet<Integer>();
+ for (int pId : pSet) {
+ final String pName = pName(jobResource, pId);
+
+ // Check for pending state transitions on this (partition, instance).
+ Message pendingMessage =
+ currStateOutput.getPendingState(jobResource, new Partition(pName), instance);
+ if (pendingMessage != null) {
+ // There is a pending state transition for this (partition, instance). Just copy forward
+ // the state assignment from the previous ideal state.
+ Map<String, String> stateMap = prevAssignment.getReplicaMap(new Partition(pName));
+ if (stateMap != null) {
+ String prevState = stateMap.get(instance);
+ paMap.put(pId, new PartitionAssignment(instance, prevState));
+ assignedPartitions.add(pId);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format(
+ "Task partition %s has a pending state transition on instance %s. Using the previous ideal state which was %s.",
+ pName, instance, prevState));
+ }
+ }
+
+ continue;
+ }
+
+ TaskPartitionState currState =
+ TaskPartitionState.valueOf(currStateOutput.getCurrentState(jobResource, new Partition(
+ pName), instance));
+ jobCtx.setPartitionState(pId, currState);
+
+ // Process any requested state transitions.
+ String requestedStateStr =
+ currStateOutput.getRequestedState(jobResource, new Partition(pName), instance);
+ if (requestedStateStr != null && !requestedStateStr.isEmpty()) {
+ TaskPartitionState requestedState = TaskPartitionState.valueOf(requestedStateStr);
+ if (requestedState.equals(currState)) {
+ LOG.warn(String.format(
+ "Requested state %s is the same as the current state for instance %s.",
+ requestedState, instance));
+ }
+
+ paMap.put(pId, new PartitionAssignment(instance, requestedState.name()));
+ assignedPartitions.add(pId);
+ LOG.debug(String.format(
+ "Instance %s requested a state transition to %s for partition %s.", instance,
+ requestedState, pName));
+ continue;
+ }
+
+ switch (currState) {
+ case RUNNING:
+ case STOPPED: {
+ TaskPartitionState nextState;
+ if (jobTgtState == TargetState.START) {
+ nextState = TaskPartitionState.RUNNING;
+ } else {
+ nextState = TaskPartitionState.STOPPED;
+ }
+
+ paMap.put(pId, new PartitionAssignment(instance, nextState.name()));
+ assignedPartitions.add(pId);
+ LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName,
+ nextState, instance));
+ }
+ break;
+ case COMPLETED: {
+ // The task has completed on this partition. Mark as such in the context object.
+ donePartitions.add(pId);
+ LOG.debug(String
+ .format(
+ "Task partition %s has completed with state %s. Marking as such in rebalancer context.",
+ pName, currState));
+ partitionsToDropFromIs.add(pId);
+ markPartitionCompleted(jobCtx, pId);
+ }
+ break;
+ case TIMED_OUT:
+ case TASK_ERROR:
+ case ERROR: {
+ donePartitions.add(pId); // The task may be rescheduled on a different instance.
+ LOG.debug(String.format(
+ "Task partition %s has error state %s. Marking as such in rebalancer context.",
+ pName, currState));
+ markPartitionError(jobCtx, pId, currState, true);
+ // The error policy is to fail the task as soon a single partition fails for a specified
+ // maximum number of attempts.
+ if (jobCtx.getPartitionNumAttempts(pId) >= jobCfg.getMaxAttemptsPerTask()) {
+ // If the user does not require this task to succeed in order for the job to succeed,
+ // then we don't have to fail the job right now
+ boolean successOptional = false;
+ String taskId = jobCtx.getTaskIdForPartition(pId);
+ if (taskId != null) {
+ TaskConfig taskConfig = jobCfg.getTaskConfig(taskId);
+ if (taskConfig != null) {
+ successOptional = taskConfig.isSuccessOptional();
+ }
+ }
+
+ // Similarly, if we have some leeway for how many tasks we can fail, then we don't have
+ // to fail the job immediately
+ if (skippedPartitions.size() < jobCfg.getFailureThreshold()) {
+ successOptional = true;
+ }
+
+ if (!successOptional) {
+ long finishTime = currentTime;
+ workflowCtx.setJobState(jobResource, TaskState.FAILED);
+ if (workflowConfig.isTerminable()) {
+ workflowCtx.setWorkflowState(TaskState.FAILED);
+ workflowCtx.setFinishTime(finishTime);
+ }
+ jobCtx.setFinishTime(finishTime);
+ markAllPartitionsError(jobCtx, currState, false);
+ addAllPartitions(allPartitions, partitionsToDropFromIs);
+
+ // remove IdealState of this job
+ TaskUtil.cleanupIdealStateExtView(_manager.getHelixDataAccessor(), jobResource);
+ return buildEmptyAssignment(jobResource, currStateOutput);
+ } else {
+ skippedPartitions.add(pId);
+ partitionsToDropFromIs.add(pId);
+ }
+ } else {
+ // Mark the task to be started at some later time (if enabled)
+ markPartitionDelayed(jobCfg, jobCtx, pId);
+ }
+ }
+ break;
+ case INIT:
+ case DROPPED: {
+ // currState in [INIT, DROPPED]. Do nothing, the partition is eligible to be reassigned.
+ donePartitions.add(pId);
+ LOG.debug(String.format(
+ "Task partition %s has state %s. It will be dropped from the current ideal state.",
+ pName, currState));
+ }
+ break;
+ default:
+ throw new AssertionError("Unknown enum symbol: " + currState);
+ }
+ }
+
+ // Remove the set of task partitions that are completed or in one of the error states.
+ pSet.removeAll(donePartitions);
+ }
+
+ // For delayed tasks, trigger a rebalance event for the closest upcoming ready time
+ scheduleForNextTask(jobResource, jobCtx, currentTime);
+
+ if (isJobComplete(jobCtx, allPartitions, skippedPartitions, jobCfg)) {
+ workflowCtx.setJobState(jobResource, TaskState.COMPLETED);
+ jobCtx.setFinishTime(currentTime);
+ if (isWorkflowComplete(workflowCtx, workflowConfig)) {
+ workflowCtx.setWorkflowState(TaskState.COMPLETED);
+ workflowCtx.setFinishTime(currentTime);
+ }
+
+ // remove IdealState of this job
+ TaskUtil.cleanupIdealStateExtView(_manager.getHelixDataAccessor(), jobResource);
+ }
+
+ // Make additional task assignments if needed.
+ if (jobTgtState == TargetState.START) {
+ // Contains the set of task partitions that must be excluded from consideration when making
+ // any new assignments.
+ // This includes all completed, failed, delayed, and already assigned partitions.
+ Set<Integer> excludeSet = Sets.newTreeSet(assignedPartitions);
+ addCompletedPartitions(excludeSet, jobCtx, allPartitions);
+ addGiveupPartitions(excludeSet, jobCtx, allPartitions, jobCfg);
+ excludeSet.addAll(skippedPartitions);
+ excludeSet.addAll(getNonReadyPartitions(jobCtx, currentTime));
+ // Get instance->[partition, ...] mappings for the target resource.
+ Map<String, SortedSet<Integer>> tgtPartitionAssignments = taskAssignmentCal
+ .getTaskAssignment(currStateOutput, prevAssignment, liveInstances, jobCfg, jobCtx,
+ workflowConfig, workflowCtx, allPartitions, cache);
+ for (Map.Entry<String, SortedSet<Integer>> entry : taskAssignments.entrySet()) {
+ String instance = entry.getKey();
+ if (!tgtPartitionAssignments.containsKey(instance) || excludedInstances
+ .contains(instance)) {
+ continue;
+ }
+ // Contains the set of task partitions currently assigned to the instance.
+ Set<Integer> pSet = entry.getValue();
+ int numToAssign = jobCfg.getNumConcurrentTasksPerInstance() - pSet.size();
+ if (numToAssign > 0) {
+ List<Integer> nextPartitions =
+ getNextPartitions(tgtPartitionAssignments.get(instance), excludeSet, numToAssign);
+ for (Integer pId : nextPartitions) {
+ String pName = pName(jobResource, pId);
+ paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.RUNNING.name()));
+ excludeSet.add(pId);
+ jobCtx.setAssignedParticipant(pId, instance);
+ jobCtx.setPartitionState(pId, TaskPartitionState.INIT);
+ LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName,
+ TaskPartitionState.RUNNING, instance));
+ }
+ }
+ }
+ }
+
+ // Construct a ResourceAssignment object from the map of partition assignments.
+ ResourceAssignment ra = new ResourceAssignment(jobResource);
+ for (Map.Entry<Integer, PartitionAssignment> e : paMap.entrySet()) {
+ PartitionAssignment pa = e.getValue();
+ ra.addReplicaMap(new Partition(pName(jobResource, e.getKey())),
+ ImmutableMap.of(pa._instance, pa._state));
+ }
+
+ return ra;
+ }
+
+ private void scheduleForNextTask(String job, JobContext jobCtx, long now) {
+ // Clear current entries if they exist and are expired
+ long currentTime = now;
+ long scheduledTime = _scheduledRebalancer.getRebalanceTime(job);
+ if (scheduledTime > 0 && currentTime > scheduledTime) {
+ _scheduledRebalancer.removeScheduledRebalance(job);
+ }
+
+ // Figure out the earliest schedulable time in the future of a non-complete job
+ boolean shouldSchedule = false;
+ long earliestTime = Long.MAX_VALUE;
+ for (int p : jobCtx.getPartitionSet()) {
+ long retryTime = jobCtx.getNextRetryTime(p);
+ TaskPartitionState state = jobCtx.getPartitionState(p);
+ state = (state != null) ? state : TaskPartitionState.INIT;
+ Set<TaskPartitionState> errorStates =
+ Sets.newHashSet(TaskPartitionState.ERROR, TaskPartitionState.TASK_ERROR,
+ TaskPartitionState.TIMED_OUT);
+ if (errorStates.contains(state) && retryTime > currentTime && retryTime < earliestTime) {
+ earliestTime = retryTime;
+ shouldSchedule = true;
+ }
+ }
+
+ // If any was found, then schedule it
+ if (shouldSchedule) {
+ _scheduledRebalancer.scheduleRebalance(_manager, job, earliestTime);
+ }
+ }
+
+ /**
+ * Get the last task assignment for a given job
+ *
+ * @param resourceName the name of the job
+ * @return {@link ResourceAssignment} instance, or null if no assignment is available
+ */
+ private ResourceAssignment getPrevResourceAssignment(String resourceName) {
+ ZNRecord r = _manager.getHelixPropertyStore()
+ .get(Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName, PREV_RA_NODE),
+ null, AccessOption.PERSISTENT);
+ return r != null ? new ResourceAssignment(r) : null;
+ }
+
+ /**
+ * Set the last task assignment for a given job
+ *
+ * @param resourceName the name of the job
+ * @param ra {@link ResourceAssignment} containing the task assignment
+ */
+ private void setPrevResourceAssignment(String resourceName,
+ ResourceAssignment ra) {
+ _manager.getHelixPropertyStore()
+ .set(Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName, PREV_RA_NODE),
+ ra.getRecord(), AccessOption.PERSISTENT);
+ }
+
+ /**
+ * Checks if the job has completed.
+ * @param ctx The rebalancer context.
+ * @param allPartitions The set of partitions to check.
+ * @param skippedPartitions partitions that failed, but whose failure is acceptable
+ * @return true if all task partitions have been marked with status
+ * {@link TaskPartitionState#COMPLETED} in the rebalancer
+ * context, false otherwise.
+ */
+ private static boolean isJobComplete(JobContext ctx, Set<Integer> allPartitions,
+ Set<Integer> skippedPartitions, JobConfig cfg) {
+ for (Integer pId : allPartitions) {
+ TaskPartitionState state = ctx.getPartitionState(pId);
+ if (!skippedPartitions.contains(pId) && state != TaskPartitionState.COMPLETED
+ && !isTaskGivenup(ctx, cfg, pId)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+
+ private static void addAllPartitions(Set<Integer> toAdd, Set<Integer> destination) {
+ for (Integer pId : toAdd) {
+ destination.add(pId);
+ }
+ }
+
+ private static void addCompletedPartitions(Set<Integer> set, JobContext ctx,
+ Iterable<Integer> pIds) {
+ for (Integer pId : pIds) {
+ TaskPartitionState state = ctx.getPartitionState(pId);
+ if (state == TaskPartitionState.COMPLETED) {
+ set.add(pId);
+ }
+ }
+ }
+
+ private static boolean isTaskGivenup(JobContext ctx, JobConfig cfg, int pId) {
+ return ctx.getPartitionNumAttempts(pId) >= cfg.getMaxAttemptsPerTask();
+ }
+
+ // add all partitions that have been tried maxNumberAttempts
+ private static void addGiveupPartitions(Set<Integer> set, JobContext ctx, Iterable<Integer> pIds,
+ JobConfig cfg) {
+ for (Integer pId : pIds) {
+ if (isTaskGivenup(ctx, cfg, pId)) {
+ set.add(pId);
+ }
+ }
+ }
+
+ private static List<Integer> getNextPartitions(SortedSet<Integer> candidatePartitions,
+ Set<Integer> excluded, int n) {
+ List<Integer> result = new ArrayList<Integer>();
+ for (Integer pId : candidatePartitions) {
+ if (result.size() >= n) {
+ break;
+ }
+
+ if (!excluded.contains(pId)) {
+ result.add(pId);
+ }
+ }
+
+ return result;
+ }
+
+ private static void markPartitionDelayed(JobConfig cfg, JobContext ctx, int p) {
+ long delayInterval = cfg.getTaskRetryDelay();
+ if (delayInterval <= 0) {
+ return;
+ }
+ long nextStartTime = ctx.getPartitionFinishTime(p) + delayInterval;
+ ctx.setNextRetryTime(p, nextStartTime);
+ }
+
+ private static void markPartitionCompleted(JobContext ctx, int pId) {
+ ctx.setPartitionState(pId, TaskPartitionState.COMPLETED);
+ ctx.setPartitionFinishTime(pId, System.currentTimeMillis());
+ ctx.incrementNumAttempts(pId);
+ }
+
+ private static void markPartitionError(JobContext ctx, int pId, TaskPartitionState state,
+ boolean incrementAttempts) {
+ ctx.setPartitionState(pId, state);
+ ctx.setPartitionFinishTime(pId, System.currentTimeMillis());
+ if (incrementAttempts) {
+ ctx.incrementNumAttempts(pId);
+ }
+ }
+
+ private static void markAllPartitionsError(JobContext ctx, TaskPartitionState state,
+ boolean incrementAttempts) {
+ for (int pId : ctx.getPartitionSet()) {
+ markPartitionError(ctx, pId, state, incrementAttempts);
+ }
+ }
+
+ /**
+ * Return the assignment of task partitions per instance.
+ */
+ private static Map<String, SortedSet<Integer>> getTaskPartitionAssignments(
+ Iterable<String> instanceList, ResourceAssignment assignment, Set<Integer> includeSet) {
+ Map<String, SortedSet<Integer>> result = new HashMap<String, SortedSet<Integer>>();
+ for (String instance : instanceList) {
+ result.put(instance, new TreeSet<Integer>());
+ }
+
+ for (Partition partition : assignment.getMappedPartitions()) {
+ int pId = TaskUtil.getPartitionId(partition.getPartitionName());
+ if (includeSet.contains(pId)) {
+ Map<String, String> replicaMap = assignment.getReplicaMap(partition);
+ for (String instance : replicaMap.keySet()) {
+ SortedSet<Integer> pList = result.get(instance);
+ if (pList != null) {
+ pList.add(pId);
+ }
+ }
+ }
+ }
+ return result;
+ }
+
+ private static Set<Integer> getNonReadyPartitions(JobContext ctx, long now) {
+ Set<Integer> nonReadyPartitions = Sets.newHashSet();
+ for (int p : ctx.getPartitionSet()) {
+ long toStart = ctx.getNextRetryTime(p);
+ if (now < toStart) {
+ nonReadyPartitions.add(p);
+ }
+ }
+ return nonReadyPartitions;
+ }
+
+ private TaskAssignmentCalculator getAssignmentCalulator(JobConfig jobConfig) {
+ Map<String, TaskConfig> taskConfigMap = jobConfig.getTaskConfigMap();
+ if (taskConfigMap != null && !taskConfigMap.isEmpty()) {
+ return genericTaskAssignmentCal;
+ } else {
+ return fixTaskAssignmentCal;
+ }
+ }
+
+ /**
+ * Computes the partition name given the resource name and partition id.
+ */
+ private String pName(String resource, int pId) {
+ return resource + "_" + pId;
+ }
+
/**
 * An (instance, state) pair.
 * Immutable value holder: both fields are final and there are no setters. Fields are
 * package-invisible; they can only be read directly by the enclosing class.
 */
private static class PartitionAssignment {
  // Name of the instance the partition is assigned to.
  private final String _instance;
  // State associated with the assignment on that instance.
  private final String _state;

  private PartitionAssignment(String instance, String state) {
    _instance = instance;
    _state = state;
  }
}
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/1798e793/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java
new file mode 100644
index 0000000..a3ed5ab
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java
@@ -0,0 +1,45 @@
+package org.apache.helix.task;
+
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.ResourceAssignment;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+
/**
 * Strategy base class for task assignment: implementations decide which task partitions
 * a job should have and how those partitions are distributed across instances.
 * Known implementations in this change set: FixedTargetTaskAssignmentCalculator and
 * GenericTaskAssignmentCalculator.
 */
public abstract class TaskAssignmentCalculator {
  /**
   * Get all the partitions that should be created by this task
   *
   * @param jobCfg the job configuration
   * @param jobCtx the job context
   * @param workflowCfg the workflow configuration
   * @param workflowCtx the workflow context
   * @param cache cluster snapshot
   * @return set of partition numbers
   */
  public abstract Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
      WorkflowConfig workflowCfg, WorkflowContext workflowCtx, ClusterDataCache cache);

  /**
   * Compute an assignment of tasks to instances
   *
   * @param currStateOutput the current state of the instances
   * @param prevAssignment the previous task partition assignment
   * @param instances the instances
   * @param jobCfg the job configuration
   * @param jobContext the job context
   * @param workflowCfg the workflow configuration
   * @param workflowCtx the workflow context
   * @param partitionSet the partitions to assign
   * @param cache cluster snapshot
   * @return map of instances to set of partition numbers
   */
  public abstract Map<String, SortedSet<Integer>> getTaskAssignment(
      CurrentStateOutput currStateOutput, ResourceAssignment prevAssignment,
      Collection<String> instances, JobConfig jobCfg, JobContext jobContext,
      WorkflowConfig workflowCfg, WorkflowContext workflowCtx, Set<Integer> partitionSet,
      ClusterDataCache cache);
}
http://git-wip-us.apache.org/repos/asf/helix/blob/1798e793/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index 654ba4e..9b64aec 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -188,20 +188,21 @@ public class TaskDriver {
LOG.info("Starting workflow " + flow.getName());
flow.validate();
- String flowName = flow.getName();
-
- // first, add workflow config to ZK
- _admin.setConfig(TaskUtil.getResourceConfigScope(_clusterName, flowName),
+ // first, add workflow config.
+ _admin.setConfig(TaskUtil.getResourceConfigScope(_clusterName, flow.getName()),
flow.getWorkflowConfig().getResourceConfigMap());
- // then schedule jobs
+ // then add all job configs.
for (String job : flow.getJobConfigs().keySet()) {
- JobConfig.Builder builder = JobConfig.Builder.fromMap(flow.getJobConfigs().get(job));
+ JobConfig.Builder jobCfgBuilder = JobConfig.Builder.fromMap(flow.getJobConfigs().get(job));
if (flow.getTaskConfigs() != null && flow.getTaskConfigs().containsKey(job)) {
- builder.addTaskConfigs(flow.getTaskConfigs().get(job));
+ jobCfgBuilder.addTaskConfigs(flow.getTaskConfigs().get(job));
}
- scheduleJob(job, builder.build());
+ addJobConfig(job, jobCfgBuilder.build());
}
+
+ // Finally add workflow resource.
+ addWorkflowResource(flow.getName());
}
/** Creates a new named job queue (workflow) */
@@ -210,6 +211,7 @@ public class TaskDriver {
}
/** Flushes a named job queue */
+ // TODO: need to make sure the queue is stopped or completed before flushing the queue.
public void flushQueue(String queueName) throws Exception {
WorkflowConfig config =
TaskUtil.getWorkflowCfg(_cfgAccessor, _accessor, _clusterName, queueName);
@@ -351,54 +353,57 @@ public class TaskDriver {
_propertyStore.remove(jobPropertyPath, AccessOption.PERSISTENT);
}
- /** Remove the job name from the DAG from the queue configuration */
+ /**
+ * Remove the job name from the DAG in the queue configuration
+ */
private void removeJobFromDag(final String queueName, final String jobName) {
final String namespacedJobName = TaskUtil.getNamespacedJobName(queueName, jobName);
- DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() {
+ DataUpdater<ZNRecord> dagRemover = new DataUpdater<ZNRecord>() {
@Override
public ZNRecord update(ZNRecord currentData) {
+ if (currentData == null) {
+ LOG.error("Could not update DAG for queue: " + queueName + " ZNRecord is null.");
+ return null;
+ }
// Add the node to the existing DAG
JobDag jobDag = JobDag.fromJson(currentData.getSimpleField(WorkflowConfig.DAG));
Set<String> allNodes = jobDag.getAllNodes();
if (!allNodes.contains(namespacedJobName)) {
- LOG.warn("Could not delete job from queue " + queueName + ", job " + jobName + " not exists");
- } else {
- String parent = null;
- String child = null;
- // remove the node from the queue
- for (String node : allNodes) {
- if (!node.equals(namespacedJobName)) {
- if (jobDag.getDirectChildren(node).contains(namespacedJobName)) {
- parent = node;
- jobDag.removeParentToChild(parent, namespacedJobName);
- } else if (jobDag.getDirectParents(node).contains(namespacedJobName)) {
- child = node;
- jobDag.removeParentToChild(namespacedJobName, child);
- }
- }
- }
-
- if (parent != null && child != null) {
- jobDag.addParentToChild(parent, child);
+ LOG.warn(
+ "Could not delete job from queue " + queueName + ", job " + jobName + " not exists");
+ return currentData;
+ }
+ String parent = null;
+ String child = null;
+ // remove the node from the queue
+ for (String node : allNodes) {
+ if (jobDag.getDirectChildren(node).contains(namespacedJobName)) {
+ parent = node;
+ jobDag.removeParentToChild(parent, namespacedJobName);
+ } else if (jobDag.getDirectParents(node).contains(namespacedJobName)) {
+ child = node;
+ jobDag.removeParentToChild(namespacedJobName, child);
}
+ }
+ if (parent != null && child != null) {
+ jobDag.addParentToChild(parent, child);
+ }
+ jobDag.removeNode(namespacedJobName);
- jobDag.removeNode(namespacedJobName);
-
- // Save the updated DAG
- try {
- currentData.setSimpleField(WorkflowConfig.DAG, jobDag.toJson());
- } catch (Exception e) {
- throw new IllegalStateException("Could not remove job " + jobName + " from DAG of queue " + queueName, e);
- }
+ // Save the updated DAG
+ try {
+ currentData.setSimpleField(WorkflowConfig.DAG, jobDag.toJson());
+ } catch (Exception e) {
+ throw new IllegalStateException(
+ "Could not remove job " + jobName + " from DAG of queue " + queueName, e);
}
return currentData;
}
};
String path = _accessor.keyBuilder().resourceConfig(queueName).getPath();
- boolean status = _accessor.getBaseDataAccessor().update(path, updater, AccessOption.PERSISTENT);
- if (!status) {
+ if (!_accessor.getBaseDataAccessor().update(path, dagRemover, AccessOption.PERSISTENT)) {
throw new IllegalArgumentException(
"Could not remove job " + jobName + " from DAG of queue " + queueName);
}
@@ -449,8 +454,12 @@ public class TaskDriver {
// Create the job to ensure that it validates
JobConfig jobConfig = jobBuilder.setWorkflow(queueName).build();
- // Add the job to the end of the queue in the DAG
final String namespacedJobName = TaskUtil.getNamespacedJobName(queueName, jobName);
+
+ // add job config first.
+ addJobConfig(namespacedJobName, jobConfig);
+
+ // Add the job to the end of the queue in the DAG
DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() {
@Override
public ZNRecord update(ZNRecord currentData) {
@@ -495,22 +504,38 @@ public class TaskDriver {
throw new IllegalArgumentException("Could not enqueue job");
}
// Schedule the job
- scheduleJob(namespacedJobName, jobConfig);
+ TaskUtil.invokeRebalance(_accessor, queueName);
}
- /** Posts new job to cluster */
- private void scheduleJob(String jobResource, JobConfig jobConfig) throws Exception {
- // Set up job resource based on partitions from target resource
- int numIndependentTasks = jobConfig.getTaskConfigMap().size();
- int numPartitions =
- (numIndependentTasks > 0) ? numIndependentTasks : _admin
- .getResourceIdealState(_clusterName, jobConfig.getTargetResource()).getPartitionSet()
- .size();
- _admin.addResource(_clusterName, jobResource, numPartitions, TaskConstants.STATE_MODEL_NAME);
+ /** Posts new workflow resource to cluster */
+ private void addWorkflowResource(String workflow) {
+ // Add workflow resource
+ _admin.addResource(_clusterName, workflow, 1, TaskConstants.STATE_MODEL_NAME);
+
+ // Push out new ideal state for the workflow
+ CustomModeISBuilder IsBuilder = new CustomModeISBuilder(workflow);
+ IsBuilder.setRebalancerMode(IdealState.RebalanceMode.TASK)
+ .setNumReplica(1).setNumPartitions(1)
+ .setStateModel(TaskConstants.STATE_MODEL_NAME)
+ .setDisableExternalView(true);
+
+ IdealState is = IsBuilder.build();
+ is.getRecord().setListField(workflow, new ArrayList<String>());
+ is.getRecord().setMapField(workflow, new HashMap<String, String>());
+ is.setRebalancerClassName(WorkflowRebalancer.class.getName());
+ _admin.setResourceIdealState(_clusterName, workflow, is);
+
+ }
+
+ /**
+ * Add new job config to cluster
+ */
+ private void addJobConfig(String jobName, JobConfig jobConfig) {
+ LOG.info("Add job configuration " + jobName);
// Set the job configuration
PropertyKey.Builder keyBuilder = _accessor.keyBuilder();
- HelixProperty resourceConfig = new HelixProperty(jobResource);
+ HelixProperty resourceConfig = new HelixProperty(jobName);
resourceConfig.getRecord().getSimpleFields().putAll(jobConfig.getResourceConfigMap());
Map<String, TaskConfig> taskConfigMap = jobConfig.getTaskConfigMap();
if (taskConfigMap != null) {
@@ -518,30 +543,10 @@ public class TaskDriver {
resourceConfig.getRecord().setMapField(taskConfig.getId(), taskConfig.getConfigMap());
}
}
- _accessor.setProperty(keyBuilder.resourceConfig(jobResource), resourceConfig);
-
- // Push out new ideal state based on number of target partitions
- CustomModeISBuilder builder = new CustomModeISBuilder(jobResource);
- builder.setRebalancerMode(IdealState.RebalanceMode.TASK);
- builder.setNumReplica(1);
- builder.setNumPartitions(numPartitions);
- builder.setStateModel(TaskConstants.STATE_MODEL_NAME);
- if (jobConfig.isDisableExternalView()) {
- builder.setDisableExternalView(jobConfig.isDisableExternalView());
+ if (!_accessor.setProperty(keyBuilder.resourceConfig(jobName), resourceConfig)) {
+ LOG.error("Failed to add job configuration for job " + jobName);
}
-
- IdealState is = builder.build();
- for (int i = 0; i < numPartitions; i++) {
- is.getRecord().setListField(jobResource + "_" + i, new ArrayList<String>());
- is.getRecord().setMapField(jobResource + "_" + i, new HashMap<String, String>());
- }
- if (taskConfigMap != null && !taskConfigMap.isEmpty()) {
- is.setRebalancerClassName(GenericTaskRebalancer.class.getName());
- } else {
- is.setRebalancerClassName(FixedTargetTaskRebalancer.class.getName());
- }
- _admin.setResourceIdealState(_clusterName, jobResource, is);
}
/** Public method to resume a workflow/queue */
@@ -565,52 +570,47 @@ public class TaskDriver {
private void setWorkflowTargetState(String workflowName, TargetState state) {
setSingleWorkflowTargetState(workflowName, state);
- // TODO: this is the temporary fix for current task rebalance implementation.
- // We should fix this in new task framework implementation.
+ // TODO: just need to change the lastScheduledWorkflow.
List<String> resources = _accessor.getChildNames(_accessor.keyBuilder().resourceConfigs());
for (String resource : resources) {
if (resource.startsWith(workflowName)) {
setSingleWorkflowTargetState(resource, state);
}
}
-
- /* TODO: use this code for new task framework.
- // For recurring schedules, last scheduled incomplete workflow must also be handled
- WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, workflowName);
- String lastScheduledWorkflow = wCtx.getLastScheduledSingleWorkflow();
- if (lastScheduledWorkflow != null) {
- WorkflowContext lastScheduledWorkflowCtx =
- TaskUtil.getWorkflowContext(_propertyStore, lastScheduledWorkflow);
- if (lastScheduledWorkflowCtx != null && !(
- lastScheduledWorkflowCtx.getWorkflowState() == TaskState.COMPLETED
- || lastScheduledWorkflowCtx.getWorkflowState() == TaskState.FAILED)) {
- setSingleWorkflowTargetState(lastScheduledWorkflow, state);
- }
- }
- */
}
/** Helper function to change target state for a given workflow */
private void setSingleWorkflowTargetState(String workflowName, final TargetState state) {
+ LOG.info("Set " + workflowName + " to target state " + state);
DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() {
@Override
public ZNRecord update(ZNRecord currentData) {
- if (currentData != null){
+ if (currentData != null) {
// Only update target state for non-completed workflows
String finishTime = currentData.getSimpleField(WorkflowContext.FINISH_TIME);
if (finishTime == null || finishTime.equals(WorkflowContext.UNFINISHED)) {
currentData.setSimpleField(WorkflowConfig.TARGET_STATE, state.name());
+ } else {
+ LOG.info("TargetState DataUpdater: ignore to update target state " + finishTime);
}
+ } else {
+ LOG.error("TargetState DataUpdater: Fails to update target state " + currentData);
}
return currentData;
}
};
List<DataUpdater<ZNRecord>> updaters = Lists.newArrayList();
- updaters.add(updater);
List<String> paths = Lists.newArrayList();
- paths.add(_accessor.keyBuilder().resourceConfig(workflowName).getPath());
- _accessor.updateChildren(paths, updaters, AccessOption.PERSISTENT);
- invokeRebalance();
+
+ PropertyKey cfgKey = TaskUtil.getWorkflowConfigKey(_accessor, workflowName);
+ if (_accessor.getProperty(cfgKey) != null) {
+ paths.add(_accessor.keyBuilder().resourceConfig(workflowName).getPath());
+ updaters.add(updater);
+ _accessor.updateChildren(paths, updaters, AccessOption.PERSISTENT);
+ TaskUtil.invokeRebalance(_accessor, workflowName);
+ } else {
+ LOG.error("Configuration path " + cfgKey + " not found!");
+ }
}
public void list(String resource) {
@@ -666,21 +666,6 @@ public class TaskDriver {
}
}
- /**
- * Hack to invoke rebalance until bug concerning resource config changes not driving rebalance is
- * fixed
- */
- public void invokeRebalance() {
- // find a task
- for (String resource : _admin.getResourcesInCluster(_clusterName)) {
- IdealState is = _admin.getResourceIdealState(_clusterName, resource);
- if (is != null && is.getStateModelDefRef().equals(TaskConstants.STATE_MODEL_NAME)) {
- _accessor.updateProperty(_accessor.keyBuilder().idealStates(resource), is);
- break;
- }
- }
- }
-
/** Constructs options set for all basic control messages */
private static Options constructOptions() {
Options options = new Options();