You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/05/20 22:07:21 UTC
[3/3] git commit: [HELIX-353] Write an independent task rebalancer
[HELIX-353] Write an independent task rebalancer
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/f1df1058
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/f1df1058
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/f1df1058
Branch: refs/heads/helix-0.6.x
Commit: f1df105878c368e7ef93735a6c4c96532fb806df
Parents: 4aa54eb
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Mon Apr 21 14:38:01 2014 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Tue May 20 11:10:05 2014 -0700
----------------------------------------------------------------------
.../stages/BestPossibleStateCalcStage.java | 31 +-
.../controller/stages/CurrentStateOutput.java | 61 ++--
.../helix/task/FixedTargetTaskRebalancer.java | 155 +++++++++
.../helix/task/GenericTaskRebalancer.java | 186 ++++++++++
.../java/org/apache/helix/task/JobConfig.java | 334 ++++++++++++++++++
.../java/org/apache/helix/task/JobContext.java | 227 +++++++++++++
.../main/java/org/apache/helix/task/JobDag.java | 151 +++++++++
.../java/org/apache/helix/task/TargetState.java | 30 +-
.../main/java/org/apache/helix/task/Task.java | 22 +-
.../apache/helix/task/TaskCallbackContext.java | 67 ++++
.../java/org/apache/helix/task/TaskConfig.java | 339 ++++++-------------
.../org/apache/helix/task/TaskConstants.java | 20 +-
.../java/org/apache/helix/task/TaskContext.java | 120 -------
.../java/org/apache/helix/task/TaskDag.java | 132 --------
.../java/org/apache/helix/task/TaskDriver.java | 125 ++++---
.../java/org/apache/helix/task/TaskFactory.java | 25 +-
.../apache/helix/task/TaskPartitionState.java | 20 +-
.../org/apache/helix/task/TaskRebalancer.java | 268 ++++++++-------
.../java/org/apache/helix/task/TaskResult.java | 20 +-
.../java/org/apache/helix/task/TaskRunner.java | 20 +-
.../java/org/apache/helix/task/TaskState.java | 20 +-
.../org/apache/helix/task/TaskStateModel.java | 55 ++-
.../helix/task/TaskStateModelFactory.java | 20 +-
.../java/org/apache/helix/task/TaskUtil.java | 98 ++++--
.../java/org/apache/helix/task/Workflow.java | 180 +++++++---
.../org/apache/helix/task/WorkflowConfig.java | 35 +-
.../org/apache/helix/task/WorkflowContext.java | 27 +-
.../org/apache/helix/task/beans/JobBean.java | 42 +++
.../org/apache/helix/task/beans/TaskBean.java | 37 +-
.../apache/helix/task/beans/WorkflowBean.java | 22 +-
.../TestCustomizedIdealStateRebalancer.java | 12 +-
.../task/TestIndependentTaskRebalancer.java | 170 ++++++++++
.../integration/task/TestTaskRebalancer.java | 106 +++---
.../task/TestTaskRebalancerStopResume.java | 43 ++-
.../apache/helix/integration/task/TestUtil.java | 8 +-
.../integration/task/WorkflowGenerator.java | 89 +++--
36 files changed, 2381 insertions(+), 936 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index df215c8..dad5978 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -136,19 +136,24 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
break;
}
if (rebalancer != null && mappingCalculator != null) {
- HelixManager manager = event.getAttribute("helixmanager");
- rebalancer.init(manager);
- idealState =
- rebalancer.computeNewIdealState(resourceName, idealState, currentStateOutput, cache);
-
- // Use the internal MappingCalculator interface to compute the final assignment
- // The next release will support rebalancers that compute the mapping from start to finish
- ResourceAssignment partitionStateAssignment =
- mappingCalculator.computeBestPossiblePartitionState(cache, idealState, resource,
- currentStateOutput);
- for (Partition partition : resource.getPartitions()) {
- Map<String, String> newStateMap = partitionStateAssignment.getReplicaMap(partition);
- output.setState(resourceName, partition, newStateMap);
+ try {
+ HelixManager manager = event.getAttribute("helixmanager");
+ rebalancer.init(manager);
+ idealState =
+ rebalancer.computeNewIdealState(resourceName, idealState, currentStateOutput, cache);
+
+ // Use the internal MappingCalculator interface to compute the final assignment
+ // The next release will support rebalancers that compute the mapping from start to finish
+ ResourceAssignment partitionStateAssignment =
+ mappingCalculator.computeBestPossiblePartitionState(cache, idealState, resource,
+ currentStateOutput);
+ for (Partition partition : resource.getPartitions()) {
+ Map<String, String> newStateMap = partitionStateAssignment.getReplicaMap(partition);
+ output.setState(resourceName, partition, newStateMap);
+ }
+ } catch (Exception e) {
+ logger
+ .error("Error computing assignment for resource " + resourceName + ". Skipping.", e);
}
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
index 9537272..ac9d748 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
@@ -22,17 +22,24 @@ package org.apache.helix.controller.stages;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
+
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.Partition;
+import com.google.common.collect.Sets;
+
public class CurrentStateOutput {
private final Map<String, Map<Partition, Map<String, String>>> _currentStateMap;
private final Map<String, Map<Partition, Map<String, String>>> _pendingStateMap;
- // Contains per-resource maps of partition -> (instance, requested_state). This corresponds to the REQUESTED_STATE
+ // Contains per-resource maps of partition -> (instance, requested_state). This corresponds to the
+ // REQUESTED_STATE
// field in the CURRENTSTATES node.
private final Map<String, Map<Partition, Map<String, String>>> _requestedStateMap;
- // Contains per-resource maps of partition -> (instance, info). This corresponds to the INFO field in the
- // CURRENTSTATES node. This is information returned by state transition methods on the participants. It may be used
+ // Contains per-resource maps of partition -> (instance, info). This corresponds to the INFO field
+ // in the
+ // CURRENTSTATES node. This is information returned by state transition methods on the
+ // participants. It may be used
// by the rebalancer.
private final Map<String, Map<Partition, Map<String, String>>> _infoMap;
private final Map<String, String> _resourceStateModelMap;
@@ -85,7 +92,8 @@ public class CurrentStateOutput {
_currentStateMap.get(resourceName).get(partition).put(instanceName, state);
}
- public void setRequestedState(String resourceName, Partition partition, String instanceName, String state) {
+ public void setRequestedState(String resourceName, Partition partition, String instanceName,
+ String state) {
if (!_requestedStateMap.containsKey(resourceName)) {
_requestedStateMap.put(resourceName, new HashMap<Partition, Map<String, String>>());
}
@@ -95,14 +103,11 @@ public class CurrentStateOutput {
_requestedStateMap.get(resourceName).get(partition).put(instanceName, state);
}
- public void setInfo(String resourceName, Partition partition, String instanceName, String state)
- {
- if (!_infoMap.containsKey(resourceName))
- {
+ public void setInfo(String resourceName, Partition partition, String instanceName, String state) {
+ if (!_infoMap.containsKey(resourceName)) {
_infoMap.put(resourceName, new HashMap<Partition, Map<String, String>>());
}
- if (!_infoMap.get(resourceName).containsKey(partition))
- {
+ if (!_infoMap.get(resourceName).containsKey(partition)) {
_infoMap.get(resourceName).put(partition, new HashMap<String, String>());
}
_infoMap.get(resourceName).get(partition).put(instanceName, state);
@@ -137,28 +142,22 @@ public class CurrentStateOutput {
return null;
}
- public String getRequestedState(String resourceName, Partition partition, String instanceName)
- {
+ public String getRequestedState(String resourceName, Partition partition, String instanceName) {
Map<Partition, Map<String, String>> map = _requestedStateMap.get(resourceName);
- if (map != null)
- {
+ if (map != null) {
Map<String, String> instanceStateMap = map.get(partition);
- if (instanceStateMap != null)
- {
+ if (instanceStateMap != null) {
return instanceStateMap.get(instanceName);
}
}
return null;
}
- public String getInfo(String resourceName, Partition partition, String instanceName)
- {
+ public String getInfo(String resourceName, Partition partition, String instanceName) {
Map<Partition, Map<String, String>> map = _infoMap.get(resourceName);
- if (map != null)
- {
+ if (map != null) {
Map<String, String> instanceStateMap = map.get(partition);
- if (instanceStateMap != null)
- {
+ if (instanceStateMap != null) {
return instanceStateMap.get(instanceName);
}
}
@@ -215,6 +214,24 @@ public class CurrentStateOutput {
return Collections.emptyMap();
}
+  /**
+   * Collects every partition of a resource that appears in either the current-state map or the
+   * pending-state map.
+   * @param resourceId name of the resource whose partitions should be gathered
+   * @return a newly built, mutable set of partitions; empty if the resource has no entries in
+   *         either map
+   */
+  public Set<Partition> getCurrentStateMappedPartitions(String resourceId) {
+    Set<Partition> mappedPartitions = Sets.newHashSet();
+    addMappedPartitions(mappedPartitions, _currentStateMap.get(resourceId));
+    addMappedPartitions(mappedPartitions, _pendingStateMap.get(resourceId));
+    return mappedPartitions;
+  }
+
+  /**
+   * Adds the partition keys of a per-resource state map to {@code target}; a null map adds nothing.
+   */
+  private static void addMappedPartitions(Set<Partition> target,
+      Map<Partition, Map<String, String>> partitionStateMap) {
+    if (partitionStateMap != null) {
+      target.addAll(partitionStateMap.keySet());
+    }
+  }
+
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java
new file mode 100644
index 0000000..dc6fbaa
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java
@@ -0,0 +1,155 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.ResourceAssignment;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+/**
+ * A rebalancer for when a task group must be assigned according to partitions/states on a target
+ * resource. Here, tasks are colocated according to where a resource's partitions are, as well as
+ * (if desired) only where those partitions are in a given state.
+ */
+public class FixedTargetTaskRebalancer extends TaskRebalancer {
+
+  /**
+   * Returns all task partition ids for the job, allocating a new id in the job context for each
+   * target partition that does not have one yet.
+   * @return the task partition ids, or null if the target resource has no ideal state
+   */
+  @Override
+  public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
+      WorkflowConfig workflowCfg, WorkflowContext workflowCtx, ClusterDataCache cache) {
+    return getAllTaskPartitions(getTgtIdealState(jobCfg, cache), jobCfg, jobCtx);
+  }
+
+  /**
+   * Computes which task partitions each instance should run, based on the current state of the
+   * corresponding target partitions on each instance.
+   * @return map of instance name to assigned task partition ids; empty if the target resource has
+   *         no ideal state
+   */
+  @Override
+  public Map<String, SortedSet<Integer>> getTaskAssignment(CurrentStateOutput currStateOutput,
+      ResourceAssignment prevAssignment, Iterable<String> instanceList, JobConfig jobCfg,
+      JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
+      Set<Integer> partitionSet, ClusterDataCache cache) {
+    IdealState tgtIs = getTgtIdealState(jobCfg, cache);
+    if (tgtIs == null) {
+      return Collections.emptyMap();
+    }
+    Set<String> tgtStates = jobCfg.getTargetPartitionStates();
+    return getTgtPartitionAssignment(currStateOutput, instanceList, tgtIs, tgtStates, partitionSet,
+        jobContext);
+  }
+
+  /**
+   * Gets the ideal state of the target resource of this job
+   * @param jobCfg job config containing target resource id
+   * @param cache snapshot of the cluster containing the task and target resource
+   * @return target resource ideal state, or null
+   */
+  private static IdealState getTgtIdealState(JobConfig jobCfg, ClusterDataCache cache) {
+    String tgtResourceId = jobCfg.getTargetResource();
+    return cache.getIdealState(tgtResourceId);
+  }
+
+  /**
+   * Returns the set of all partition ids for a job.
+   * <p/>
+   * If target partitions were explicitly specified in the config, those are used. Otherwise, we
+   * use all partitions of the target resource. Each target partition without a task partition yet
+   * is given a new id, which is recorded in {@code taskCtx}.
+   * @param tgtResourceIs ideal state of the target resource; may be null
+   * @param jobCfg the job configuration
+   * @param taskCtx the job context; mutated when new task partition ids are allocated
+   * @return sorted set of task partition ids, or null if {@code tgtResourceIs} is null
+   */
+  private static Set<Integer> getAllTaskPartitions(IdealState tgtResourceIs, JobConfig jobCfg,
+      JobContext taskCtx) {
+    if (tgtResourceIs == null) {
+      return null;
+    }
+    Map<String, List<Integer>> currentTargets = taskCtx.getPartitionsByTarget();
+    SortedSet<String> targetPartitions = Sets.newTreeSet();
+    if (jobCfg.getTargetPartitions() != null) {
+      targetPartitions.addAll(jobCfg.getTargetPartitions());
+    } else {
+      targetPartitions.addAll(tgtResourceIs.getPartitionSet());
+    }
+
+    Set<Integer> taskPartitions = Sets.newTreeSet();
+    for (String pName : targetPartitions) {
+      taskPartitions.addAll(getPartitionsForTargetPartition(pName, currentTargets, taskCtx));
+    }
+    return taskPartitions;
+  }
+
+  /**
+   * Looks up the task partition ids mapped to a target partition. If there are none, allocates a
+   * new id equal to the current size of the job's partition set and records the mapping in
+   * {@code jobCtx}.
+   * @param targetPartition name of the target partition
+   * @param currentTargets existing target partition to task partition id mapping
+   * @param jobCtx the job context; mutated when a new id is allocated
+   * @return the task partition ids for {@code targetPartition}
+   */
+  private static List<Integer> getPartitionsForTargetPartition(String targetPartition,
+      Map<String, List<Integer>> currentTargets, JobContext jobCtx) {
+    if (!currentTargets.containsKey(targetPartition)) {
+      int nextId = jobCtx.getPartitionSet().size();
+      jobCtx.setPartitionTarget(nextId, targetPartition);
+      return Lists.newArrayList(nextId);
+    } else {
+      return currentTargets.get(targetPartition);
+    }
+  }
+
+  /**
+   * Get partition assignments for the target resource, but only for the partitions of interest.
+   * @param currStateOutput The current state of the instances in the cluster.
+   * @param instanceList The set of instances.
+   * @param tgtIs The ideal state of the target resource.
+   * @param tgtStates Only partitions in this set of states will be considered. If null, partitions
+   *          do not need to be in any specific state, so every instance in {@code instanceList}
+   *          is assigned the task partition.
+   * @param includeSet The set of task partition ids to consider.
+   * @param jobCtx The job context, used to map target partition names to task partition ids.
+   * @return A map of instance vs set of partition ids assigned to that instance.
+   */
+  private static Map<String, SortedSet<Integer>> getTgtPartitionAssignment(
+      CurrentStateOutput currStateOutput, Iterable<String> instanceList, IdealState tgtIs,
+      Set<String> tgtStates, Set<Integer> includeSet, JobContext jobCtx) {
+    Map<String, SortedSet<Integer>> result = new HashMap<String, SortedSet<Integer>>();
+    for (String instance : instanceList) {
+      result.put(instance, new TreeSet<Integer>());
+    }
+
+    String tgtResourceName = tgtIs.getResourceName();
+    Map<String, List<Integer>> partitionsByTarget = jobCtx.getPartitionsByTarget();
+    for (String pName : tgtIs.getPartitionSet()) {
+      List<Integer> partitions = partitionsByTarget.get(pName);
+      if (partitions == null || partitions.isEmpty()) {
+        continue;
+      }
+      int pId = partitions.get(0);
+      if (!includeSet.contains(pId)) {
+        continue;
+      }
+      // The partition key does not depend on the instance, so build it once per target partition
+      Partition tgtPartition = new Partition(pName);
+      for (String instance : instanceList) {
+        String state = currStateOutput.getCurrentState(tgtResourceName, tgtPartition, instance);
+        if (tgtStates == null || tgtStates.contains(state)) {
+          result.get(instance).add(pId);
+        }
+      }
+    }
+
+    return result;
+  }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java
new file mode 100644
index 0000000..9174eb1
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java
@@ -0,0 +1,186 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.ResourceAssignment;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * This class does an assignment based on an automatic rebalancing strategy, rather than requiring
+ * assignment to target partitions and states of another resource
+ */
+public class GenericTaskRebalancer extends TaskRebalancer {
+  /**
+   * Returns the task partition ids for this job, allocating a new id in the job context for every
+   * configured task that does not have one yet.
+   * @return the job context's partition set after allocation
+   */
+  @Override
+  public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
+      WorkflowConfig workflowCfg, WorkflowContext workflowCtx, ClusterDataCache cache) {
+    Map<String, TaskConfig> taskMap = jobCfg.getTaskConfigMap();
+    Map<String, Integer> taskIdMap = jobCtx.getTaskIdPartitionMap();
+    for (TaskConfig taskCfg : taskMap.values()) {
+      String taskId = taskCfg.getId();
+      if (!taskIdMap.containsKey(taskId)) {
+        // The next id is the current size of the partition set
+        int nextPartition = jobCtx.getPartitionSet().size();
+        jobCtx.setTaskIdForPartition(nextPartition, taskId);
+      }
+    }
+    return jobCtx.getPartitionSet();
+  }
+
+  /**
+   * Computes a task assignment using the full-auto rebalancing strategy. Each task partition is
+   * treated as a single-replica ONLINE partition. Only partitions that are unstarted, INIT,
+   * RUNNING or STOPPED contribute their current mapping.
+   * @param prevAssignment previous assignment of this job's resource; must be non-null because it
+   *          supplies the resource name
+   * @return map of participant name to assigned task partition ids
+   */
+  @Override
+  public Map<String, SortedSet<Integer>> getTaskAssignment(CurrentStateOutput currStateOutput,
+      ResourceAssignment prevAssignment, Iterable<String> instanceList, JobConfig jobCfg,
+      final JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
+      Set<Integer> partitionSet, ClusterDataCache cache) {
+    // Gather input to the full auto rebalancing algorithm
+    LinkedHashMap<String, Integer> states = new LinkedHashMap<String, Integer>();
+    states.put("ONLINE", 1);
+
+    // Only map partitions whose assignment we care about
+    final Set<TaskPartitionState> honoredStates =
+        Sets.newHashSet(TaskPartitionState.INIT, TaskPartitionState.RUNNING,
+            TaskPartitionState.STOPPED);
+    Set<Integer> filteredPartitionSet = Sets.newHashSet();
+    for (Integer p : partitionSet) {
+      TaskPartitionState state = (jobContext == null) ? null : jobContext.getPartitionState(p);
+      if (state == null || honoredStates.contains(state)) {
+        filteredPartitionSet.add(p);
+      }
+    }
+
+    // Transform from partition id to fully qualified partition name
+    List<Integer> partitionNums = Lists.newArrayList(partitionSet);
+    Collections.sort(partitionNums);
+    final String resourceId = prevAssignment.getResourceName();
+    List<String> partitions =
+        new ArrayList<String>(Lists.transform(partitionNums, new Function<Integer, String>() {
+          @Override
+          public String apply(Integer partitionNum) {
+            return resourceId + "_" + partitionNum;
+          }
+        }));
+
+    // Compute the current assignment
+    Map<String, Map<String, String>> currentMapping = Maps.newHashMap();
+    for (Partition partition : currStateOutput.getCurrentStateMappedPartitions(resourceId)) {
+      if (!filteredPartitionSet.contains(pId(partition.getPartitionName()))) {
+        // not computing old partitions
+        continue;
+      }
+      // prevAssignment was already dereferenced above, so no null check is needed here.
+      // Later putAll calls win: pending state overrides current state, which overrides the
+      // previous decision.
+      Map<String, String> allPreviousDecisionMap = Maps.newHashMap();
+      allPreviousDecisionMap.putAll(prevAssignment.getReplicaMap(partition));
+      allPreviousDecisionMap.putAll(currStateOutput.getCurrentStateMap(resourceId, partition));
+      allPreviousDecisionMap.putAll(currStateOutput.getPendingStateMap(resourceId, partition));
+      currentMapping.put(partition.getPartitionName(), allPreviousDecisionMap);
+    }
+
+    // Get the assignment keyed on partition
+    AutoRebalanceStrategy strategy =
+        new AutoRebalanceStrategy(resourceId, partitions, states, Integer.MAX_VALUE,
+            new AutoRebalanceStrategy.DefaultPlacementScheme());
+    List<String> allNodes =
+        Lists.newArrayList(getEligibleInstances(jobCfg, currStateOutput, instanceList, cache));
+    Collections.sort(allNodes);
+    ZNRecord record = strategy.computePartitionAssignment(allNodes, currentMapping, allNodes);
+    Map<String, List<String>> preferenceLists = record.getListFields();
+
+    // Convert to an assignment keyed on participant
+    Map<String, SortedSet<Integer>> taskAssignment = Maps.newHashMap();
+    for (Map.Entry<String, List<String>> e : preferenceLists.entrySet()) {
+      int partitionId = pId(e.getKey());
+      for (String participantName : e.getValue()) {
+        SortedSet<Integer> assigned = taskAssignment.get(participantName);
+        if (assigned == null) {
+          assigned = new TreeSet<Integer>();
+          taskAssignment.put(participantName, assigned);
+        }
+        assigned.add(partitionId);
+      }
+    }
+    return taskAssignment;
+  }
+
+  /**
+   * Filter a list of instances based on targeted resource policies
+   * @param jobCfg the job configuration
+   * @param currStateOutput the current state of all instances in the cluster
+   * @param instanceList valid instances
+   * @param cache current snapshot of the cluster
+   * @return a set of instances that can be assigned to
+   */
+  private Set<String> getEligibleInstances(JobConfig jobCfg, CurrentStateOutput currStateOutput,
+      Iterable<String> instanceList, ClusterDataCache cache) {
+    // No target resource means any instance is available
+    Set<String> allInstances = Sets.newHashSet(instanceList);
+    String targetResource = jobCfg.getTargetResource();
+    if (targetResource == null) {
+      return allInstances;
+    }
+
+    // Bad ideal state means don't assign
+    IdealState idealState = cache.getIdealState(targetResource);
+    if (idealState == null) {
+      return Collections.emptySet();
+    }
+
+    // Get the partitions on the target resource to use. Copy first: getPartitionSet() may return
+    // a live view of the cached ideal state, and retainAll on it would drop partitions from the
+    // target resource's cached record.
+    Set<String> partitions = Sets.newHashSet(idealState.getPartitionSet());
+    List<String> targetPartitions = jobCfg.getTargetPartitions();
+    if (targetPartitions != null && !targetPartitions.isEmpty()) {
+      partitions.retainAll(targetPartitions);
+    }
+
+    // Based on state matches, add eligible instances
+    Set<String> eligibleInstances = Sets.newHashSet();
+    Set<String> targetStates = jobCfg.getTargetPartitionStates();
+    for (String partition : partitions) {
+      Map<String, String> stateMap =
+          currStateOutput.getCurrentStateMap(targetResource, new Partition(partition));
+      for (Map.Entry<String, String> e : stateMap.entrySet()) {
+        String instanceName = e.getKey();
+        String state = e.getValue();
+        if (targetStates == null || targetStates.isEmpty() || targetStates.contains(state)) {
+          eligibleInstances.add(instanceName);
+        }
+      }
+    }
+    allInstances.retainAll(eligibleInstances);
+    return allInstances;
+  }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
new file mode 100644
index 0000000..90e3cfc
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
@@ -0,0 +1,334 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+
+/**
+ * Provides a typed interface to job configurations.
+ */
+public class JobConfig {
+ // // Property names ////
+
+ /** The name of the workflow to which the job belongs. */
+ public static final String WORKFLOW_ID = "WorkflowID";
+ /** The assignment strategy of this job */
+ public static final String ASSIGNMENT_STRATEGY = "AssignmentStrategy";
+ /** The name of the target resource. */
+ public static final String TARGET_RESOURCE = "TargetResource";
+ /**
+ * The set of the target partition states. The value must be a comma-separated list of partition
+ * states.
+ */
+ public static final String TARGET_PARTITION_STATES = "TargetPartitionStates";
+ /**
+ * The set of the target partition ids. The value must be a comma-separated list of partition ids.
+ */
+ public static final String TARGET_PARTITIONS = "TargetPartitions";
+ /** The command that is to be run by participants in the case of identical tasks. */
+ public static final String COMMAND = "Command";
+ /** The command configuration to be used by the tasks. */
+ public static final String JOB_CONFIG_MAP = "JobConfig";
+ /** The timeout for a task. */
+ public static final String TIMEOUT_PER_TASK = "TimeoutPerPartition";
+ /** The maximum number of times the task rebalancer may attempt to execute a task. */
+ public static final String MAX_ATTEMPTS_PER_TASK = "MaxAttemptsPerTask";
+ /** The number of concurrent tasks that are allowed to run on an instance. */
+ public static final String NUM_CONCURRENT_TASKS_PER_INSTANCE = "ConcurrentTasksPerInstance";
+
+ /** The individual task configurations, if any **/
+ public static final String TASK_CONFIGS = "TaskConfigs";
+
+ // // Default property values ////
+
+ public static final long DEFAULT_TIMEOUT_PER_TASK = 60 * 60 * 1000; // 1 hr.
+ public static final int DEFAULT_MAX_ATTEMPTS_PER_TASK = 10;
+ public static final int DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE = 1;
+
+ private final String _workflow;
+ private final String _targetResource;
+ private final List<String> _targetPartitions;
+ private final Set<String> _targetPartitionStates;
+ private final String _command;
+ private final Map<String, String> _jobConfigMap;
+ private final long _timeoutPerTask;
+ private final int _numConcurrentTasksPerInstance;
+ private final int _maxAttemptsPerTask;
+ private final Map<String, TaskConfig> _taskConfigMap;
+
+ private JobConfig(String workflow, String targetResource, List<String> targetPartitions,
+ Set<String> targetPartitionStates, String command, Map<String, String> jobConfigMap,
+ long timeoutPerTask, int numConcurrentTasksPerInstance, int maxAttemptsPerTask,
+ Map<String, TaskConfig> taskConfigMap) {
+ _workflow = workflow;
+ _targetResource = targetResource;
+ _targetPartitions = targetPartitions;
+ _targetPartitionStates = targetPartitionStates;
+ _command = command;
+ _jobConfigMap = jobConfigMap;
+ _timeoutPerTask = timeoutPerTask;
+ _numConcurrentTasksPerInstance = numConcurrentTasksPerInstance;
+ _maxAttemptsPerTask = maxAttemptsPerTask;
+ if (taskConfigMap != null) {
+ _taskConfigMap = taskConfigMap;
+ } else {
+ _taskConfigMap = Collections.emptyMap();
+ }
+ }
+
+ public String getWorkflow() {
+ return _workflow == null ? Workflow.UNSPECIFIED : _workflow;
+ }
+
+ public String getTargetResource() {
+ return _targetResource;
+ }
+
+ public List<String> getTargetPartitions() {
+ return _targetPartitions;
+ }
+
+ public Set<String> getTargetPartitionStates() {
+ return _targetPartitionStates;
+ }
+
+ public String getCommand() {
+ return _command;
+ }
+
+ public Map<String, String> getJobConfigMap() {
+ return _jobConfigMap;
+ }
+
+ public long getTimeoutPerTask() {
+ return _timeoutPerTask;
+ }
+
+ public int getNumConcurrentTasksPerInstance() {
+ return _numConcurrentTasksPerInstance;
+ }
+
+ public int getMaxAttemptsPerTask() {
+ return _maxAttemptsPerTask;
+ }
+
+ public Map<String, TaskConfig> getTaskConfigMap() {
+ return _taskConfigMap;
+ }
+
+ public TaskConfig getTaskConfig(String id) {
+ return _taskConfigMap.get(id);
+ }
+
+ public Map<String, String> getResourceConfigMap() {
+ Map<String, String> cfgMap = new HashMap<String, String>();
+ cfgMap.put(JobConfig.WORKFLOW_ID, _workflow);
+ if (_command != null) {
+ cfgMap.put(JobConfig.COMMAND, _command);
+ }
+ if (_jobConfigMap != null) {
+ String serializedConfig = TaskUtil.serializeJobConfigMap(_jobConfigMap);
+ if (serializedConfig != null) {
+ cfgMap.put(JobConfig.JOB_CONFIG_MAP, serializedConfig);
+ }
+ }
+ if (_targetResource != null) {
+ cfgMap.put(JobConfig.TARGET_RESOURCE, _targetResource);
+ }
+ if (_targetPartitionStates != null) {
+ cfgMap.put(JobConfig.TARGET_PARTITION_STATES, Joiner.on(",").join(_targetPartitionStates));
+ }
+ if (_targetPartitions != null) {
+ cfgMap.put(JobConfig.TARGET_PARTITIONS, Joiner.on(",").join(_targetPartitions));
+ }
+ cfgMap.put(JobConfig.TIMEOUT_PER_TASK, "" + _timeoutPerTask);
+ cfgMap.put(JobConfig.MAX_ATTEMPTS_PER_TASK, "" + _maxAttemptsPerTask);
+ return cfgMap;
+ }
+
+ /**
+ * A builder for {@link JobConfig}. Validates the configurations.
+ */
+ public static class Builder {
+ private String _workflow;
+ private String _targetResource;
+ private List<String> _targetPartitions;
+ private Set<String> _targetPartitionStates;
+ private String _command;
+ private Map<String, String> _commandConfig;
+ private Map<String, TaskConfig> _taskConfigMap = Maps.newHashMap();
+ private long _timeoutPerTask = DEFAULT_TIMEOUT_PER_TASK;
+ private int _numConcurrentTasksPerInstance = DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
+ private int _maxAttemptsPerTask = DEFAULT_MAX_ATTEMPTS_PER_TASK;
+
+ public JobConfig build() {
+ validate();
+
+ return new JobConfig(_workflow, _targetResource, _targetPartitions, _targetPartitionStates,
+ _command, _commandConfig, _timeoutPerTask, _numConcurrentTasksPerInstance,
+ _maxAttemptsPerTask, _taskConfigMap);
+ }
+
+ /**
+ * Convenience method to build a {@link JobConfig} from a {@code Map<String, String>}.
+ * @param cfg A map of property names to their string representations.
+ * @return A {@link Builder}.
+ */
+ public static Builder fromMap(Map<String, String> cfg) {
+ Builder b = new Builder();
+ if (cfg.containsKey(WORKFLOW_ID)) {
+ b.setWorkflow(cfg.get(WORKFLOW_ID));
+ }
+ if (cfg.containsKey(TARGET_RESOURCE)) {
+ b.setTargetResource(cfg.get(TARGET_RESOURCE));
+ }
+ if (cfg.containsKey(TARGET_PARTITIONS)) {
+ b.setTargetPartitions(csvToStringList(cfg.get(TARGET_PARTITIONS)));
+ }
+ if (cfg.containsKey(TARGET_PARTITION_STATES)) {
+ b.setTargetPartitionStates(new HashSet<String>(Arrays.asList(cfg.get(
+ TARGET_PARTITION_STATES).split(","))));
+ }
+ if (cfg.containsKey(COMMAND)) {
+ b.setCommand(cfg.get(COMMAND));
+ }
+ if (cfg.containsKey(JOB_CONFIG_MAP)) {
+ Map<String, String> commandConfigMap =
+ TaskUtil.deserializeJobConfigMap(cfg.get(JOB_CONFIG_MAP));
+ b.setJobConfigMap(commandConfigMap);
+ }
+ if (cfg.containsKey(TIMEOUT_PER_TASK)) {
+ b.setTimeoutPerTask(Long.parseLong(cfg.get(TIMEOUT_PER_TASK)));
+ }
+ if (cfg.containsKey(NUM_CONCURRENT_TASKS_PER_INSTANCE)) {
+ b.setNumConcurrentTasksPerInstance(Integer.parseInt(cfg
+ .get(NUM_CONCURRENT_TASKS_PER_INSTANCE)));
+ }
+ if (cfg.containsKey(MAX_ATTEMPTS_PER_TASK)) {
+ b.setMaxAttemptsPerTask(Integer.parseInt(cfg.get(MAX_ATTEMPTS_PER_TASK)));
+ }
+ return b;
+ }
+
+ public Builder setWorkflow(String v) {
+ _workflow = v;
+ return this;
+ }
+
+ public Builder setTargetResource(String v) {
+ _targetResource = v;
+ return this;
+ }
+
+ public Builder setTargetPartitions(List<String> v) {
+ _targetPartitions = ImmutableList.copyOf(v);
+ return this;
+ }
+
+ public Builder setTargetPartitionStates(Set<String> v) {
+ _targetPartitionStates = ImmutableSet.copyOf(v);
+ return this;
+ }
+
+ public Builder setCommand(String v) {
+ _command = v;
+ return this;
+ }
+
+ public Builder setJobConfigMap(Map<String, String> v) {
+ _commandConfig = v;
+ return this;
+ }
+
+ public Builder setTimeoutPerTask(long v) {
+ _timeoutPerTask = v;
+ return this;
+ }
+
+ public Builder setNumConcurrentTasksPerInstance(int v) {
+ _numConcurrentTasksPerInstance = v;
+ return this;
+ }
+
+ public Builder setMaxAttemptsPerTask(int v) {
+ _maxAttemptsPerTask = v;
+ return this;
+ }
+
+ public Builder addTaskConfigs(List<TaskConfig> taskConfigs) {
+ if (taskConfigs != null) {
+ for (TaskConfig taskConfig : taskConfigs) {
+ _taskConfigMap.put(taskConfig.getId(), taskConfig);
+ }
+ }
+ return this;
+ }
+
+ public Builder addTaskConfigMap(Map<String, TaskConfig> taskConfigMap) {
+ _taskConfigMap.putAll(taskConfigMap);
+ return this;
+ }
+
+ private void validate() {
+ if (_taskConfigMap.isEmpty() && _targetResource == null) {
+ throw new IllegalArgumentException(String.format("%s cannot be null", TARGET_RESOURCE));
+ }
+ if (_taskConfigMap.isEmpty() && _targetPartitionStates != null
+ && _targetPartitionStates.isEmpty()) {
+ throw new IllegalArgumentException(String.format("%s cannot be an empty set",
+ TARGET_PARTITION_STATES));
+ }
+ if (_taskConfigMap.isEmpty() && _command == null) {
+ throw new IllegalArgumentException(String.format("%s cannot be null", COMMAND));
+ }
+ if (_timeoutPerTask < 0) {
+ throw new IllegalArgumentException(String.format("%s has invalid value %s",
+ TIMEOUT_PER_TASK, _timeoutPerTask));
+ }
+ if (_numConcurrentTasksPerInstance < 1) {
+ throw new IllegalArgumentException(String.format("%s has invalid value %s",
+ NUM_CONCURRENT_TASKS_PER_INSTANCE, _numConcurrentTasksPerInstance));
+ }
+ if (_maxAttemptsPerTask < 1) {
+ throw new IllegalArgumentException(String.format("%s has invalid value %s",
+ MAX_ATTEMPTS_PER_TASK, _maxAttemptsPerTask));
+ }
+ if (_workflow == null) {
+ throw new IllegalArgumentException(String.format("%s cannot be null", WORKFLOW_ID));
+ }
+ }
+
+ private static List<String> csvToStringList(String csv) {
+ String[] vals = csv.split(",");
+ return Arrays.asList(vals);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/task/JobContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobContext.java b/helix-core/src/main/java/org/apache/helix/task/JobContext.java
new file mode 100644
index 0000000..7742c67
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/JobContext.java
@@ -0,0 +1,227 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.helix.HelixProperty;
+import org.apache.helix.ZNRecord;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * Provides a typed interface to the context information stored by {@link TaskRebalancer} in the
+ * Helix property store.
+ */
+public class JobContext extends HelixProperty {
+ private enum ContextProperties {
+ START_TIME,
+ STATE,
+ NUM_ATTEMPTS,
+ FINISH_TIME,
+ TARGET,
+ TASK_ID
+ }
+
+ public JobContext(ZNRecord record) {
+ super(record);
+ }
+
+ public void setStartTime(long t) {
+ _record.setSimpleField(ContextProperties.START_TIME.toString(), String.valueOf(t));
+ }
+
+ public long getStartTime() {
+ String tStr = _record.getSimpleField(ContextProperties.START_TIME.toString());
+ if (tStr == null) {
+ return -1;
+ }
+
+ return Long.parseLong(tStr);
+ }
+
+ public void setPartitionState(int p, TaskPartitionState s) {
+ String pStr = String.valueOf(p);
+ Map<String, String> map = _record.getMapField(pStr);
+ if (map == null) {
+ map = new TreeMap<String, String>();
+ _record.setMapField(pStr, map);
+ }
+ map.put(ContextProperties.STATE.toString(), s.name());
+ }
+
+ public TaskPartitionState getPartitionState(int p) {
+ Map<String, String> map = _record.getMapField(String.valueOf(p));
+ if (map == null) {
+ return null;
+ }
+
+ String str = map.get(ContextProperties.STATE.toString());
+ if (str != null) {
+ return TaskPartitionState.valueOf(str);
+ } else {
+ return null;
+ }
+ }
+
+ public void setPartitionNumAttempts(int p, int n) {
+ String pStr = String.valueOf(p);
+ Map<String, String> map = _record.getMapField(pStr);
+ if (map == null) {
+ map = new TreeMap<String, String>();
+ _record.setMapField(pStr, map);
+ }
+ map.put(ContextProperties.NUM_ATTEMPTS.toString(), String.valueOf(n));
+ }
+
+ public int incrementNumAttempts(int pId) {
+ int n = this.getPartitionNumAttempts(pId);
+ if (n < 0) {
+ n = 0;
+ }
+ n += 1;
+ this.setPartitionNumAttempts(pId, n);
+ return n;
+ }
+
+ public int getPartitionNumAttempts(int p) {
+ Map<String, String> map = _record.getMapField(String.valueOf(p));
+ if (map == null) {
+ return -1;
+ }
+
+ String nStr = map.get(ContextProperties.NUM_ATTEMPTS.toString());
+ if (nStr == null) {
+ return -1;
+ }
+
+ return Integer.parseInt(nStr);
+ }
+
+ public void setPartitionFinishTime(int p, long t) {
+ String pStr = String.valueOf(p);
+ Map<String, String> map = _record.getMapField(pStr);
+ if (map == null) {
+ map = new TreeMap<String, String>();
+ _record.setMapField(pStr, map);
+ }
+ map.put(ContextProperties.FINISH_TIME.toString(), String.valueOf(t));
+ }
+
+ public long getPartitionFinishTime(int p) {
+ Map<String, String> map = _record.getMapField(String.valueOf(p));
+ if (map == null) {
+ return -1;
+ }
+
+ String tStr = map.get(ContextProperties.FINISH_TIME.toString());
+ if (tStr == null) {
+ return -1;
+ }
+
+ return Long.parseLong(tStr);
+ }
+
+ public void setPartitionTarget(int p, String targetPName) {
+ String pStr = String.valueOf(p);
+ Map<String, String> map = _record.getMapField(pStr);
+ if (map == null) {
+ map = new TreeMap<String, String>();
+ _record.setMapField(pStr, map);
+ }
+ map.put(ContextProperties.TARGET.toString(), targetPName);
+ }
+
+ public String getTargetForPartition(int p) {
+ String pStr = String.valueOf(p);
+ Map<String, String> map = _record.getMapField(pStr);
+ if (map == null) {
+ return null;
+ } else {
+ return map.get(ContextProperties.TARGET.toString());
+ }
+ }
+
+ public Map<String, List<Integer>> getPartitionsByTarget() {
+ Map<String, List<Integer>> result = Maps.newHashMap();
+ for (Map.Entry<String, Map<String, String>> mapField : _record.getMapFields().entrySet()) {
+ Integer pId = Integer.parseInt(mapField.getKey());
+ Map<String, String> map = mapField.getValue();
+ String target = map.get(ContextProperties.TARGET.toString());
+ if (target != null) {
+ List<Integer> partitions;
+ if (!result.containsKey(target)) {
+ partitions = Lists.newArrayList();
+ result.put(target, partitions);
+ } else {
+ partitions = result.get(target);
+ }
+ partitions.add(pId);
+ }
+ }
+ return result;
+ }
+
+ public Set<Integer> getPartitionSet() {
+ Set<Integer> partitions = Sets.newHashSet();
+ for (String pName : _record.getMapFields().keySet()) {
+ partitions.add(Integer.valueOf(pName));
+ }
+ return partitions;
+ }
+
+ public void setTaskIdForPartition(int p, String taskId) {
+ String pStr = String.valueOf(p);
+ Map<String, String> map = _record.getMapField(pStr);
+ if (map == null) {
+ map = new TreeMap<String, String>();
+ _record.setMapField(pStr, map);
+ }
+ map.put(ContextProperties.TASK_ID.toString(), taskId);
+ }
+
+ public String getTaskIdForPartition(int p) {
+ String pStr = String.valueOf(p);
+ Map<String, String> map = _record.getMapField(pStr);
+ if (map == null) {
+ return null;
+ } else {
+ return map.get(ContextProperties.TASK_ID.toString());
+ }
+ }
+
+ public Map<String, Integer> getTaskIdPartitionMap() {
+ Map<String, Integer> partitionMap = new HashMap<String, Integer>();
+ for (Map.Entry<String, Map<String, String>> mapField : _record.getMapFields().entrySet()) {
+ Integer pId = Integer.parseInt(mapField.getKey());
+ Map<String, String> map = mapField.getValue();
+ if (map.containsKey(ContextProperties.TASK_ID.toString())) {
+ partitionMap.put(map.get(ContextProperties.TASK_ID.toString()), pId);
+ }
+ }
+ return partitionMap;
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/task/JobDag.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDag.java b/helix-core/src/main/java/org/apache/helix/task/JobDag.java
new file mode 100644
index 0000000..18a721e
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/JobDag.java
@@ -0,0 +1,151 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.codehaus.jackson.map.ObjectMapper;
+
+/**
+ * Provides a convenient way to construct, traverse,
+ * and validate a job dependency graph
+ */
+public class JobDag {
+ @JsonProperty("parentsToChildren")
+ private Map<String, Set<String>> _parentsToChildren;
+
+ @JsonProperty("childrenToParents")
+ private Map<String, Set<String>> _childrenToParents;
+
+ @JsonProperty("allNodes")
+ private Set<String> _allNodes;
+
+ public static final JobDag EMPTY_DAG = new JobDag();
+
+ public JobDag() {
+ _parentsToChildren = new TreeMap<String, Set<String>>();
+ _childrenToParents = new TreeMap<String, Set<String>>();
+ _allNodes = new TreeSet<String>();
+ }
+
+ public void addParentToChild(String parent, String child) {
+ if (!_parentsToChildren.containsKey(parent)) {
+ _parentsToChildren.put(parent, new TreeSet<String>());
+ }
+ _parentsToChildren.get(parent).add(child);
+
+ if (!_childrenToParents.containsKey(child)) {
+ _childrenToParents.put(child, new TreeSet<String>());
+ }
+ _childrenToParents.get(child).add(parent);
+
+ _allNodes.add(parent);
+ _allNodes.add(child);
+ }
+
+ public void addNode(String node) {
+ _allNodes.add(node);
+ }
+
+ public Map<String, Set<String>> getParentsToChildren() {
+ return _parentsToChildren;
+ }
+
+ public Map<String, Set<String>> getChildrenToParents() {
+ return _childrenToParents;
+ }
+
+ public Set<String> getAllNodes() {
+ return _allNodes;
+ }
+
+ public Set<String> getDirectChildren(String node) {
+ if (!_parentsToChildren.containsKey(node)) {
+ return new TreeSet<String>();
+ }
+ return _parentsToChildren.get(node);
+ }
+
+ public Set<String> getDirectParents(String node) {
+ if (!_childrenToParents.containsKey(node)) {
+ return new TreeSet<String>();
+ }
+ return _childrenToParents.get(node);
+ }
+
+ public String toJson() throws Exception {
+ return new ObjectMapper().writeValueAsString(this);
+ }
+
+ public static JobDag fromJson(String json) {
+ try {
+ return new ObjectMapper().readValue(json, JobDag.class);
+ } catch (Exception e) {
+ throw new IllegalArgumentException("Unable to parse json " + json + " into job dag");
+ }
+ }
+
+ /**
+ * Checks that dag contains no cycles and all nodes are reachable.
+ */
+ public void validate() {
+ Set<String> prevIteration = new TreeSet<String>();
+
+ // get all unparented nodes
+ for (String node : _allNodes) {
+ if (getDirectParents(node).isEmpty()) {
+ prevIteration.add(node);
+ }
+ }
+
+ // visit children nodes up to max iteration count, by which point we should have exited
+ // naturally
+ Set<String> allNodesReached = new TreeSet<String>();
+ int iterationCount = 0;
+ int maxIterations = _allNodes.size() + 1;
+
+ while (!prevIteration.isEmpty() && iterationCount < maxIterations) {
+ // construct set of all children reachable from prev iteration
+ Set<String> thisIteration = new TreeSet<String>();
+ for (String node : prevIteration) {
+ thisIteration.addAll(getDirectChildren(node));
+ }
+
+ allNodesReached.addAll(prevIteration);
+ prevIteration = thisIteration;
+ iterationCount++;
+ }
+
+ allNodesReached.addAll(prevIteration);
+
+ if (iterationCount >= maxIterations) {
+ throw new IllegalArgumentException("DAG invalid: cycles detected");
+ }
+
+ if (!allNodesReached.containsAll(_allNodes)) {
+ throw new IllegalArgumentException("DAG invalid: unreachable nodes found. Reachable set is "
+ + allNodesReached);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/task/TargetState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TargetState.java b/helix-core/src/main/java/org/apache/helix/task/TargetState.java
index 36552fc..4285e67 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TargetState.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TargetState.java
@@ -1,21 +1,39 @@
package org.apache.helix.task;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
/**
- * Enumeration of target states for a task.
+ * Enumeration of target states for a job: the state requested for a job, which tells the
+ * rebalancer whether to run, stop, or delete it.
*/
public enum TargetState {
/**
- * Indicates that the rebalancer must start/resume the task.
+ * Indicates that the rebalancer must start the job, or resume it after a {@link #STOP}.
*/
START,
/**
- * Indicates that the rebalancer should stop any running task partitions and cease doing any
- * further task
- * assignments.
+ * Indicates that the rebalancer should stop any running tasks and cease doing any
+ * further task assignments.
*/
STOP,
/**
- * Indicates that the rebalancer must delete this task.
+ * Indicates that the rebalancer must delete this job.
*/
DELETE
}
http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/task/Task.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/Task.java b/helix-core/src/main/java/org/apache/helix/task/Task.java
index 027d7fe..207fd96 100644
--- a/helix-core/src/main/java/org/apache/helix/task/Task.java
+++ b/helix-core/src/main/java/org/apache/helix/task/Task.java
@@ -1,12 +1,32 @@
package org.apache.helix.task;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
/**
* The interface that is to be implemented by a specific task implementation.
*/
public interface Task {
/**
* Execute the task.
- * @return A {@link TaskResult} object indicating the status of the task and any additional context
+ * @return A {@link TaskResult} object indicating the status of the task and any additional
+ * context
* information that
* can be interpreted by the specific {@link Task} implementation.
*/
http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/task/TaskCallbackContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskCallbackContext.java b/helix-core/src/main/java/org/apache/helix/task/TaskCallbackContext.java
new file mode 100644
index 0000000..124ec12
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskCallbackContext.java
@@ -0,0 +1,67 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixManager;
+
+/**
+ * A wrapper for all information about a task and the job of which it is a part.
+ */
+public class TaskCallbackContext {
+ private HelixManager _manager;
+ private TaskConfig _taskConfig;
+ private JobConfig _jobConfig;
+
+ void setManager(HelixManager manager) {
+ _manager = manager;
+ }
+
+ void setTaskConfig(TaskConfig taskConfig) {
+ _taskConfig = taskConfig;
+ }
+
+ void setJobConfig(JobConfig jobConfig) {
+ _jobConfig = jobConfig;
+ }
+
+ /**
+ * Get an active Helix connection
+ * @return HelixManager instance
+ */
+ public HelixManager getManager() {
+ return _manager;
+ }
+
+ /**
+ * Get task-specific configuration properties
+ * @return TaskConfig instance
+ */
+ public TaskConfig getTaskConfig() {
+ return _taskConfig;
+ }
+
+ /**
+ * Get job-specific configuration properties
+ * @return JobConfig instance
+ */
+ public JobConfig getJobConfig() {
+ return _jobConfig;
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java b/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
index 4deb588..547ba48 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
@@ -1,275 +1,126 @@
+package org.apache.helix.task;
+
/*
- * $Id$
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
-package org.apache.helix.task;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
+import java.io.IOException;
import java.util.Map;
-import java.util.Set;
+import java.util.UUID;
-import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
+import org.apache.helix.task.beans.TaskBean;
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import com.google.common.collect.Maps;
/**
- * Provides a typed interface to task configurations.
+ * Configuration for an individual task to be run as part of a job.
*/
public class TaskConfig {
- // // Property names ////
-
- /** The name of the workflow to which the task belongs. */
- public static final String WORKFLOW_ID = "WorkflowID";
- /** The name of the target resource. */
- public static final String TARGET_RESOURCE = "TargetResource";
- /**
- * The set of the target partition states. The value must be a comma-separated list of partition
- * states.
- */
- public static final String TARGET_PARTITION_STATES = "TargetPartitionStates";
- /**
- * The set of the target partition ids. The value must be a comma-separated list of partition ids.
- */
- public static final String TARGET_PARTITIONS = "TargetPartitions";
- /** The command that is to be run by participants. */
- public static final String COMMAND = "Command";
- /** The command configuration to be used by the task partitions. */
- public static final String COMMAND_CONFIG = "CommandConfig";
- /** The timeout for a task partitions. */
- public static final String TIMEOUT_PER_PARTITION = "TimeoutPerPartition";
- /** The maximum number of times the task rebalancer may attempt to execute a task partitions. */
- public static final String MAX_ATTEMPTS_PER_PARTITION = "MaxAttemptsPerPartition";
- /** The number of concurrent tasks that are allowed to run on an instance. */
- public static final String NUM_CONCURRENT_TASKS_PER_INSTANCE = "ConcurrentTasksPerInstance";
-
- // // Default property values ////
-
- public static final long DEFAULT_TIMEOUT_PER_PARTITION = 60 * 60 * 1000; // 1 hr.
- public static final int DEFAULT_MAX_ATTEMPTS_PER_PARTITION = 10;
- public static final int DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE = 1;
-
- private final String _workflow;
- private final String _targetResource;
- private final List<Integer> _targetPartitions;
- private final Set<String> _targetPartitionStates;
- private final String _command;
- private final String _commandConfig;
- private final long _timeoutPerPartition;
- private final int _numConcurrentTasksPerInstance;
- private final int _maxAttemptsPerPartition;
-
- private TaskConfig(String workflow, String targetResource, List<Integer> targetPartitions,
- Set<String> targetPartitionStates, String command, String commandConfig,
- long timeoutPerPartition, int numConcurrentTasksPerInstance, int maxAttemptsPerPartition) {
- _workflow = workflow;
- _targetResource = targetResource;
- _targetPartitions = targetPartitions;
- _targetPartitionStates = targetPartitionStates;
- _command = command;
- _commandConfig = commandConfig;
- _timeoutPerPartition = timeoutPerPartition;
- _numConcurrentTasksPerInstance = numConcurrentTasksPerInstance;
- _maxAttemptsPerPartition = maxAttemptsPerPartition;
+ /**
+ * Reserved keys under which the task's own metadata (its ID and its command) is stored in the
+ * same map as the command configuration.
+ */
+ private enum TaskConfigFields {
+ /** Key holding the task's unique identifier. */
+ TASK_ID,
+ /** Key holding the command the task invokes. */
+ TASK_COMMAND
}
- public String getWorkflow() {
- return _workflow == null ? Workflow.UNSPECIFIED : _workflow;
- }
-
- public String getTargetResource() {
- return _targetResource;
- }
+ /** Logger; in this class it is used to report failures to serialize a config in toString(). */
+ private static final Logger LOG = Logger.getLogger(TaskConfig.class);
- public List<Integer> getTargetPartitions() {
- return _targetPartitions;
- }
+ /**
+ * Configuration for the task's command. It also holds the TASK_ID and TASK_COMMAND entries
+ * written by the constructor.
+ */
+ private final Map<String, String> _configMap;
- public Set<String> getTargetPartitionStates() {
- return _targetPartitionStates;
- }
-
- public String getCommand() {
- return _command;
+ /**
+ * Instantiate the task config
+ * @param command the command to invoke for the task
+ * @param configMap configuration to be passed as part of the invocation; may be null. The map is
+ * copied, so the caller's instance is never modified and never shared with this config
+ * @param id existing task ID, or null to generate a random UUID
+ */
+ public TaskConfig(String command, Map<String, String> configMap, String id) {
+ // Copy rather than adopt the caller's map. Adopting it would let several TaskConfigs built
+ // from one map (e.g. a reused bean map) share a single backing map, so the last constructor
+ // call would overwrite TASK_ID for all of them. It would also write entries into raw maps
+ // handed to from(Map).
+ Map<String, String> copy = Maps.newHashMap();
+ if (configMap != null) {
+ copy.putAll(configMap);
+ }
+ if (id == null) {
+ id = UUID.randomUUID().toString();
+ }
+ copy.put(TaskConfigFields.TASK_COMMAND.toString(), command);
+ copy.put(TaskConfigFields.TASK_ID.toString(), id);
+ _configMap = copy;
}
- public String getCommandConfig() {
- return _commandConfig;
+ /**
+ * Create a task config whose ID is generated randomly
+ * @param command the command to invoke for the task
+ * @param configMap configuration to be passed as part of the invocation
+ */
+ public TaskConfig(String command, Map<String, String> configMap) {
+ this(command, configMap, (String) null);
}
- public long getTimeoutPerPartition() {
- return _timeoutPerPartition;
+ /**
+ * Get the unique identifier of this task
+ * @return the task ID, which is a random UUID string unless an ID was supplied at construction
+ */
+ public String getId() {
+ return lookup(TaskConfigFields.TASK_ID);
}
- public int getNumConcurrentTasksPerInstance() {
- return _numConcurrentTasksPerInstance;
+ /**
+ * Get the command this task invokes
+ * @return the command string
+ */
+ public String getCommand() {
+ return lookup(TaskConfigFields.TASK_COMMAND);
}
- public int getMaxAttemptsPerPartition() {
- return _maxAttemptsPerPartition;
+ /**
+ * Get the full configuration map, including the task ID and command entries
+ * @return the backing map of configuration key to value
+ */
+ public Map<String, String> getConfigMap() {
+ return _configMap;
+ }
+
+ /**
+ * Read one of the reserved metadata entries from the configuration map
+ * @param field the metadata field to read
+ * @return the stored value, or null if absent
+ */
+ private String lookup(TaskConfigFields field) {
+ return _configMap.get(field.toString());
}
- public Map<String, String> getResourceConfigMap() {
- Map<String, String> cfgMap = new HashMap<String, String>();
- cfgMap.put(TaskConfig.WORKFLOW_ID, _workflow);
- cfgMap.put(TaskConfig.COMMAND, _command);
- cfgMap.put(TaskConfig.COMMAND_CONFIG, _commandConfig);
- cfgMap.put(TaskConfig.TARGET_RESOURCE, _targetResource);
- cfgMap.put(TaskConfig.TARGET_PARTITION_STATES, Joiner.on(",").join(_targetPartitionStates));
- if (_targetPartitions != null) {
- cfgMap.put(TaskConfig.TARGET_PARTITIONS, Joiner.on(",").join(_targetPartitions));
+ /**
+ * Shared JSON mapper. Building an ObjectMapper is comparatively expensive, and an instance that is
+ * not reconfigured after construction can be shared for serialization.
+ */
+ private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
+
+ /**
+ * Render this config as JSON using Jackson. If serialization fails, the error is logged and the
+ * default {@link Object#toString()} representation is returned instead.
+ * @return JSON representation of this task config
+ */
+ @Override
+ public String toString() {
+ try {
+ return JSON_MAPPER.writeValueAsString(this);
+ } catch (IOException e) {
+ LOG.error("Could not serialize TaskConfig", e);
}
- cfgMap.put(TaskConfig.TIMEOUT_PER_PARTITION, "" + _timeoutPerPartition);
- cfgMap.put(TaskConfig.MAX_ATTEMPTS_PER_PARTITION, "" + _maxAttemptsPerPartition);
-
- return cfgMap;
+ return super.toString();
}
/**
- * A builder for {@link TaskConfig}. Validates the configurations.
+ * Instantiate a typed configuration from a bean
+ * @param bean plain bean describing the task
+ * @return instantiated TaskConfig
*/
- public static class Builder {
- private String _workflow;
- private String _targetResource;
- private List<Integer> _targetPartitions;
- private Set<String> _targetPartitionStates;
- private String _command;
- private String _commandConfig;
- private long _timeoutPerPartition = DEFAULT_TIMEOUT_PER_PARTITION;
- private int _numConcurrentTasksPerInstance = DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
- private int _maxAttemptsPerPartition = DEFAULT_MAX_ATTEMPTS_PER_PARTITION;
-
- public TaskConfig build() {
- validate();
-
- return new TaskConfig(_workflow, _targetResource, _targetPartitions, _targetPartitionStates,
- _command, _commandConfig, _timeoutPerPartition, _numConcurrentTasksPerInstance,
- _maxAttemptsPerPartition);
- }
-
- /**
- * Convenience method to build a {@link TaskConfig} from a {@code Map<String, String>}.
- * @param cfg A map of property names to their string representations.
- * @return A {@link Builder}.
- */
- public static Builder fromMap(Map<String, String> cfg) {
- Builder b = new Builder();
- if (cfg.containsKey(WORKFLOW_ID)) {
- b.setWorkflow(cfg.get(WORKFLOW_ID));
- }
- if (cfg.containsKey(TARGET_RESOURCE)) {
- b.setTargetResource(cfg.get(TARGET_RESOURCE));
- }
- if (cfg.containsKey(TARGET_PARTITIONS)) {
- b.setTargetPartitions(csvToIntList(cfg.get(TARGET_PARTITIONS)));
- }
- if (cfg.containsKey(TARGET_PARTITION_STATES)) {
- b.setTargetPartitionStates(new HashSet<String>(Arrays.asList(cfg.get(
- TARGET_PARTITION_STATES).split(","))));
- }
- if (cfg.containsKey(COMMAND)) {
- b.setCommand(cfg.get(COMMAND));
- }
- if (cfg.containsKey(COMMAND_CONFIG)) {
- b.setCommandConfig(cfg.get(COMMAND_CONFIG));
- }
- if (cfg.containsKey(TIMEOUT_PER_PARTITION)) {
- b.setTimeoutPerPartition(Long.parseLong(cfg.get(TIMEOUT_PER_PARTITION)));
- }
- if (cfg.containsKey(NUM_CONCURRENT_TASKS_PER_INSTANCE)) {
- b.setNumConcurrentTasksPerInstance(Integer.parseInt(cfg
- .get(NUM_CONCURRENT_TASKS_PER_INSTANCE)));
- }
- if (cfg.containsKey(MAX_ATTEMPTS_PER_PARTITION)) {
- b.setMaxAttemptsPerPartition(Integer.parseInt(cfg.get(MAX_ATTEMPTS_PER_PARTITION)));
- }
-
- return b;
- }
-
- public Builder setWorkflow(String v) {
- _workflow = v;
- return this;
- }
-
- public Builder setTargetResource(String v) {
- _targetResource = v;
- return this;
- }
-
- public Builder setTargetPartitions(List<Integer> v) {
- _targetPartitions = ImmutableList.copyOf(v);
- return this;
- }
-
- public Builder setTargetPartitionStates(Set<String> v) {
- _targetPartitionStates = ImmutableSet.copyOf(v);
- return this;
- }
-
- public Builder setCommand(String v) {
- _command = v;
- return this;
- }
-
- public Builder setCommandConfig(String v) {
- _commandConfig = v;
- return this;
- }
-
- public Builder setTimeoutPerPartition(long v) {
- _timeoutPerPartition = v;
- return this;
- }
-
- public Builder setNumConcurrentTasksPerInstance(int v) {
- _numConcurrentTasksPerInstance = v;
- return this;
- }
-
- public Builder setMaxAttemptsPerPartition(int v) {
- _maxAttemptsPerPartition = v;
- return this;
- }
-
- private void validate() {
- if (_targetResource == null) {
- throw new IllegalArgumentException(String.format("%s cannot be null", TARGET_RESOURCE));
- }
- if (_targetPartitionStates != null && _targetPartitionStates.isEmpty()) {
- throw new IllegalArgumentException(String.format("%s cannot be an empty set",
- TARGET_PARTITION_STATES));
- }
- if (_command == null) {
- throw new IllegalArgumentException(String.format("%s cannot be null", COMMAND));
- }
- if (_timeoutPerPartition < 0) {
- throw new IllegalArgumentException(String.format("%s has invalid value %s",
- TIMEOUT_PER_PARTITION, _timeoutPerPartition));
- }
- if (_numConcurrentTasksPerInstance < 1) {
- throw new IllegalArgumentException(String.format("%s has invalid value %s",
- NUM_CONCURRENT_TASKS_PER_INSTANCE, _numConcurrentTasksPerInstance));
- }
- if (_maxAttemptsPerPartition < 1) {
- throw new IllegalArgumentException(String.format("%s has invalid value %s",
- MAX_ATTEMPTS_PER_PARTITION, _maxAttemptsPerPartition));
- }
- if (_workflow == null) {
- throw new IllegalArgumentException(String.format("%s cannot be null", WORKFLOW_ID));
- }
- }
-
- private static List<Integer> csvToIntList(String csv) {
- String[] vals = csv.split(",");
- List<Integer> l = new ArrayList<Integer>();
- for (String v : vals) {
- l.add(Integer.parseInt(v));
- }
+ public static TaskConfig from(TaskBean bean) {
+ String command = bean.command;
+ // no explicit ID is passed, so the constructor generates a random one
+ return new TaskConfig(command, bean.taskConfigMap, null);
+ }
- return l;
- }
+ /**
+ * Rebuild a typed configuration from a raw string map, in which the command configuration and
+ * the task's ID and command entries are stored side by side. If the map has no ID entry, a new
+ * random ID is generated.
+ * @param rawConfigMap mixed map of configuration and task metadata
+ * @return instantiated TaskConfig
+ */
+ public static TaskConfig from(Map<String, String> rawConfigMap) {
+ return new TaskConfig(rawConfigMap.get(TaskConfigFields.TASK_COMMAND.toString()), rawConfigMap,
+ rawConfigMap.get(TaskConfigFields.TASK_ID.toString()));
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java b/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
index 1e822e0..305323d 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
@@ -1,7 +1,23 @@
+package org.apache.helix.task;
+
/*
- * $Id$
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
-package org.apache.helix.task;
/**
* Constants used in the task framework.
http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/task/TaskContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskContext.java b/helix-core/src/main/java/org/apache/helix/task/TaskContext.java
deleted file mode 100644
index 6a410e7..0000000
--- a/helix-core/src/main/java/org/apache/helix/task/TaskContext.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * $id$
- */
-package org.apache.helix.task;
-
-import java.util.Map;
-import java.util.TreeMap;
-
-import org.apache.helix.HelixProperty;
-import org.apache.helix.ZNRecord;
-
-/**
- * Provides a typed interface to the context information stored by {@link TaskRebalancer} in the
- * Helix property store.
- */
-public class TaskContext extends HelixProperty {
- public static final String START_TIME = "START_TIME";
- public static final String PARTITION_STATE = "STATE";
- public static final String NUM_ATTEMPTS = "NUM_ATTEMPTS";
- public static final String FINISH_TIME = "FINISH_TIME";
-
- public TaskContext(ZNRecord record) {
- super(record);
- }
-
- public void setStartTime(long t) {
- _record.setSimpleField(START_TIME, String.valueOf(t));
- }
-
- public long getStartTime() {
- String tStr = _record.getSimpleField(START_TIME);
- if (tStr == null) {
- return -1;
- }
-
- return Long.parseLong(tStr);
- }
-
- public void setPartitionState(int p, TaskPartitionState s) {
- String pStr = String.valueOf(p);
- Map<String, String> map = _record.getMapField(pStr);
- if (map == null) {
- map = new TreeMap<String, String>();
- _record.setMapField(pStr, map);
- }
- map.put(PARTITION_STATE, s.name());
- }
-
- public TaskPartitionState getPartitionState(int p) {
- Map<String, String> map = _record.getMapField(String.valueOf(p));
- if (map == null) {
- return null;
- }
-
- String str = map.get(PARTITION_STATE);
- if (str != null) {
- return TaskPartitionState.valueOf(str);
- } else {
- return null;
- }
- }
-
- public void setPartitionNumAttempts(int p, int n) {
- String pStr = String.valueOf(p);
- Map<String, String> map = _record.getMapField(pStr);
- if (map == null) {
- map = new TreeMap<String, String>();
- _record.setMapField(pStr, map);
- }
- map.put(NUM_ATTEMPTS, String.valueOf(n));
- }
-
- public int incrementNumAttempts(int pId) {
- int n = this.getPartitionNumAttempts(pId);
- if (n < 0) {
- n = 0;
- }
- n += 1;
- this.setPartitionNumAttempts(pId, n);
- return n;
- }
-
- public int getPartitionNumAttempts(int p) {
- Map<String, String> map = _record.getMapField(String.valueOf(p));
- if (map == null) {
- return -1;
- }
-
- String nStr = map.get(NUM_ATTEMPTS);
- if (nStr == null) {
- return -1;
- }
-
- return Integer.parseInt(nStr);
- }
-
- public void setPartitionFinishTime(int p, long t) {
- String pStr = String.valueOf(p);
- Map<String, String> map = _record.getMapField(pStr);
- if (map == null) {
- map = new TreeMap<String, String>();
- _record.setMapField(pStr, map);
- }
- map.put(FINISH_TIME, String.valueOf(t));
- }
-
- public long getPartitionFinishTime(int p) {
- Map<String, String> map = _record.getMapField(String.valueOf(p));
- if (map == null) {
- return -1;
- }
-
- String tStr = map.get(FINISH_TIME);
- if (tStr == null) {
- return -1;
- }
-
- return Long.parseLong(tStr);
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/task/TaskDag.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDag.java b/helix-core/src/main/java/org/apache/helix/task/TaskDag.java
deleted file mode 100644
index a237507..0000000
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDag.java
+++ /dev/null
@@ -1,132 +0,0 @@
-package org.apache.helix.task;
-
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
-
-import org.codehaus.jackson.annotate.JsonProperty;
-import org.codehaus.jackson.map.ObjectMapper;
-
-/**
- * Provides a convenient way to construct, traverse,
- * and validate a task dependency graph
- */
-public class TaskDag {
- @JsonProperty("parentsToChildren")
- private Map<String, Set<String>> _parentsToChildren;
-
- @JsonProperty("childrenToParents")
- private Map<String, Set<String>> _childrenToParents;
-
- @JsonProperty("allNodes")
- private Set<String> _allNodes;
-
- public static final TaskDag EMPTY_DAG = new TaskDag();
-
- public TaskDag() {
- _parentsToChildren = new TreeMap<String, Set<String>>();
- _childrenToParents = new TreeMap<String, Set<String>>();
- _allNodes = new TreeSet<String>();
- }
-
- public void addParentToChild(String parent, String child) {
- if (!_parentsToChildren.containsKey(parent)) {
- _parentsToChildren.put(parent, new TreeSet<String>());
- }
- _parentsToChildren.get(parent).add(child);
-
- if (!_childrenToParents.containsKey(child)) {
- _childrenToParents.put(child, new TreeSet<String>());
- }
- _childrenToParents.get(child).add(parent);
-
- _allNodes.add(parent);
- _allNodes.add(child);
- }
-
- public void addNode(String node) {
- _allNodes.add(node);
- }
-
- public Map<String, Set<String>> getParentsToChildren() {
- return _parentsToChildren;
- }
-
- public Map<String, Set<String>> getChildrenToParents() {
- return _childrenToParents;
- }
-
- public Set<String> getAllNodes() {
- return _allNodes;
- }
-
- public Set<String> getDirectChildren(String node) {
- if (!_parentsToChildren.containsKey(node)) {
- return new TreeSet<String>();
- }
- return _parentsToChildren.get(node);
- }
-
- public Set<String> getDirectParents(String node) {
- if (!_childrenToParents.containsKey(node)) {
- return new TreeSet<String>();
- }
- return _childrenToParents.get(node);
- }
-
- public String toJson() throws Exception {
- return new ObjectMapper().writeValueAsString(this);
- }
-
- public static TaskDag fromJson(String json) {
- try {
- return new ObjectMapper().readValue(json, TaskDag.class);
- } catch (Exception e) {
- throw new IllegalArgumentException("Unable to parse json " + json + " into task dag");
- }
- }
-
- /**
- * Checks that dag contains no cycles and all nodes are reachable.
- */
- public void validate() {
- Set<String> prevIteration = new TreeSet<String>();
-
- // get all unparented nodes
- for (String node : _allNodes) {
- if (getDirectParents(node).isEmpty()) {
- prevIteration.add(node);
- }
- }
-
- // visit children nodes up to max iteration count, by which point we should have exited
- // naturally
- Set<String> allNodesReached = new TreeSet<String>();
- int iterationCount = 0;
- int maxIterations = _allNodes.size() + 1;
-
- while (!prevIteration.isEmpty() && iterationCount < maxIterations) {
- // construct set of all children reachable from prev iteration
- Set<String> thisIteration = new TreeSet<String>();
- for (String node : prevIteration) {
- thisIteration.addAll(getDirectChildren(node));
- }
-
- allNodesReached.addAll(prevIteration);
- prevIteration = thisIteration;
- iterationCount++;
- }
-
- allNodesReached.addAll(prevIteration);
-
- if (iterationCount >= maxIterations) {
- throw new IllegalArgumentException("DAG invalid: cycles detected");
- }
-
- if (!allNodesReached.containsAll(_allNodes)) {
- throw new IllegalArgumentException("DAG invalid: unreachable nodes found. Reachable set is "
- + allNodesReached);
- }
- }
-}