You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by zz...@apache.org on 2013/12/07 00:25:42 UTC
[2/3] [HELIX-336] Add support for task framework, rb=16071
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/80fc2be5/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
new file mode 100644
index 0000000..d1bce56
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -0,0 +1,682 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.State;
+import org.apache.helix.api.accessor.ResourceAccessor;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.context.ControllerContextProvider;
+import org.apache.helix.controller.rebalancer.HelixRebalancer;
+import org.apache.helix.controller.rebalancer.config.BasicRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
+import org.apache.helix.controller.stages.ResourceCurrentState;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.log4j.Logger;
+
+/**
+ * Custom rebalancer implementation for the {@code Task} state model.
+ */
+public class TaskRebalancer implements HelixRebalancer {
+ private static final Logger LOG = Logger.getLogger(TaskRebalancer.class);
+ private HelixManager _manager;
+
  @Override
  public void init(HelixManager helixManager, ControllerContextProvider contextProvider) {
    // Only the manager handle is retained; the context provider is not used by this rebalancer.
    _manager = helixManager;
  }
+
  /**
   * Computes the assignment of this task resource's partitions to participants.
   * <p>
   * Reads the task, workflow config/context, and previous assignment from the cluster; may also
   * write back updated contexts and a pruned ideal state as a side effect. Returns an empty
   * assignment when the task must not run (unmet parent dependencies, workflow marked for
   * deletion, expired workflow, or task already in a final state).
   * <p>
   * NOTE(review): {@code helixPrevAssignment} is never read here — the previous assignment is
   * instead fetched from the property store (see HELIX-230 note below).
   */
  @Override
  public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
      ResourceAssignment helixPrevAssignment, Cluster cluster, ResourceCurrentState currentState) {
    final ResourceId resourceId = rebalancerConfig.getResourceId();
    final String resourceName = resourceId.stringify();

    // Fetch task configuration
    TaskConfig taskCfg = TaskUtil.getTaskCfg(_manager, resourceName);
    String workflowResource = taskCfg.getWorkflow();

    // Fetch workflow configuration and context
    WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(_manager, workflowResource);
    WorkflowContext workflowCtx = TaskUtil.getWorkflowContext(_manager, workflowResource);

    // Initialize workflow context if needed (first rebalance pass for this workflow).
    if (workflowCtx == null) {
      workflowCtx = new WorkflowContext(new ZNRecord("WorkflowContext"));
      workflowCtx.setStartTime(System.currentTimeMillis());
    }

    // Check parent dependencies: this task may only be scheduled once every direct parent in the
    // workflow DAG has COMPLETED.
    for (String parent : workflowCfg.getTaskDag().getDirectParents(resourceName)) {
      if (workflowCtx.getTaskState(parent) == null
          || !workflowCtx.getTaskState(parent).equals(TaskState.COMPLETED)) {
        return emptyAssignment(resourceId);
      }
    }

    // Clean up if workflow marked for deletion
    TargetState targetState = workflowCfg.getTargetState();
    if (targetState == TargetState.DELETE) {
      cleanup(_manager, resourceName, workflowCfg, workflowResource);
      return emptyAssignment(resourceId);
    }

    // Check if this workflow has been finished past its expiry; if so, mark it for deletion and
    // clean this task up immediately.
    if (workflowCtx.getFinishTime() != WorkflowContext.UNFINISHED
        && workflowCtx.getFinishTime() + workflowCfg.getExpiry() <= System.currentTimeMillis()) {
      markForDeletion(_manager, workflowResource);
      cleanup(_manager, resourceName, workflowCfg, workflowResource);
      return emptyAssignment(resourceId);
    }

    // Fetch any existing context information from the property store.
    TaskContext taskCtx = TaskUtil.getTaskContext(_manager, resourceName);
    if (taskCtx == null) {
      taskCtx = new TaskContext(new ZNRecord("TaskContext"));
      taskCtx.setStartTime(System.currentTimeMillis());
    }

    // The task is already in a final state (completed/failed); nothing left to assign.
    if (workflowCtx.getTaskState(resourceName) == TaskState.FAILED
        || workflowCtx.getTaskState(resourceName) == TaskState.COMPLETED) {
      return emptyAssignment(resourceId);
    }

    // Fetch the previous resource assignment from the property store. This is required because of
    // HELIX-230 (the assignment passed in by the framework is not reliable here).
    ResourceAssignment prevAssignment = TaskUtil.getPrevResourceAssignment(_manager, resourceName);
    if (prevAssignment == null) {
      prevAssignment = new ResourceAssignment(resourceId);
    }

    // Will contain the list of partitions that must be explicitly dropped from the ideal state
    // that is stored in zk. Populated by the static computeResourceMapping below.
    Set<Integer> partitionsToDrop = new TreeSet<Integer>();

    ResourceId tgtResourceId = ResourceId.from(taskCfg.getTargetResource());
    RebalancerConfig tgtResourceRebalancerCfg =
        cluster.getResource(tgtResourceId).getRebalancerConfig();

    Set<ParticipantId> liveInstances = cluster.getLiveParticipantMap().keySet();

    // Reconstruct the target resource's ideal state so task partitions can be co-located with the
    // target partitions.
    IdealState tgtResourceIs =
        ResourceAccessor.rebalancerConfigToIdealState(tgtResourceRebalancerCfg, cluster
            .getResource(resourceId).getBucketSize(), cluster.getResource(resourceId)
            .getBatchMessageMode());
    ResourceAssignment newAssignment =
        computeResourceMapping(resourceName, workflowCfg, taskCfg, prevAssignment, tgtResourceIs,
            liveInstances, currentState, workflowCtx, taskCtx, partitionsToDrop);

    // If partitions finished or errored out, remove them from the persisted ideal state so they
    // are not rescheduled.
    PartitionedRebalancerConfig userConfig =
        BasicRebalancerConfig.convert(rebalancerConfig, PartitionedRebalancerConfig.class);
    if (!partitionsToDrop.isEmpty()) {
      for (Integer pId : partitionsToDrop) {
        userConfig.getPartitionMap().remove(PartitionId.from(pName(resourceName, pId)));
      }
      HelixDataAccessor accessor = _manager.getHelixDataAccessor();
      PropertyKey propertyKey = accessor.keyBuilder().idealStates(resourceName);

      IdealState taskIs =
          ResourceAccessor.rebalancerConfigToIdealState(rebalancerConfig,
              cluster.getResource(resourceId).getBucketSize(), cluster.getResource(resourceId)
                  .getBatchMessageMode());
      accessor.setProperty(propertyKey, taskIs);
    }

    // Update rebalancer context, previous ideal state.
    TaskUtil.setTaskContext(_manager, resourceName, taskCtx);
    TaskUtil.setWorkflowContext(_manager, workflowResource, workflowCtx);
    TaskUtil.setPrevResourceAssignment(_manager, resourceName, newAssignment);

    return newAssignment;
  }
+
  /**
   * Core scheduling logic: decides, per task partition, which instance should run it and in what
   * {@link TaskPartitionState}.
   * <p>
   * Carries forward pending/requested transitions, keeps RUNNING/STOPPED partitions aligned with
   * the workflow target state, retires COMPLETED and errored partitions, fails the whole task once
   * any partition exceeds its max attempts, and finally assigns new partitions to instances with
   * spare capacity. Mutates {@code workflowCtx}, {@code taskCtx} and
   * {@code partitionsToDropFromIs} as side effects; the caller persists them.
   */
  private static ResourceAssignment computeResourceMapping(String taskResource,
      WorkflowConfig workflowConfig, TaskConfig taskCfg, ResourceAssignment prevAssignment,
      IdealState tgtResourceIs, Iterable<ParticipantId> liveInstances,
      ResourceCurrentState currStateOutput, WorkflowContext workflowCtx, TaskContext taskCtx,
      Set<Integer> partitionsToDropFromIs) {

    TargetState taskTgtState = workflowConfig.getTargetState();

    // Update running status in workflow context
    if (taskTgtState == TargetState.STOP) {
      workflowCtx.setTaskState(taskResource, TaskState.STOPPED);
      // Workflow has been stopped if all tasks are stopped
      if (isWorkflowStopped(workflowCtx, workflowConfig)) {
        workflowCtx.setWorkflowState(TaskState.STOPPED);
      }
    } else {
      workflowCtx.setTaskState(taskResource, TaskState.IN_PROGRESS);
      // Workflow is in progress if any task is in progress
      workflowCtx.setWorkflowState(TaskState.IN_PROGRESS);
    }

    // Used to keep track of task partitions that have already been assigned to instances.
    Set<Integer> assignedPartitions = new HashSet<Integer>();

    // Keeps a mapping of (partition) -> (instance, state). TreeMap so the final assignment is
    // built in partition-id order.
    Map<Integer, PartitionAssignment> paMap = new TreeMap<Integer, PartitionAssignment>();

    // Process all the current assignments of task partitions.
    Set<Integer> allPartitions = getAllTaskPartitions(tgtResourceIs, taskCfg);
    Map<String, SortedSet<Integer>> taskAssignments =
        getTaskPartitionAssignments(liveInstances, prevAssignment, allPartitions);
    for (String instance : taskAssignments.keySet()) {
      Set<Integer> pSet = taskAssignments.get(instance);
      // Used to keep track of partitions that are in one of the final states: COMPLETED, TIMED_OUT,
      // TASK_ERROR, ERROR.
      Set<Integer> donePartitions = new TreeSet<Integer>();
      for (int pId : pSet) {
        final String pName = pName(taskResource, pId);

        // Check for pending state transitions on this (partition, instance).
        State s =
            currStateOutput.getPendingState(ResourceId.from(taskResource), PartitionId.from(pName),
                ParticipantId.from(instance));
        String pendingState = (s == null ? null : s.toString());
        if (pendingState != null) {
          // There is a pending state transition for this (partition, instance). Just copy forward
          // the state assignment from the previous ideal state.
          Map<ParticipantId, State> stateMap =
              prevAssignment.getReplicaMap(PartitionId.from(pName));
          if (stateMap != null) {
            State prevState = stateMap.get(ParticipantId.from(instance));
            paMap.put(pId, new PartitionAssignment(instance, prevState.toString()));
            assignedPartitions.add(pId);
            if (LOG.isDebugEnabled()) {
              LOG.debug(String
                  .format(
                      "Task partition %s has a pending state transition on instance %s. Using the previous ideal state which was %s.",
                      pName, instance, prevState));
            }
          }

          continue;
        }

        // No pending transition: look at the actual current state of this partition.
        TaskPartitionState currState =
            TaskPartitionState.valueOf(currStateOutput.getCurrentState(
                ResourceId.from(taskResource), PartitionId.from(pName),
                ParticipantId.from(instance)).toString());

        // Process any requested state transitions (participant-initiated, e.g. a task asking to
        // be moved to a new state).
        State reqS =
            currStateOutput.getRequestedState(ResourceId.from(taskResource),
                PartitionId.from(pName), ParticipantId.from(instance));
        String requestedStateStr = (reqS == null ? null : reqS.toString());
        if (requestedStateStr != null && !requestedStateStr.isEmpty()) {
          TaskPartitionState requestedState = TaskPartitionState.valueOf(requestedStateStr);
          if (requestedState.equals(currState)) {
            LOG.warn(String.format(
                "Requested state %s is the same as the current state for instance %s.",
                requestedState, instance));
          }

          // Honor the request (even a no-op one) and skip the target-state logic below.
          paMap.put(pId, new PartitionAssignment(instance, requestedState.name()));
          assignedPartitions.add(pId);
          if (LOG.isDebugEnabled()) {
            LOG.debug(String.format(
                "Instance %s requested a state transition to %s for partition %s.", instance,
                requestedState, pName));
          }
          continue;
        }

        switch (currState) {
        case RUNNING:
        case STOPPED: {
          // Keep the partition aligned with the workflow's target state.
          TaskPartitionState nextState;
          if (taskTgtState == TargetState.START) {
            nextState = TaskPartitionState.RUNNING;
          } else {
            nextState = TaskPartitionState.STOPPED;
          }

          paMap.put(pId, new PartitionAssignment(instance, nextState.name()));
          assignedPartitions.add(pId);
          if (LOG.isDebugEnabled()) {
            LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName,
                nextState, instance));
          }
        }
          break;
        case COMPLETED: {
          // The task has completed on this partition. Mark as such in the context object.
          donePartitions.add(pId);
          if (LOG.isDebugEnabled()) {
            LOG.debug(String
                .format(
                    "Task partition %s has completed with state %s. Marking as such in rebalancer context.",
                    pName, currState));
          }
          partitionsToDropFromIs.add(pId);
          markPartitionCompleted(taskCtx, pId);
        }
          break;
        case TIMED_OUT:
        case TASK_ERROR:
        case ERROR: {
          donePartitions.add(pId); // The task may be rescheduled on a different instance.
          if (LOG.isDebugEnabled()) {
            LOG.debug(String.format(
                "Task partition %s has error state %s. Marking as such in rebalancer context.",
                pName, currState));
          }
          markPartitionError(taskCtx, pId, currState);
          // The error policy is to fail the task as soon a single partition fails for a specified
          // maximum number of attempts.
          if (taskCtx.getPartitionNumAttempts(pId) >= taskCfg.getMaxAttemptsPerPartition()) {
            workflowCtx.setTaskState(taskResource, TaskState.FAILED);
            workflowCtx.setWorkflowState(TaskState.FAILED);
            // Drop every partition of this task from the ideal state; the task is dead.
            addAllPartitions(tgtResourceIs.getPartitionSet(), partitionsToDropFromIs);
            return emptyAssignment(ResourceId.from(taskResource));
          }
        }
          break;
        case INIT:
        case DROPPED: {
          // currState in [INIT, DROPPED]. Do nothing, the partition is eligible to be reassigned.
          donePartitions.add(pId);
          if (LOG.isDebugEnabled()) {
            LOG.debug(String.format(
                "Task partition %s has state %s. It will be dropped from the current ideal state.",
                pName, currState));
          }
        }
          break;
        default:
          throw new AssertionError("Unknown enum symbol: " + currState);
        }
      }

      // Remove the set of task partitions that are completed or in one of the error states.
      // (Safe: pSet is mutated only after the iteration above has finished.)
      pSet.removeAll(donePartitions);
    }

    if (isTaskComplete(taskCtx, allPartitions)) {
      workflowCtx.setTaskState(taskResource, TaskState.COMPLETED);
      if (isWorkflowComplete(workflowCtx, workflowConfig)) {
        workflowCtx.setWorkflowState(TaskState.COMPLETED);
        workflowCtx.setFinishTime(System.currentTimeMillis());
      }
    }

    // Make additional task assignments if needed.
    if (taskTgtState == TargetState.START) {
      // Contains the set of task partitions that must be excluded from consideration when making
      // any new assignments.
      // This includes all completed, failed, already assigned partitions.
      Set<Integer> excludeSet = Sets.newTreeSet(assignedPartitions);
      addCompletedPartitions(excludeSet, taskCtx, allPartitions);
      // Get instance->[partition, ...] mappings for the target resource.
      Map<String, SortedSet<Integer>> tgtPartitionAssignments =
          getTgtPartitionAssignment(currStateOutput, liveInstances, tgtResourceIs,
              taskCfg.getTargetPartitionStates(), allPartitions);
      for (Map.Entry<String, SortedSet<Integer>> entry : taskAssignments.entrySet()) {
        String instance = entry.getKey();
        // Contains the set of task partitions currently assigned to the instance.
        Set<Integer> pSet = entry.getValue();
        // Fill up to the configured per-instance concurrency limit.
        int numToAssign = taskCfg.getNumConcurrentTasksPerInstance() - pSet.size();
        if (numToAssign > 0) {
          List<Integer> nextPartitions =
              getNextPartitions(tgtPartitionAssignments.get(instance), excludeSet, numToAssign);
          for (Integer pId : nextPartitions) {
            String pName = pName(taskResource, pId);
            paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.RUNNING.name()));
            excludeSet.add(pId);
            if (LOG.isDebugEnabled()) {
              LOG.debug(String.format("Setting task partition %s state to %s on instance %s.",
                  pName, TaskPartitionState.RUNNING, instance));
            }
          }
        }
      }
    }

    // Construct a ResourceAssignment object from the map of partition assignments.
    ResourceAssignment ra = new ResourceAssignment(ResourceId.from(taskResource));
    for (Map.Entry<Integer, PartitionAssignment> e : paMap.entrySet()) {
      PartitionAssignment pa = e.getValue();
      ra.addReplicaMap(PartitionId.from(pName(taskResource, e.getKey())),
          ImmutableMap.of(ParticipantId.from(pa._instance), State.from(pa._state)));
    }

    return ra;
  }
+
+ /**
+ * Checks if the task has completed.
+ * @param ctx The rebalancer context.
+ * @param allPartitions The set of partitions to check.
+ * @return true if all task partitions have been marked with status
+ * {@link TaskPartitionState#COMPLETED} in the rebalancer
+ * context, false otherwise.
+ */
+ private static boolean isTaskComplete(TaskContext ctx, Set<Integer> allPartitions) {
+ for (Integer pId : allPartitions) {
+ TaskPartitionState state = ctx.getPartitionState(pId);
+ if (state != TaskPartitionState.COMPLETED) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Checks if the workflow has completed.
+ * @param ctx Workflow context containing task states
+ * @param cfg Workflow config containing set of tasks
+ * @return returns true if all tasks are {@link TaskState#COMPLETED}, false otherwise.
+ */
+ private static boolean isWorkflowComplete(WorkflowContext ctx, WorkflowConfig cfg) {
+ for (String task : cfg.getTaskDag().getAllNodes()) {
+ if (ctx.getTaskState(task) != TaskState.COMPLETED) {
+ return false;
+ }
+ }
+ return true;
+ }
+
  /**
   * Checks if the workflow has been stopped.
   * @param ctx Workflow context containing task states
   * @param cfg Workflow config containing set of tasks
   * @return true if no task is in a state other than {@link TaskState#STOPPED}. Note that tasks
   *         with no recorded state (null) are treated as stopped, so this may return true before
   *         every task has explicitly reached STOPPED.
   */
  private static boolean isWorkflowStopped(WorkflowContext ctx, WorkflowConfig cfg) {
    for (String task : cfg.getTaskDag().getAllNodes()) {
      if (ctx.getTaskState(task) != TaskState.STOPPED && ctx.getTaskState(task) != null) {
        return false;
      }
    }
    return true;
  }
+
  // Flips the resource's target state to DELETE in its config so that a subsequent rebalance
  // pass performs cleanup (see the DELETE branch in computeResourceMapping).
  private static void markForDeletion(HelixManager mgr, String resourceName) {
    mgr.getConfigAccessor().set(
        TaskUtil.getResourceConfigScope(mgr.getClusterName(), resourceName),
        WorkflowConfig.TARGET_STATE, TargetState.DELETE.name());
  }
+
  /**
   * Cleans up all Helix state associated with this task, wiping workflow-level information if this
   * is the last remaining task in its workflow.
   * <p>
   * Deletion order matters: resource config first, then property store context, then the ideal
   * state. Each step throws on failure so partial cleanup is not silently accepted and the
   * remaining steps are aborted.
   * @param mgr connected Helix manager used for data access
   * @param resourceName the task resource being removed
   * @param cfg the workflow config (provides the full task DAG for the last-task check)
   * @param workflowResource the workflow resource this task belongs to
   */
  private static void cleanup(HelixManager mgr, String resourceName, WorkflowConfig cfg,
      String workflowResource) {
    HelixDataAccessor accessor = mgr.getHelixDataAccessor();
    // Delete resource configs.
    PropertyKey cfgKey = getConfigPropertyKey(accessor, resourceName);
    if (!accessor.removeProperty(cfgKey)) {
      throw new RuntimeException(
          String
              .format(
                  "Error occurred while trying to clean up task %s. Failed to remove node %s from Helix. Aborting further clean up steps.",
                  resourceName, cfgKey));
    }
    // Delete property store information for this resource.
    String propStoreKey = getRebalancerPropStoreKey(resourceName);
    if (!mgr.getHelixPropertyStore().remove(propStoreKey, AccessOption.PERSISTENT)) {
      throw new RuntimeException(
          String
              .format(
                  "Error occurred while trying to clean up task %s. Failed to remove node %s from Helix. Aborting further clean up steps.",
                  resourceName, propStoreKey));
    }
    // Finally, delete the ideal state itself.
    PropertyKey isKey = getISPropertyKey(accessor, resourceName);
    if (!accessor.removeProperty(isKey)) {
      throw new RuntimeException(String.format(
          "Error occurred while trying to clean up task %s. Failed to remove node %s from Helix.",
          resourceName, isKey));
    }
    LOG.info(String.format("Successfully cleaned up task resource %s.", resourceName));

    // If no task in the workflow still has any state left in ZK, this was the last one.
    boolean lastInWorkflow = true;
    for (String task : cfg.getTaskDag().getAllNodes()) {
      // check if property store information or resource configs exist for this task
      if (mgr.getHelixPropertyStore().exists(getRebalancerPropStoreKey(task),
          AccessOption.PERSISTENT)
          || accessor.getProperty(getConfigPropertyKey(accessor, task)) != null
          || accessor.getProperty(getISPropertyKey(accessor, task)) != null) {
        lastInWorkflow = false;
      }
    }

    // clean up workflow-level info if this was the last task in the workflow
    if (lastInWorkflow) {
      // delete workflow config
      PropertyKey workflowCfgKey = getConfigPropertyKey(accessor, workflowResource);
      if (!accessor.removeProperty(workflowCfgKey)) {
        throw new RuntimeException(
            String
                .format(
                    "Error occurred while trying to clean up workflow %s. Failed to remove node %s from Helix. Aborting further clean up steps.",
                    workflowResource, workflowCfgKey));
      }
      // Delete property store information for this workflow
      String workflowPropStoreKey = getRebalancerPropStoreKey(workflowResource);
      if (!mgr.getHelixPropertyStore().remove(workflowPropStoreKey, AccessOption.PERSISTENT)) {
        throw new RuntimeException(
            String
                .format(
                    "Error occurred while trying to clean up workflow %s. Failed to remove node %s from Helix. Aborting further clean up steps.",
                    workflowResource, workflowPropStoreKey));
      }
    }

  }
+
+ private static String getRebalancerPropStoreKey(String resource) {
+ return Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resource);
+ }
+
  // Property key for the resource's ideal state node.
  private static PropertyKey getISPropertyKey(HelixDataAccessor accessor, String resource) {
    return accessor.keyBuilder().idealStates(resource);
  }
+
  // Property key for the resource's config node.
  private static PropertyKey getConfigPropertyKey(HelixDataAccessor accessor, String resource) {
    return accessor.keyBuilder().resourceConfig(resource);
  }
+
+ private static void addAllPartitions(Set<String> pNames, Set<Integer> pIds) {
+ for (String pName : pNames) {
+ pIds.add(pId(pName));
+ }
+ }
+
  // Returns an assignment with no partition mappings; used whenever this task must not be
  // scheduled (unmet dependencies, final state, deletion, or fatal failure).
  private static ResourceAssignment emptyAssignment(ResourceId resourceId) {
    return new ResourceAssignment(resourceId);
  }
+
+ private static void addCompletedPartitions(Set<Integer> set, TaskContext ctx,
+ Iterable<Integer> pIds) {
+ for (Integer pId : pIds) {
+ TaskPartitionState state = ctx.getPartitionState(pId);
+ if (state == TaskPartitionState.COMPLETED) {
+ set.add(pId);
+ }
+ }
+ }
+
+ /**
+ * Returns the set of all partition ids for a task.
+ * <p/>
+ * If a set of partition ids was explicitly specified in the config, that is used. Otherwise, we
+ * use the list of all partition ids from the target resource.
+ */
+ private static Set<Integer> getAllTaskPartitions(IdealState tgtResourceIs, TaskConfig taskCfg) {
+ Set<Integer> taskPartitions = new HashSet<Integer>();
+ if (taskCfg.getTargetPartitions() != null) {
+ for (Integer pId : taskCfg.getTargetPartitions()) {
+ taskPartitions.add(pId);
+ }
+ } else {
+ for (String pName : tgtResourceIs.getPartitionSet()) {
+ taskPartitions.add(pId(pName));
+ }
+ }
+
+ return taskPartitions;
+ }
+
+ private static List<Integer> getNextPartitions(SortedSet<Integer> candidatePartitions,
+ Set<Integer> excluded, int n) {
+ List<Integer> result = new ArrayList<Integer>(n);
+ for (Integer pId : candidatePartitions) {
+ if (result.size() >= n) {
+ break;
+ }
+
+ if (!excluded.contains(pId)) {
+ result.add(pId);
+ }
+ }
+
+ return result;
+ }
+
  // Records a successful finish for the partition: final state, finish timestamp, and one more
  // consumed attempt.
  private static void markPartitionCompleted(TaskContext ctx, int pId) {
    ctx.setPartitionState(pId, TaskPartitionState.COMPLETED);
    ctx.setPartitionFinishTime(pId, System.currentTimeMillis());
    ctx.incrementNumAttempts(pId);
  }
+
  // Records a failed attempt for the partition with the given error state; the attempt counter
  // feeds the max-attempts check in computeResourceMapping.
  private static void markPartitionError(TaskContext ctx, int pId, TaskPartitionState state) {
    ctx.setPartitionState(pId, state);
    ctx.setPartitionFinishTime(pId, System.currentTimeMillis());
    ctx.incrementNumAttempts(pId);
  }
+
+ /**
+ * Get partition assignments for the target resource, but only for the partitions of interest.
+ * @param currStateOutput The current state of the instances in the cluster.
+ * @param instanceList The set of instances.
+ * @param tgtIs The ideal state of the target resource.
+ * @param tgtStates Only partitions in this set of states will be considered. If null, partitions
+ * do not need to
+ * be in any specific state to be considered.
+ * @param includeSet The set of partitions to consider.
+ * @return A map of instance vs set of partition ids assigned to that instance.
+ */
+ private static Map<String, SortedSet<Integer>> getTgtPartitionAssignment(
+ ResourceCurrentState currStateOutput, Iterable<ParticipantId> instanceList, IdealState tgtIs,
+ Set<String> tgtStates, Set<Integer> includeSet) {
+ Map<String, SortedSet<Integer>> result = new HashMap<String, SortedSet<Integer>>();
+ for (ParticipantId instance : instanceList) {
+ result.put(instance.stringify(), new TreeSet<Integer>());
+ }
+
+ for (String pName : tgtIs.getPartitionSet()) {
+ int pId = pId(pName);
+ if (includeSet.contains(pId)) {
+ for (ParticipantId instance : instanceList) {
+ State s =
+ currStateOutput.getCurrentState(ResourceId.from(tgtIs.getResourceName()),
+ PartitionId.from(pName), instance);
+ String state = (s == null ? null : s.toString());
+ if (tgtStates == null || tgtStates.contains(state)) {
+ result.get(instance).add(pId);
+ }
+ }
+ }
+ }
+
+ return result;
+ }
+
+ /**
+ * Return the assignment of task partitions per instance.
+ */
+ private static Map<String, SortedSet<Integer>> getTaskPartitionAssignments(
+ Iterable<ParticipantId> instanceList, ResourceAssignment assignment, Set<Integer> includeSet) {
+ Map<String, SortedSet<Integer>> result = new HashMap<String, SortedSet<Integer>>();
+ for (ParticipantId instance : instanceList) {
+ result.put(instance.stringify(), new TreeSet<Integer>());
+ }
+
+ for (PartitionId partitionId : assignment.getMappedPartitionIds()) {
+ int pId = pId(partitionId.stringify());
+ if (includeSet.contains(pId)) {
+ Map<ParticipantId, State> replicaMap = assignment.getReplicaMap(partitionId);
+ for (ParticipantId instance : replicaMap.keySet()) {
+ SortedSet<Integer> pList = result.get(instance.stringify());
+ if (pList != null) {
+ pList.add(pId);
+ }
+ }
+ }
+ }
+
+ return result;
+ }
+
+ /**
+ * Computes the partition name given the resource name and partition id.
+ */
+ private static String pName(String resource, int pId) {
+ return resource + "_" + pId;
+ }
+
+ /**
+ * Extracts the partition id from the given partition name.
+ */
+ private static int pId(String pName) {
+ String[] tokens = pName.split("_");
+ return Integer.valueOf(tokens[tokens.length - 1]);
+ }
+
+ /**
+ * An (instance, state) pair.
+ */
+ private static class PartitionAssignment {
+ private final String _instance;
+ private final String _state;
+
+ private PartitionAssignment(String instance, String state) {
+ _instance = instance;
+ _state = state;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/80fc2be5/helix-core/src/main/java/org/apache/helix/task/TaskResult.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskResult.java b/helix-core/src/main/java/org/apache/helix/task/TaskResult.java
new file mode 100644
index 0000000..95b8d72
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskResult.java
@@ -0,0 +1,70 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * The result of a task execution.
+ */
/**
 * The result of a task execution: a status code plus an optional, task-defined info string.
 */
public class TaskResult {
  /**
   * An enumeration of status codes.
   */
  public enum Status {
    /** The task completed normally. */
    COMPLETED,
    /**
     * The task was cancelled externally, i.e. {@link org.apache.helix.task.Task#cancel()} was
     * called.
     */
    CANCELED,
    /** The task encountered an error from which it could not recover. */
    ERROR
  }

  // Outcome of the execution.
  private final Status _status;
  // Opaque, task-interpreted payload (e.g. progress or checkpoint data).
  private final String _info;

  /**
   * Constructs a new {@link TaskResult}.
   * @param status The status code.
   * @param info Information that can be interpreted by the {@link Task} implementation that
   *          constructed this object. May encode progress or checkpoint information that the task
   *          can use to resume from where it left off in a previous execution.
   */
  public TaskResult(Status status, String info) {
    _status = status;
    _info = info;
  }

  /** Returns the status code of this result. */
  public Status getStatus() {
    return _status;
  }

  /** Returns the task-defined info payload. */
  public String getInfo() {
    return _info;
  }

  @Override
  public String toString() {
    // Output format must stay stable; callers may log or parse it.
    return String.format("TaskResult{_status=%s, _info='%s'}", _status, _info);
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/80fc2be5/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java b/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
new file mode 100644
index 0000000..e7a9abb
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
@@ -0,0 +1,174 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.log4j.Logger;
+
+/**
+ * A wrapping {@link Runnable} used to manage the life-cycle of a user-defined {@link Task}
+ * implementation.
+ */
+public class TaskRunner implements Runnable {
+ private static final Logger LOG = Logger.getLogger(TaskRunner.class);
+
+ private final StateModel _taskStateModel;
+ private final HelixManager _manager;
+ private final String _taskName;
+ private final String _taskPartition;
+ private final String _sessionId;
+ private final String _instance;
+ // Synchronization object used to signal that the task has been scheduled on a thread.
+ private final Object _startedSync = new Object();
+ // Synchronization object used to signal that the task has finished.
+ private final Object _doneSync = new Object();
+ private final Task _task;
+ // Stores the result of the task once it has finished.
+ private volatile TaskResult _result = null;
+ // If true, indicates that the task has started.
+ private volatile boolean _started = false;
+ // If true, indicates that the task was canceled due to a task timeout.
+ private volatile boolean _timeout = false;
+ // If true, indicates that the task has finished.
+ private volatile boolean _done = false;
+
+ public TaskRunner(StateModel taskStateModel, Task task, String taskName, String taskPartition,
+ String instance,
+ HelixManager manager, String sessionId) {
+ _taskStateModel = taskStateModel;
+ _task = task;
+ _taskName = taskName;
+ _taskPartition = taskPartition;
+ _instance = instance;
+ _manager = manager;
+ _sessionId = sessionId;
+ }
+
+ @Override
+ public void run() {
+ try {
+ signalStarted();
+ _result = _task.run();
+
+ switch (_result.getStatus()) {
+ case COMPLETED:
+ requestStateTransition(TaskPartitionState.COMPLETED);
+ break;
+ case CANCELED:
+ if (_timeout) {
+ requestStateTransition(TaskPartitionState.TIMED_OUT);
+ }
+ // Else the state transition to CANCELED was initiated by the controller.
+ break;
+ case ERROR:
+ requestStateTransition(TaskPartitionState.TASK_ERROR);
+ break;
+ default:
+ throw new AssertionError("Unknown result type.");
+ }
+ } catch (Exception e) {
+ requestStateTransition(TaskPartitionState.TASK_ERROR);
+ } finally {
+ synchronized (_doneSync) {
+ _done = true;
+ _doneSync.notifyAll();
+ }
+ }
+ }
+
+ /**
+ * Signals the task to cancel itself.
+ */
+ public void timeout() {
+ _timeout = true;
+ cancel();
+ }
+
+ /**
+ * Signals the task to cancel itself.
+ */
+ public void cancel() {
+ _task.cancel();
+ }
+
+ /**
+ * Waits uninterruptibly until the task has started.
+ */
+ public void waitTillStarted() {
+ synchronized (_startedSync) {
+ while (!_started) {
+ try {
+ _startedSync.wait();
+ } catch (InterruptedException e) {
+ LOG.warn(
+ String.format("Interrupted while waiting for task %s to start.", _taskPartition), e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Waits uninterruptibly until the task has finished, either normally or due to an
+ * error/cancellation..
+ */
+ public TaskResult waitTillDone() {
+ synchronized (_doneSync) {
+ while (!_done) {
+ try {
+ _doneSync.wait();
+ } catch (InterruptedException e) {
+ LOG.warn(
+ String.format("Interrupted while waiting for task %s to complete.", _taskPartition),
+ e);
+ }
+ }
+ }
+ return _result;
+ }
+
+ /**
+ * Signals any threads waiting for this task to start.
+ */
+ private void signalStarted() {
+ synchronized (_startedSync) {
+ _started = true;
+ _startedSync.notifyAll();
+ }
+ }
+
+ /**
+ * Requests the controller for a state transition.
+ * @param state The state transition that is being requested.
+ */
+ private void requestStateTransition(TaskPartitionState state) {
+ boolean success =
+ TaskUtil.setRequestedState(_manager.getHelixDataAccessor(), _instance, _sessionId,
+ _taskName, _taskPartition, state);
+ if (success) {
+ _taskStateModel.setRequestedState(state.name());
+ } else {
+ LOG.error(String
+ .format(
+ "Failed to set the requested state to %s for instance %s, session id %s, task partition %s.",
+ state, _instance, _sessionId, _taskPartition));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/80fc2be5/helix-core/src/main/java/org/apache/helix/task/TaskState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskState.java b/helix-core/src/main/java/org/apache/helix/task/TaskState.java
new file mode 100644
index 0000000..2cc6d6c
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskState.java
@@ -0,0 +1,42 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
/**
 * Enumeration of current task states. This value is stored in the rebalancer context.
 */
public enum TaskState {
  /** The task is actively running. */
  IN_PROGRESS,
  /** The task has been halted; it may be resumed later. */
  STOPPED,
  /** The task has failed and cannot be resumed. */
  FAILED,
  /** Every partition of the task finished normally. */
  COMPLETED
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/80fc2be5/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
new file mode 100644
index 0000000..2a6d003
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
@@ -0,0 +1,240 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import org.apache.helix.HelixManager;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.log4j.Logger;
+
+/**
+ * task state model
+ */
+@StateModelInfo(states = {
+ "INIT", "RUNNING", "STOPPED", "COMPLETED", "TIMED_OUT", "TASK_ERROR", "DROPPED"
+}, initialState = "INIT")
+public class TaskStateModel extends StateModel {
+ private static final Logger LOG = Logger.getLogger(TaskStateModel.class);
+ private final HelixManager _manager;
+ private final ExecutorService _taskExecutor;
+ private final Map<String, TaskFactory> _taskFactoryRegistry;
+ private final Timer _timer = new Timer("TaskStateModel time out daemon", true);
+ private TaskRunner _taskRunner;
+
+ public TaskStateModel(HelixManager manager, Map<String, TaskFactory> taskFactoryRegistry) {
+ _manager = manager;
+ _taskFactoryRegistry = taskFactoryRegistry;
+ _taskExecutor = Executors.newFixedThreadPool(40, new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, "TaskStateModel-thread-pool");
+ }
+ });
+ }
+
+ @Transition(to = "RUNNING", from = "INIT")
+ public void onBecomeRunningFromInit(Message msg, NotificationContext context) {
+ startTask(msg, msg.getPartitionName());
+ }
+
+ @Transition(to = "STOPPED", from = "RUNNING")
+ public String onBecomeStoppedFromRunning(Message msg, NotificationContext context) {
+ String taskPartition = msg.getPartitionName();
+ if (_taskRunner == null) {
+ throw new IllegalStateException(String.format(
+ "Invalid state transition. There is no running task for partition %s.", taskPartition));
+ }
+
+ _taskRunner.cancel();
+ TaskResult r = _taskRunner.waitTillDone();
+ LOG.info(String.format("Task %s completed with result %s.", msg.getPartitionName(), r));
+
+ return r.getInfo();
+ }
+
+ @Transition(to = "COMPLETED", from = "RUNNING")
+ public void onBecomeCompletedFromRunning(Message msg, NotificationContext context) {
+ String taskPartition = msg.getPartitionName();
+ if (_taskRunner == null) {
+ throw new IllegalStateException(String.format(
+ "Invalid state transition. There is no running task for partition %s.", taskPartition));
+ }
+
+ TaskResult r = _taskRunner.waitTillDone();
+ if (r.getStatus() != TaskResult.Status.COMPLETED) {
+ throw new IllegalStateException(String.format(
+ "Partition %s received a state transition to %s but the result status code is %s.",
+ msg.getPartitionName(), msg.getToState(), r.getStatus()));
+ }
+ }
+
+ @Transition(to = "TIMED_OUT", from = "RUNNING")
+ public String onBecomeTimedOutFromRunning(Message msg, NotificationContext context) {
+ String taskPartition = msg.getPartitionName();
+ if (_taskRunner == null) {
+ throw new IllegalStateException(String.format(
+ "Invalid state transition. There is no running task for partition %s.", taskPartition));
+ }
+
+ TaskResult r = _taskRunner.waitTillDone();
+ if (r.getStatus() != TaskResult.Status.CANCELED) {
+ throw new IllegalStateException(String.format(
+ "Partition %s received a state transition to %s but the result status code is %s.",
+ msg.getPartitionName(), msg.getToState(), r.getStatus()));
+ }
+
+ return r.getInfo();
+ }
+
+ @Transition(to = "TASK_ERROR", from = "RUNNING")
+ public String onBecomeTaskErrorFromRunning(Message msg, NotificationContext context) {
+ String taskPartition = msg.getPartitionName();
+ if (_taskRunner == null) {
+ throw new IllegalStateException(String.format(
+ "Invalid state transition. There is no running task for partition %s.", taskPartition));
+ }
+
+ TaskResult r = _taskRunner.waitTillDone();
+ if (r.getStatus() != TaskResult.Status.ERROR) {
+ throw new IllegalStateException(String.format(
+ "Partition %s received a state transition to %s but the result status code is %s.",
+ msg.getPartitionName(), msg.getToState(), r.getStatus()));
+ }
+
+ return r.getInfo();
+ }
+
+ @Transition(to = "RUNNING", from = "STOPPED")
+ public void onBecomeRunningFromStopped(Message msg, NotificationContext context) {
+ startTask(msg, msg.getPartitionName());
+ }
+
+ @Transition(to = "DROPPED", from = "INIT")
+ public void onBecomeDroppedFromInit(Message msg, NotificationContext context) {
+ _taskRunner = null;
+ }
+
+ @Transition(to = "DROPPED", from = "RUNNING")
+ public void onBecomeDroppedFromRunning(Message msg, NotificationContext context) {
+ String taskPartition = msg.getPartitionName();
+ if (_taskRunner == null) {
+ throw new IllegalStateException(String.format(
+ "Invalid state transition. There is no running task for partition %s.", taskPartition));
+ }
+
+ _taskRunner.cancel();
+ TaskResult r = _taskRunner.waitTillDone();
+ LOG.info(String.format("Task partition %s returned result %s.", msg.getPartitionName(), r));
+ _taskRunner = null;
+ }
+
+ @Transition(to = "DROPPED", from = "COMPLETED")
+ public void onBecomeDroppedFromCompleted(Message msg, NotificationContext context) {
+ _taskRunner = null;
+ }
+
+ @Transition(to = "DROPPED", from = "STOPPED")
+ public void onBecomeDroppedFromStopped(Message msg, NotificationContext context) {
+ _taskRunner = null;
+ }
+
+ @Transition(to = "DROPPED", from = "TIMED_OUT")
+ public void onBecomeDroppedFromTimedOut(Message msg, NotificationContext context) {
+ _taskRunner = null;
+ }
+
+ @Transition(to = "DROPPED", from = "TASK_ERROR")
+ public void onBecomeDroppedFromTaskError(Message msg, NotificationContext context) {
+ _taskRunner = null;
+ }
+
+ @Transition(to = "INIT", from = "RUNNING")
+ public void onBecomeInitFromRunning(Message msg, NotificationContext context) {
+ String taskPartition = msg.getPartitionName();
+ if (_taskRunner == null) {
+ throw new IllegalStateException(String.format(
+ "Invalid state transition. There is no running task for partition %s.", taskPartition));
+ }
+
+ _taskRunner.cancel();
+ TaskResult r = _taskRunner.waitTillDone();
+ LOG.info(String.format("Task partition %s returned result %s.", msg.getPartitionName(), r));
+ _taskRunner = null;
+ }
+
+ @Transition(to = "INIT", from = "COMPLETED")
+ public void onBecomeInitFromCompleted(Message msg, NotificationContext context) {
+ _taskRunner = null;
+ }
+
+ @Transition(to = "INIT", from = "STOPPED")
+ public void onBecomeInitFromStopped(Message msg, NotificationContext context) {
+ _taskRunner = null;
+ }
+
+ @Transition(to = "INIT", from = "TIMED_OUT")
+ public void onBecomeInitFromTimedOut(Message msg, NotificationContext context) {
+ _taskRunner = null;
+ }
+
+ @Transition(to = "INIT", from = "TASK_ERROR")
+ public void onBecomeInitFromTaskError(Message msg, NotificationContext context) {
+ _taskRunner = null;
+ }
+
+ @Override
+ public void reset() {
+ if (_taskRunner != null) {
+ _taskRunner.cancel();
+ }
+ }
+
+ private void startTask(Message msg, String taskPartition) {
+ TaskConfig cfg = TaskUtil.getTaskCfg(_manager, msg.getResourceName());
+ TaskFactory taskFactory = _taskFactoryRegistry.get(cfg.getCommand());
+ Task task = taskFactory.createNewTask(cfg.getCommandConfig());
+
+ _taskRunner =
+ new TaskRunner(this, task, msg.getResourceName(), taskPartition, msg.getTgtName(),
+ _manager,
+ msg.getTgtSessionId());
+ _taskExecutor.submit(_taskRunner);
+ _taskRunner.waitTillStarted();
+
+ // Set up a timer to cancel the task when its time out expires.
+ _timer.schedule(new TimerTask() {
+ @Override
+ public void run() {
+ if (_taskRunner != null) {
+ _taskRunner.timeout();
+ }
+ }
+ }, cfg.getTimeoutPerPartition());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/80fc2be5/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java b/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
new file mode 100644
index 0000000..369ac22
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
@@ -0,0 +1,42 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Map;
+import org.apache.helix.HelixManager;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+
/**
 * Factory class for {@link TaskStateModel}.
 */
public class TaskStateModelFactory extends StateModelFactory<TaskStateModel> {
  // Shared by every state model this factory creates.
  private final HelixManager _manager;
  // Maps a task command name to the TaskFactory that instantiates the corresponding Task.
  private final Map<String, TaskFactory> _taskFactoryRegistry;

  /**
   * @param manager Helix connection handed to each created state model.
   * @param taskFactoryRegistry Registry of task factories keyed by command name.
   */
  public TaskStateModelFactory(HelixManager manager, Map<String, TaskFactory> taskFactoryRegistry) {
    _manager = manager;
    _taskFactoryRegistry = taskFactoryRegistry;
  }

  /**
   * Creates a fresh {@link TaskStateModel}. The partition name is not retained; every model
   * shares the same manager and factory registry.
   */
  @Override
  public TaskStateModel createNewStateModel(String partitionName) {
    return new TaskStateModel(_manager, _taskFactoryRegistry);
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/80fc2be5/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
new file mode 100644
index 0000000..a9428c6
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -0,0 +1,179 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.base.Joiner;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.helix.AccessOption;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.api.State;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import org.apache.log4j.Logger;
+
+/**
+ * Static utility methods.
+ */
+public class TaskUtil {
+ private static final Logger LOG = Logger.getLogger(TaskUtil.class);
+
+ enum TaskUtilEnum {
+ CONTEXT_NODE("Context"),
+ PREV_RA_NODE("PreviousResourceAssignment");
+
+ final String _value;
+
+ private TaskUtilEnum(String value) {
+ _value = value;
+ }
+
+ public String value() {
+ return _value;
+ }
+ }
+
+ /**
+ * Parses task resource configurations in Helix into a {@link TaskConfig} object.
+ * @param manager HelixManager object used to connect to Helix.
+ * @param taskResource The name of the task resource.
+ * @return A {@link TaskConfig} object if Helix contains valid configurations for the task, null
+ * otherwise.
+ */
+ public static TaskConfig getTaskCfg(HelixManager manager, String taskResource) {
+ Map<String, String> taskCfg = getResourceConfigMap(manager, taskResource);
+ TaskConfig.Builder b = TaskConfig.Builder.fromMap(taskCfg);
+
+ return b.build();
+ }
+
+ public static WorkflowConfig getWorkflowCfg(HelixManager manager, String workflowResource) {
+ Map<String, String> workflowCfg = getResourceConfigMap(manager, workflowResource);
+ WorkflowConfig.Builder b = WorkflowConfig.Builder.fromMap(workflowCfg);
+
+ return b.build();
+ }
+
+ public static boolean setRequestedState(HelixDataAccessor accessor, String instance,
+ String sessionId, String resource, String partition, TaskPartitionState state) {
+ LOG.debug(String.format("Requesting a state transition to %s for partition %s.", state,
+ partition));
+ try {
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+ PropertyKey key = keyBuilder.currentState(instance, sessionId, resource);
+ CurrentState currStateDelta = new CurrentState(resource);
+ currStateDelta.setRequestedState(PartitionId.from(partition), State.from(state.name()));
+
+ return accessor.updateProperty(key, currStateDelta);
+ } catch (Exception e) {
+ LOG.error(String.format("Error when requesting a state transition to %s for partition %s.",
+ state, partition), e);
+ return false;
+ }
+ }
+
+ public static HelixConfigScope getResourceConfigScope(String clusterName, String resource) {
+ return new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.RESOURCE)
+ .forCluster(clusterName).forResource(resource).build();
+ }
+
+ public static ResourceAssignment getPrevResourceAssignment(HelixManager manager,
+ String resourceName) {
+ ZNRecord r =
+ manager.getHelixPropertyStore().get(
+ Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName,
+ TaskUtilEnum.PREV_RA_NODE.value()),
+ null, AccessOption.PERSISTENT);
+ return r != null ? new ResourceAssignment(r) : null;
+ }
+
+ public static void setPrevResourceAssignment(HelixManager manager, String resourceName,
+ ResourceAssignment ra) {
+ manager.getHelixPropertyStore().set(
+ Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName,
+ TaskUtilEnum.PREV_RA_NODE.value()),
+ ra.getRecord(), AccessOption.PERSISTENT);
+ }
+
+ public static TaskContext getTaskContext(HelixManager manager, String taskResource) {
+ ZNRecord r =
+ manager.getHelixPropertyStore().get(
+ Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, taskResource,
+ TaskUtilEnum.CONTEXT_NODE.value()),
+ null, AccessOption.PERSISTENT);
+ return r != null ? new TaskContext(r) : null;
+ }
+
+ public static void setTaskContext(HelixManager manager, String taskResource, TaskContext ctx) {
+ manager.getHelixPropertyStore().set(
+ Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, taskResource,
+ TaskUtilEnum.CONTEXT_NODE.value()),
+ ctx.getRecord(), AccessOption.PERSISTENT);
+ }
+
+ public static WorkflowContext getWorkflowContext(HelixManager manager, String workflowResource) {
+ ZNRecord r =
+ manager.getHelixPropertyStore().get(
+ Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowResource,
+ TaskUtilEnum.CONTEXT_NODE.value()), null, AccessOption.PERSISTENT);
+ return r != null ? new WorkflowContext(r) : null;
+ }
+
+ public static void setWorkflowContext(HelixManager manager, String workflowResource,
+ WorkflowContext ctx) {
+ manager.getHelixPropertyStore().set(
+ Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowResource,
+ TaskUtilEnum.CONTEXT_NODE.value()),
+ ctx.getRecord(), AccessOption.PERSISTENT);
+ }
+
+ public static String getNamespacedTaskName(String singleTaskWorkflow) {
+ return getNamespacedTaskName(singleTaskWorkflow, singleTaskWorkflow);
+ }
+
+ public static String getNamespacedTaskName(String workflowResource, String taskName) {
+ return workflowResource + "_" + taskName;
+ }
+
+ private static Map<String, String> getResourceConfigMap(HelixManager manager, String resource) {
+ HelixConfigScope scope = getResourceConfigScope(manager.getClusterName(), resource);
+ ConfigAccessor configAccessor = manager.getConfigAccessor();
+
+ Map<String, String> taskCfg = new HashMap<String, String>();
+ List<String> cfgKeys = configAccessor.getKeys(scope);
+ if (cfgKeys == null || cfgKeys.isEmpty()) {
+ return null;
+ }
+
+ for (String cfgKey : cfgKeys) {
+ taskCfg.put(cfgKey, configAccessor.get(scope, cfgKey));
+ }
+
+ return taskCfg;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/80fc2be5/helix-core/src/main/java/org/apache/helix/task/Workflow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/Workflow.java b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
new file mode 100644
index 0000000..c5c005b
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
@@ -0,0 +1,248 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.base.Joiner;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.Reader;
+import java.io.StringReader;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+import org.apache.helix.task.beans.TaskBean;
+import org.apache.helix.task.beans.WorkflowBean;
+import org.yaml.snakeyaml.Yaml;
+import org.yaml.snakeyaml.constructor.Constructor;
+
+/**
+ * Houses a task dag and config set to fully describe a task workflow
+ */
+public class Workflow {
+ /** Default workflow name, useful constant for single-node workflows */
+ public static enum WorkflowEnum {
+ UNSPECIFIED;
+ }
+
+ /** Workflow name */
+ private final String _name;
+
+ /** Holds workflow-level configurations */
+ private final WorkflowConfig _workflowConfig;
+
+ /** Contains the per-task configurations for all tasks specified in the provided dag */
+ private final Map<String, Map<String, String>> _taskConfigs;
+
+ /** Constructs and validates a workflow against a provided dag and config set */
+ private Workflow(String name, WorkflowConfig workflowConfig,
+ Map<String, Map<String, String>> taskConfigs) {
+ _name = name;
+ _workflowConfig = workflowConfig;
+ _taskConfigs = taskConfigs;
+
+ validate();
+ }
+
+ public String getName() {
+ return _name;
+ }
+
+ public Map<String, Map<String, String>> getTaskConfigs() {
+ return _taskConfigs;
+ }
+
+ public Map<String, String> getResourceConfigMap() throws Exception {
+ Map<String, String> cfgMap = new HashMap<String, String>();
+ cfgMap.put(WorkflowConfig.DAG, _workflowConfig.getTaskDag().toJson());
+ cfgMap.put(WorkflowConfig.EXPIRY, String.valueOf(_workflowConfig.getExpiry()));
+ cfgMap.put(WorkflowConfig.TARGET_STATE, _workflowConfig.getTargetState().name());
+
+ return cfgMap;
+ }
+
+ /**
+ * Parses the YAML description from a file into a {@link Workflow} object.
+ * @param file An abstract path name to the file containing the workflow description.
+ * @return A {@link Workflow} object.
+ * @throws Exception
+ */
+ public static Workflow parse(File file) throws Exception {
+ BufferedReader br = new BufferedReader(new FileReader(file));
+ return parse(br);
+ }
+
+ /**
+ * Parses a YAML description of the workflow into a {@link Workflow} object. The YAML string is of
+ * the following
+ * form:
+ * <p/>
+ *
+ * <pre>
+ * name: MyFlow
+ * tasks:
+ * - name : TaskA
+ * command : SomeTask
+ * ...
+ * - name : TaskB
+ * parents : [TaskA]
+ * command : SomeOtherTask
+ * ...
+ * - name : TaskC
+ * command : AnotherTask
+ * ...
+ * - name : TaskD
+ * parents : [TaskB, TaskC]
+ * command : AnotherTask
+ * ...
+ * </pre>
+ * @param yaml A YAML string of the above form
+ * @return A {@link Workflow} object.
+ */
+ public static Workflow parse(String yaml) throws Exception {
+ return parse(new StringReader(yaml));
+ }
+
+ /** Helper function to parse workflow from a generic {@link Reader} */
+ private static Workflow parse(Reader reader) throws Exception {
+ Yaml yaml = new Yaml(new Constructor(WorkflowBean.class));
+ WorkflowBean wf = (WorkflowBean) yaml.load(reader);
+ Builder builder = new Builder(wf.name);
+
+ for (TaskBean task : wf.tasks) {
+ if (task.name == null) {
+ throw new IllegalArgumentException("A task must have a name.");
+ }
+
+ if (task.parents != null) {
+ for (String parent : task.parents) {
+ builder.addParentChildDependency(parent, task.name);
+ }
+ }
+
+ builder.addConfig(task.name, TaskConfig.WORKFLOW_ID, wf.name);
+ builder.addConfig(task.name, TaskConfig.COMMAND, task.command);
+ if (task.commandConfig != null) {
+ builder.addConfig(task.name, TaskConfig.COMMAND_CONFIG, task.commandConfig.toString());
+ }
+ builder.addConfig(task.name, TaskConfig.TARGET_RESOURCE, task.targetResource);
+ if (task.targetPartitionStates != null) {
+ builder.addConfig(task.name, TaskConfig.TARGET_PARTITION_STATES,
+ Joiner.on(",").join(task.targetPartitionStates));
+ }
+ if (task.targetPartitions != null) {
+ builder.addConfig(task.name, TaskConfig.TARGET_PARTITIONS,
+ Joiner.on(",").join(task.targetPartitions));
+ }
+ builder.addConfig(task.name, TaskConfig.MAX_ATTEMPTS_PER_PARTITION,
+ String.valueOf(task.maxAttemptsPerPartition));
+ builder.addConfig(task.name, TaskConfig.NUM_CONCURRENT_TASKS_PER_INSTANCE,
+ String.valueOf(task.numConcurrentTasksPerInstance));
+ builder.addConfig(task.name, TaskConfig.TIMEOUT_PER_PARTITION,
+ String.valueOf(task.timeoutPerPartition));
+ }
+
+ return builder.build();
+ }
+
+ /**
+ * Verifies that all nodes in provided dag have accompanying config and vice-versa.
+ * Also checks dag for cycles and unreachable nodes, and ensures configs are valid.
+ */
+ public void validate() {
+ // validate dag and configs
+ if (!_taskConfigs.keySet().containsAll(_workflowConfig.getTaskDag().getAllNodes())) {
+ throw new IllegalArgumentException("Nodes specified in DAG missing from config");
+ } else if (!_workflowConfig.getTaskDag().getAllNodes().containsAll(_taskConfigs.keySet())) {
+ throw new IllegalArgumentException("Given DAG lacks nodes with supplied configs");
+ }
+
+ _workflowConfig.getTaskDag().validate();
+
+ for (String node : _taskConfigs.keySet()) {
+ buildConfig(node);
+ }
+ }
+
+ /** Builds a TaskConfig from config map. Useful for validating configs */
+ private TaskConfig buildConfig(String task) {
+ return TaskConfig.Builder.fromMap(_taskConfigs.get(task)).build();
+ }
+
+ /** Build a workflow incrementally from dependencies and single configs, validate at build time */
+ public static class Builder {
+ private final String _name;
+ private final TaskDag _dag;
+ private final Map<String, Map<String, String>> _taskConfigs;
+ private long _expiry;
+
+ public Builder(String name) {
+ _name = name;
+ _dag = new TaskDag();
+ _taskConfigs = new TreeMap<String, Map<String, String>>();
+ _expiry = -1;
+ }
+
+ public Builder addConfig(String node, String key, String val) {
+ node = namespacify(node);
+ _dag.addNode(node);
+
+ if (!_taskConfigs.containsKey(node)) {
+ _taskConfigs.put(node, new TreeMap<String, String>());
+ }
+ _taskConfigs.get(node).put(key, val);
+
+ return this;
+ }
+
+ public Builder addParentChildDependency(String parent, String child) {
+ parent = namespacify(parent);
+ child = namespacify(child);
+ _dag.addParentToChild(parent, child);
+
+ return this;
+ }
+
+ public Builder setExpiry(long expiry) {
+ _expiry = expiry;
+ return this;
+ }
+
+ public String namespacify(String task) {
+ return TaskUtil.getNamespacedTaskName(_name, task);
+ }
+
+ public Workflow build() {
+ for (String task : _taskConfigs.keySet()) {
+ // addConfig(task, TaskConfig.WORKFLOW_ID, _name);
+ _taskConfigs.get(task).put(TaskConfig.WORKFLOW_ID, _name);
+ }
+
+ WorkflowConfig.Builder builder = new WorkflowConfig.Builder();
+ builder.setTaskDag(_dag);
+ builder.setTargetState(TargetState.START);
+ if (_expiry > 0) {
+ builder.setExpiry(_expiry);
+ }
+
+ return new Workflow(_name, builder.build(), _taskConfigs); // calls validate internally
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/80fc2be5/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
new file mode 100644
index 0000000..bb88be7
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
@@ -0,0 +1,113 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Map;
+
+/**
+ * Provides a typed interface to workflow level configurations. Validates the configurations.
+ */
+public class WorkflowConfig {
+ /* Config fields */
+ public static final String DAG = "Dag";
+ public static final String TARGET_STATE = "TargetState";
+ public static final String EXPIRY = "Expiry";
+
+ /* Default values */
+ public static final long DEFAULT_EXPIRY = 24 * 60 * 60 * 1000;
+
+ /* Member variables */
+ private final TaskDag _taskDag;
+ private final TargetState _targetState;
+ private final long _expiry;
+
+ private WorkflowConfig(TaskDag taskDag, TargetState targetState, long expiry) {
+ _taskDag = taskDag;
+ _targetState = targetState;
+ _expiry = expiry;
+ }
+
+ public TaskDag getTaskDag() {
+ return _taskDag;
+ }
+
+ public TargetState getTargetState() {
+ return _targetState;
+ }
+
+ public long getExpiry() {
+ return _expiry;
+ }
+
+ public static class Builder {
+ private TaskDag _taskDag = TaskDag.EMPTY_DAG;
+ private TargetState _targetState = TargetState.START;
+ private long _expiry = DEFAULT_EXPIRY;
+
+ public Builder() {
+ // Nothing to do
+ }
+
+ public WorkflowConfig build() {
+ validate();
+
+ return new WorkflowConfig(_taskDag, _targetState, _expiry);
+ }
+
+ public Builder setTaskDag(TaskDag v) {
+ _taskDag = v;
+ return this;
+ }
+
+ public Builder setExpiry(long v) {
+ _expiry = v;
+ return this;
+ }
+
+ public Builder setTargetState(TargetState v) {
+ _targetState = v;
+ return this;
+ }
+
+ public static Builder fromMap(Map<String, String> cfg) {
+ Builder b = new Builder();
+
+ if (cfg.containsKey(EXPIRY)) {
+ b.setExpiry(Long.parseLong(cfg.get(EXPIRY)));
+ }
+ if (cfg.containsKey(DAG)) {
+ b.setTaskDag(TaskDag.fromJson(cfg.get(DAG)));
+ }
+ if (cfg.containsKey(TARGET_STATE)) {
+ b.setTargetState(TargetState.valueOf(cfg.get(TARGET_STATE)));
+ }
+
+ return b;
+ }
+
+ private void validate() {
+ if (_expiry < 0) {
+ throw new IllegalArgumentException(
+ String.format("%s has invalid value %s", EXPIRY, _expiry));
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/80fc2be5/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
new file mode 100644
index 0000000..cd30860
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
@@ -0,0 +1,125 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixProperty;
+import org.apache.helix.ZNRecord;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * Typed interface to the workflow context information stored by {@link TaskRebalancer} in the Helix
+ * property store
+ */
+public class WorkflowContext extends HelixProperty {
+
+ enum WorkflowContextEnum {
+ WORKFLOW_STATE("STATE"),
+ START_TIME("START_TIME"),
+ FINISH_TIME("FINISH_TIME"),
+ TASK_STATES("TASK_STATES");
+
+ final String _value;
+
+ private WorkflowContextEnum(String value) {
+ _value = value;
+ }
+
+ public String value() {
+ return _value;
+ }
+ }
+
+ public static final int UNFINISHED = -1;
+
+ public WorkflowContext(ZNRecord record) {
+ super(record);
+ }
+
+ public void setWorkflowState(TaskState s) {
+ if (_record.getSimpleField(WorkflowContextEnum.WORKFLOW_STATE.value()) == null) {
+ _record.setSimpleField(WorkflowContextEnum.WORKFLOW_STATE.value(), s.name());
+ } else if (!_record.getSimpleField(WorkflowContextEnum.WORKFLOW_STATE.value()).equals(
+ TaskState.FAILED.name())
+ && !_record.getSimpleField(WorkflowContextEnum.WORKFLOW_STATE.value()).equals(
+ TaskState.COMPLETED.name())) {
+ _record.setSimpleField(WorkflowContextEnum.WORKFLOW_STATE.value(), s.name());
+ }
+ }
+
+ public TaskState getWorkflowState() {
+ String s = _record.getSimpleField(WorkflowContextEnum.WORKFLOW_STATE.value());
+ if (s == null) {
+ return null;
+ }
+
+ return TaskState.valueOf(s);
+ }
+
+ public void setTaskState(String taskResource, TaskState s) {
+ Map<String, String> states = _record.getMapField(WorkflowContextEnum.TASK_STATES.value());
+ if (states == null) {
+ states = new TreeMap<String, String>();
+ _record.setMapField(WorkflowContextEnum.TASK_STATES.value(), states);
+ }
+ states.put(taskResource, s.name());
+ }
+
+ public TaskState getTaskState(String taskResource) {
+ Map<String, String> states = _record.getMapField(WorkflowContextEnum.TASK_STATES.value());
+ if (states == null) {
+ return null;
+ }
+
+ String s = states.get(taskResource);
+ if (s == null) {
+ return null;
+ }
+
+ return TaskState.valueOf(s);
+ }
+
+ public void setStartTime(long t) {
+ _record.setSimpleField(WorkflowContextEnum.START_TIME.value(), String.valueOf(t));
+ }
+
+ public long getStartTime() {
+ String tStr = _record.getSimpleField(WorkflowContextEnum.START_TIME.value());
+ if (tStr == null) {
+ return -1;
+ }
+
+ return Long.parseLong(tStr);
+ }
+
+ public void setFinishTime(long t) {
+ _record.setSimpleField(WorkflowContextEnum.FINISH_TIME.value(), String.valueOf(t));
+ }
+
+ public long getFinishTime() {
+ String tStr = _record.getSimpleField(WorkflowContextEnum.FINISH_TIME.value());
+ if (tStr == null) {
+ return UNFINISHED;
+ }
+
+ return Long.parseLong(tStr);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/80fc2be5/helix-core/src/main/java/org/apache/helix/task/beans/TaskBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/beans/TaskBean.java b/helix-core/src/main/java/org/apache/helix/task/beans/TaskBean.java
new file mode 100644
index 0000000..9481c6e
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/beans/TaskBean.java
@@ -0,0 +1,40 @@
+package org.apache.helix.task.beans;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+import java.util.Map;
+import org.apache.helix.task.TaskConfig;
+
+/**
+ * Bean class used for parsing task definitions from YAML.
+ */
/**
 * Bean class used for parsing task definitions from YAML.
 */
public class TaskBean {
  // Name of the task; presumably unique within its workflow — confirm against the YAML parser.
  public String name;
  // Names of tasks this one depends on; null/absent appears to mean "no parents".
  public List<String> parents;
  // Name of the Helix resource the task targets.
  public String targetResource;
  // Partition states of the target resource on which the task should run — TODO confirm semantics.
  public List<String> targetPartitionStates;
  // Specific target partition ids; null/absent presumably means "all partitions".
  public List<Integer> targetPartitions;
  // Identifier of the task command to execute.
  public String command;
  // Free-form configuration passed through to the command.
  public Map<String, Object> commandConfig;
  // The following fall back to the TaskConfig defaults when not given in the YAML.
  public long timeoutPerPartition = TaskConfig.DEFAULT_TIMEOUT_PER_PARTITION;
  public int numConcurrentTasksPerInstance = TaskConfig.DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
  public int maxAttemptsPerPartition = TaskConfig.DEFAULT_MAX_ATTEMPTS_PER_PARTITION;
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/80fc2be5/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java b/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java
new file mode 100644
index 0000000..4e64692
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java
@@ -0,0 +1,31 @@
+package org.apache.helix.task.beans;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+
+/**
+ * Bean class used for parsing workflow definitions from YAML.
+ */
/**
 * Bean class used for parsing workflow definitions from YAML.
 */
public class WorkflowBean {
  // Name of the workflow.
  public String name;
  // Expiry for the workflow; kept as a String here and presumably parsed to a long
  // downstream (see WorkflowConfig.EXPIRY) — TODO confirm.
  public String expiry;
  // The tasks that make up this workflow.
  public List<TaskBean> tasks;
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/80fc2be5/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
index 1d02275..0239312 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
@@ -153,7 +153,6 @@ public class ClusterSetup {
public void addCluster(String clusterName, boolean overwritePrevious) {
_admin.addCluster(clusterName, overwritePrevious);
- // StateModelConfigGenerator generator = new StateModelConfigGenerator();
addStateModelDef(clusterName, "MasterSlave",
new StateModelDefinition(StateModelConfigGenerator.generateConfigForMasterSlave()));
addStateModelDef(clusterName, "LeaderStandby", new StateModelDefinition(
@@ -164,6 +163,9 @@ public class ClusterSetup {
StateModelConfigGenerator.generateConfigForOnlineOffline()));
addStateModelDef(clusterName, "ScheduledTask", new StateModelDefinition(
StateModelConfigGenerator.generateConfigForScheduledTaskQueue()));
+
+ addStateModelDef(clusterName, "Task",
+ new StateModelDefinition(StateModelConfigGenerator.generateConfigForTaskStateModel()));
}
public void activateCluster(String clusterName, String grandCluster, boolean enable) {
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/80fc2be5/helix-core/src/main/java/org/apache/helix/tools/StateModelConfigGenerator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/StateModelConfigGenerator.java b/helix-core/src/main/java/org/apache/helix/tools/StateModelConfigGenerator.java
index 8127626..e970f6f 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/StateModelConfigGenerator.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/StateModelConfigGenerator.java
@@ -31,6 +31,8 @@ import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.model.StateModelDefinition.StateModelDefinitionProperty;
import org.apache.helix.model.Transition;
import org.apache.helix.model.builder.StateTransitionTableBuilder;
+import org.apache.helix.task.TaskConstants;
+import org.apache.helix.task.TaskPartitionState;
// TODO refactor to use StateModelDefinition.Builder
public class StateModelConfigGenerator {
@@ -349,4 +351,128 @@ public class StateModelConfigGenerator {
stateTransitionPriorityList);
return record;
}
+
+ public static ZNRecord generateConfigForTaskStateModel() {
+ ZNRecord record = new ZNRecord(TaskConstants.STATE_MODEL_NAME);
+
+ record.setSimpleField(StateModelDefinitionProperty.INITIAL_STATE.toString(),
+ TaskPartitionState.INIT.name());
+ List<String> statePriorityList = new ArrayList<String>();
+ statePriorityList.add(TaskPartitionState.INIT.name());
+ statePriorityList.add(TaskPartitionState.RUNNING.name());
+ statePriorityList.add(TaskPartitionState.STOPPED.name());
+ statePriorityList.add(TaskPartitionState.COMPLETED.name());
+ statePriorityList.add(TaskPartitionState.TIMED_OUT.name());
+ statePriorityList.add(TaskPartitionState.TASK_ERROR.name());
+ statePriorityList.add(TaskPartitionState.DROPPED.name());
+ record.setListField(StateModelDefinitionProperty.STATE_PRIORITY_LIST.toString(),
+ statePriorityList);
+ for (String state : statePriorityList) {
+ String key = state + ".meta";
+ Map<String, String> metadata = new HashMap<String, String>();
+ metadata.put("count", "-1");
+ record.setMapField(key, metadata);
+ }
+
+ List<String> states = new ArrayList<String>();
+ states.add(TaskPartitionState.INIT.name());
+ states.add(TaskPartitionState.RUNNING.name());
+ states.add(TaskPartitionState.STOPPED.name());
+ states.add(TaskPartitionState.COMPLETED.name());
+ states.add(TaskPartitionState.TIMED_OUT.name());
+ states.add(TaskPartitionState.TASK_ERROR.name());
+ states.add(TaskPartitionState.DROPPED.name());
+
+ List<Transition> transitions = new ArrayList<Transition>();
+ transitions.add(new Transition(TaskPartitionState.INIT.name(), TaskPartitionState.RUNNING
+ .name()));
+ transitions.add(new Transition(TaskPartitionState.RUNNING.name(), TaskPartitionState.STOPPED
+ .name()));
+ transitions.add(new Transition(TaskPartitionState.RUNNING.name(), TaskPartitionState.COMPLETED
+ .name()));
+ transitions.add(new Transition(TaskPartitionState.RUNNING.name(), TaskPartitionState.TIMED_OUT
+ .name()));
+ transitions.add(new Transition(TaskPartitionState.RUNNING.name(), TaskPartitionState.TASK_ERROR
+ .name()));
+ transitions.add(new Transition(TaskPartitionState.STOPPED.name(), TaskPartitionState.RUNNING
+ .name()));
+
+ // All states have a transition to DROPPED.
+ transitions.add(new Transition(TaskPartitionState.INIT.name(), TaskPartitionState.DROPPED
+ .name()));
+ transitions.add(new Transition(TaskPartitionState.RUNNING.name(), TaskPartitionState.DROPPED
+ .name()));
+ transitions.add(new Transition(TaskPartitionState.COMPLETED.name(), TaskPartitionState.DROPPED
+ .name()));
+ transitions.add(new Transition(TaskPartitionState.STOPPED.name(), TaskPartitionState.DROPPED
+ .name()));
+ transitions.add(new Transition(TaskPartitionState.TIMED_OUT.name(), TaskPartitionState.DROPPED
+ .name()));
+ transitions.add(new Transition(TaskPartitionState.TASK_ERROR.name(), TaskPartitionState.DROPPED
+ .name()));
+
+ // All states, except DROPPED, have a transition to INIT.
+ transitions.add(new Transition(TaskPartitionState.RUNNING.name(), TaskPartitionState.INIT
+ .name()));
+ transitions.add(new Transition(TaskPartitionState.COMPLETED.name(), TaskPartitionState.INIT
+ .name()));
+ transitions.add(new Transition(TaskPartitionState.STOPPED.name(), TaskPartitionState.INIT
+ .name()));
+ transitions.add(new Transition(TaskPartitionState.TIMED_OUT.name(), TaskPartitionState.INIT
+ .name()));
+ transitions.add(new Transition(TaskPartitionState.TASK_ERROR.name(), TaskPartitionState.INIT
+ .name()));
+
+ StateTransitionTableBuilder builder = new StateTransitionTableBuilder();
+ Map<String, Map<String, String>> next = builder.buildTransitionTable(states, transitions);
+
+ for (String state : statePriorityList) {
+ String key = state + ".next";
+ record.setMapField(key, next.get(state));
+ }
+
+ List<String> stateTransitionPriorityList = new ArrayList<String>();
+ stateTransitionPriorityList.add(String.format("%s-%s", TaskPartitionState.INIT.name(),
+ TaskPartitionState.RUNNING.name()));
+ stateTransitionPriorityList.add(String.format("%s-%s", TaskPartitionState.RUNNING.name(),
+ TaskPartitionState.STOPPED.name()));
+ stateTransitionPriorityList.add(String.format("%s-%s", TaskPartitionState.RUNNING.name(),
+ TaskPartitionState.COMPLETED.name()));
+ stateTransitionPriorityList.add(String.format("%s-%s", TaskPartitionState.RUNNING.name(),
+ TaskPartitionState.TIMED_OUT.name()));
+ stateTransitionPriorityList.add(String.format("%s-%s", TaskPartitionState.RUNNING.name(),
+ TaskPartitionState.TASK_ERROR.name()));
+ stateTransitionPriorityList.add(String.format("%s-%s", TaskPartitionState.STOPPED.name(),
+ TaskPartitionState.RUNNING.name()));
+
+ stateTransitionPriorityList.add(String.format("%s-%s", TaskPartitionState.INIT.name(),
+ TaskPartitionState.DROPPED.name()));
+ stateTransitionPriorityList.add(String.format("%s-%s", TaskPartitionState.RUNNING.name(),
+ TaskPartitionState.DROPPED.name()));
+ stateTransitionPriorityList.add(String.format("%s-%s", TaskPartitionState.COMPLETED.name(),
+ TaskPartitionState.DROPPED.name()));
+ stateTransitionPriorityList.add(String.format("%s-%s", TaskPartitionState.STOPPED.name(),
+ TaskPartitionState.DROPPED.name()));
+ stateTransitionPriorityList.add(String.format("%s-%s", TaskPartitionState.TIMED_OUT.name(),
+ TaskPartitionState.DROPPED.name()));
+ stateTransitionPriorityList.add(String.format("%s-%s", TaskPartitionState.TASK_ERROR.name(),
+ TaskPartitionState.DROPPED.name()));
+
+ stateTransitionPriorityList.add(String.format("%s-%s", TaskPartitionState.RUNNING.name(),
+ TaskPartitionState.INIT.name()));
+ stateTransitionPriorityList.add(String.format("%s-%s", TaskPartitionState.COMPLETED.name(),
+ TaskPartitionState.INIT.name()));
+ stateTransitionPriorityList.add(String.format("%s-%s", TaskPartitionState.STOPPED.name(),
+ TaskPartitionState.INIT.name()));
+ stateTransitionPriorityList.add(String.format("%s-%s", TaskPartitionState.TIMED_OUT.name(),
+ TaskPartitionState.INIT.name()));
+ stateTransitionPriorityList.add(String.format("%s-%s", TaskPartitionState.TASK_ERROR.name(),
+ TaskPartitionState.INIT.name()));
+
+ record.setListField(StateModelDefinitionProperty.STATE_TRANSITION_PRIORITYLIST.toString(),
+ stateTransitionPriorityList);
+
+ return record;
+ }
+
}