You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2015/12/10 08:04:54 UTC
[2/3] helix git commit: [HELIX-617] Job IdealState is generated even when
the job is not running, and is not removed when the job is completed.
http://git-wip-us.apache.org/repos/asf/helix/blob/1798e793/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index 3842b66..1526883 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -19,679 +19,56 @@ package org.apache.helix.task;
* under the License.
*/
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeMap;
-import java.util.TreeSet;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
-import org.I0Itec.zkclient.DataUpdater;
-import org.apache.helix.AccessOption;
-import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixDefinedState;
import org.apache.helix.HelixManager;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.ZNRecord;
import org.apache.helix.controller.rebalancer.Rebalancer;
import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
import org.apache.helix.controller.stages.ClusterDataCache;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.IdealState;
-import org.apache.helix.model.Message;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
import org.apache.helix.model.ResourceAssignment;
import org.apache.log4j.Logger;
-import com.google.common.base.Joiner;
-import com.google.common.collect.BiMap;
-import com.google.common.collect.HashBiMap;
-import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
/**
- * Custom rebalancer implementation for the {@code Task} state model.
+ * Abstract rebalancer class for the {@code Task} state model.
*/
public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
private static final Logger LOG = Logger.getLogger(TaskRebalancer.class);
- // Management of already-scheduled rebalances across jobs
- private static final BiMap<String, Date> SCHEDULED_TIMES = HashBiMap.create();
- private static final ScheduledExecutorService SCHEDULED_EXECUTOR = Executors
- .newSingleThreadScheduledExecutor();
-
// For connection management
- private HelixManager _manager;
-
- /**
- * Get all the partitions that should be created by this task
- * @param jobCfg the task configuration
- * @param jobCtx the task context
- * @param workflowCfg the workflow configuration
- * @param workflowCtx the workflow context
- * @param cache cluster snapshot
- * @return set of partition numbers
- */
- public abstract Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
- WorkflowConfig workflowCfg, WorkflowContext workflowCtx, ClusterDataCache cache);
+ protected HelixManager _manager;
+ protected static ScheduledRebalancer _scheduledRebalancer = new ScheduledRebalancer();
- /**
- * Compute an assignment of tasks to instances
- * @param currStateOutput the current state of the instances
- * @param prevAssignment the previous task partition assignment
- * @param instances the instances
- * @param jobCfg the task configuration
- * @param jobContext the task context
- * @param workflowCfg the workflow configuration
- * @param workflowCtx the workflow context
- * @param partitionSet the partitions to assign
- * @param cache cluster snapshot
- * @return map of instances to set of partition numbers
- */
- public abstract Map<String, SortedSet<Integer>> getTaskAssignment(
- CurrentStateOutput currStateOutput, ResourceAssignment prevAssignment,
- Collection<String> instances, JobConfig jobCfg, JobContext jobContext,
- WorkflowConfig workflowCfg, WorkflowContext workflowCtx, Set<Integer> partitionSet,
- ClusterDataCache cache);
-
- @Override
- public void init(HelixManager manager) {
+ @Override public void init(HelixManager manager) {
_manager = manager;
}
- @Override
- public ResourceAssignment computeBestPossiblePartitionState(ClusterDataCache clusterData,
- IdealState taskIs, Resource resource, CurrentStateOutput currStateOutput) {
- final String resourceName = resource.getResourceName();
- LOG.debug("Computer Best Partition for resource: " + resourceName);
-
- // Fetch job configuration
- JobConfig jobCfg = TaskUtil.getJobCfg(_manager, resourceName);
- if (jobCfg == null) {
- LOG.debug("Job configuration is NULL for " + resourceName);
- return emptyAssignment(resourceName, currStateOutput);
- }
- String workflowResource = jobCfg.getWorkflow();
-
- // Fetch workflow configuration and context
- WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(_manager, workflowResource);
- if (workflowCfg == null) {
- LOG.debug("Workflow configuration is NULL for " + resourceName);
- return emptyAssignment(resourceName, currStateOutput);
- }
- WorkflowContext workflowCtx = TaskUtil.getWorkflowContext(_manager, workflowResource);
-
- // Initialize workflow context if needed
- if (workflowCtx == null) {
- workflowCtx = new WorkflowContext(new ZNRecord("WorkflowContext"));
- workflowCtx.setStartTime(System.currentTimeMillis());
- LOG.info("Workflow context for " + resourceName + " created!");
- }
-
- // check ancestor job status
- int notStartedCount = 0;
- int inCompleteCount = 0;
- for (String ancestor : workflowCfg.getJobDag().getAncestors(resourceName)) {
- TaskState jobState = workflowCtx.getJobState(ancestor);
- if (jobState == null || jobState == TaskState.NOT_STARTED) {
- ++notStartedCount;
- } else if (jobState == TaskState.IN_PROGRESS || jobState == TaskState.STOPPED) {
- ++inCompleteCount;
- }
- }
-
- if (notStartedCount > 0 || inCompleteCount >= workflowCfg.getParallelJobs()) {
- LOG.debug("Job is not ready to be scheduled due to pending dependent jobs " + resourceName);
- return emptyAssignment(resourceName, currStateOutput);
- }
-
- // Clean up if workflow marked for deletion
- TargetState targetState = workflowCfg.getTargetState();
- if (targetState == TargetState.DELETE) {
- LOG.info(
- "Workflow is marked as deleted " + workflowResource
- + " cleaning up the workflow context.");
- cleanup(_manager, resourceName, workflowCfg, workflowResource);
- return emptyAssignment(resourceName, currStateOutput);
- }
-
- // Check if this workflow has been finished past its expiry.
- if (workflowCtx.getFinishTime() != WorkflowContext.UNFINISHED
- && workflowCtx.getFinishTime() + workflowCfg.getExpiry() <= System.currentTimeMillis()) {
- LOG.info("Workflow " + workflowResource
- + " is completed and passed expiry time, cleaning up the workflow context.");
- markForDeletion(_manager, workflowResource);
- cleanup(_manager, resourceName, workflowCfg, workflowResource);
- return emptyAssignment(resourceName, currStateOutput);
- }
-
- // Fetch any existing context information from the property store.
- JobContext jobCtx = TaskUtil.getJobContext(_manager, resourceName);
- if (jobCtx == null) {
- jobCtx = new JobContext(new ZNRecord("TaskContext"));
- jobCtx.setStartTime(System.currentTimeMillis());
- }
-
- // Check for expired jobs for non-terminable workflows
- long jobFinishTime = jobCtx.getFinishTime();
- if (!workflowCfg.isTerminable() && jobFinishTime != WorkflowContext.UNFINISHED
- && jobFinishTime + workflowCfg.getExpiry() <= System.currentTimeMillis()) {
- LOG.info("Job " + resourceName
- + " is completed and passed expiry time, cleaning up the job context.");
- cleanup(_manager, resourceName, workflowCfg, workflowResource);
- return emptyAssignment(resourceName, currStateOutput);
- }
-
- // The job is already in a final state (completed/failed).
- if (workflowCtx.getJobState(resourceName) == TaskState.FAILED
- || workflowCtx.getJobState(resourceName) == TaskState.COMPLETED) {
- LOG.debug("Job " + resourceName + " is failed or already completed.");
- return emptyAssignment(resourceName, currStateOutput);
- }
-
- // Check for readiness, and stop processing if it's not ready
- boolean isReady =
- scheduleIfNotReady(workflowCfg, workflowCtx, workflowResource, resourceName, clusterData);
- if (!isReady) {
- LOG.debug("Job " + resourceName + " is not ready to be scheduled.");
- return emptyAssignment(resourceName, currStateOutput);
- }
-
- // Grab the old assignment, or an empty one if it doesn't exist
- ResourceAssignment prevAssignment = TaskUtil.getPrevResourceAssignment(_manager, resourceName);
- if (prevAssignment == null) {
- prevAssignment = new ResourceAssignment(resourceName);
- }
-
- // Will contain the list of partitions that must be explicitly dropped from the ideal state that
- // is stored in zk.
- // Fetch the previous resource assignment from the property store. This is required because of
- // HELIX-230.
- Set<Integer> partitionsToDrop = new TreeSet<Integer>();
-
- ResourceAssignment newAssignment =
- computeResourceMapping(resourceName, workflowCfg, jobCfg, prevAssignment, clusterData
- .getLiveInstances().keySet(), currStateOutput, workflowCtx, jobCtx, partitionsToDrop,
- clusterData);
-
- if (!partitionsToDrop.isEmpty()) {
- for (Integer pId : partitionsToDrop) {
- taskIs.getRecord().getMapFields().remove(pName(resourceName, pId));
- }
- HelixDataAccessor accessor = _manager.getHelixDataAccessor();
- PropertyKey propertyKey = accessor.keyBuilder().idealStates(resourceName);
- accessor.setProperty(propertyKey, taskIs);
- }
-
- // Update rebalancer context, previous ideal state.
- TaskUtil.setJobContext(_manager, resourceName, jobCtx);
- TaskUtil.setWorkflowContext(_manager, workflowResource, workflowCtx);
- TaskUtil.setPrevResourceAssignment(_manager, resourceName, newAssignment);
-
- LOG.debug("Job " + resourceName + " new assignment " + Arrays
- .toString(newAssignment.getMappedPartitions().toArray()));
-
- return newAssignment;
- }
-
- private Set<String> getInstancesAssignedToOtherJobs(String currentJobName,
- WorkflowConfig workflowCfg) {
-
- Set<String> ret = new HashSet<String>();
-
- for (String jobName : workflowCfg.getJobDag().getAllNodes()) {
- if (jobName.equals(currentJobName)) {
- continue;
- }
-
- JobContext jobContext = TaskUtil.getJobContext(_manager, jobName);
- if (jobContext == null) {
- continue;
- }
- for (int partition : jobContext.getPartitionSet()) {
- TaskPartitionState partitionState = jobContext.getPartitionState(partition);
- if (partitionState == TaskPartitionState.INIT ||
- partitionState == TaskPartitionState.RUNNING) {
- ret.add(jobContext.getAssignedParticipant(partition));
- }
- }
- }
-
- return ret;
- }
-
- private ResourceAssignment computeResourceMapping(String jobResource,
- WorkflowConfig workflowConfig, JobConfig jobCfg, ResourceAssignment prevAssignment,
- Collection<String> liveInstances, CurrentStateOutput currStateOutput,
- WorkflowContext workflowCtx, JobContext jobCtx, Set<Integer> partitionsToDropFromIs,
- ClusterDataCache cache) {
- TargetState jobTgtState = workflowConfig.getTargetState();
-
- // Update running status in workflow context
- if (jobTgtState == TargetState.STOP) {
- workflowCtx.setJobState(jobResource, TaskState.STOPPED);
- // Workflow has been stopped if all jobs are stopped
- if (isWorkflowStopped(workflowCtx, workflowConfig)) {
- workflowCtx.setWorkflowState(TaskState.STOPPED);
- }
- } else {
- workflowCtx.setJobState(jobResource, TaskState.IN_PROGRESS);
- // Workflow is in progress if any task is in progress
- workflowCtx.setWorkflowState(TaskState.IN_PROGRESS);
- }
-
- // Used to keep track of tasks that have already been assigned to instances.
- Set<Integer> assignedPartitions = new HashSet<Integer>();
-
- // Used to keep track of tasks that have failed, but whose failure is acceptable
- Set<Integer> skippedPartitions = new HashSet<Integer>();
-
- // Keeps a mapping of (partition) -> (instance, state)
- Map<Integer, PartitionAssignment> paMap = new TreeMap<Integer, PartitionAssignment>();
-
- Set<String> excludedInstances = getInstancesAssignedToOtherJobs(jobResource, workflowConfig);
-
- // Process all the current assignments of tasks.
- Set<Integer> allPartitions =
- getAllTaskPartitions(jobCfg, jobCtx, workflowConfig, workflowCtx, cache);
- Map<String, SortedSet<Integer>> taskAssignments =
- getTaskPartitionAssignments(liveInstances, prevAssignment, allPartitions);
- long currentTime = System.currentTimeMillis();
- for (String instance : taskAssignments.keySet()) {
- if (excludedInstances.contains(instance)) {
- continue;
- }
-
- Set<Integer> pSet = taskAssignments.get(instance);
- // Used to keep track of partitions that are in one of the final states: COMPLETED, TIMED_OUT,
- // TASK_ERROR, ERROR.
- Set<Integer> donePartitions = new TreeSet<Integer>();
- for (int pId : pSet) {
- final String pName = pName(jobResource, pId);
-
- // Check for pending state transitions on this (partition, instance).
- Message pendingMessage =
- currStateOutput.getPendingState(jobResource, new Partition(pName), instance);
- if (pendingMessage != null) {
- // There is a pending state transition for this (partition, instance). Just copy forward
- // the state assignment from the previous ideal state.
- Map<String, String> stateMap = prevAssignment.getReplicaMap(new Partition(pName));
- if (stateMap != null) {
- String prevState = stateMap.get(instance);
- paMap.put(pId, new PartitionAssignment(instance, prevState));
- assignedPartitions.add(pId);
- if (LOG.isDebugEnabled()) {
- LOG.debug(String
- .format(
- "Task partition %s has a pending state transition on instance %s. Using the previous ideal state which was %s.",
- pName, instance, prevState));
- }
- }
-
- continue;
- }
-
- TaskPartitionState currState =
- TaskPartitionState.valueOf(currStateOutput.getCurrentState(jobResource, new Partition(
- pName), instance));
- jobCtx.setPartitionState(pId, currState);
-
- // Process any requested state transitions.
- String requestedStateStr =
- currStateOutput.getRequestedState(jobResource, new Partition(pName), instance);
- if (requestedStateStr != null && !requestedStateStr.isEmpty()) {
- TaskPartitionState requestedState = TaskPartitionState.valueOf(requestedStateStr);
- if (requestedState.equals(currState)) {
- LOG.warn(String.format(
- "Requested state %s is the same as the current state for instance %s.",
- requestedState, instance));
- }
-
- paMap.put(pId, new PartitionAssignment(instance, requestedState.name()));
- assignedPartitions.add(pId);
- LOG.debug(String.format(
- "Instance %s requested a state transition to %s for partition %s.", instance,
- requestedState, pName));
- continue;
- }
-
- switch (currState) {
- case RUNNING:
- case STOPPED: {
- TaskPartitionState nextState;
- if (jobTgtState == TargetState.START) {
- nextState = TaskPartitionState.RUNNING;
- } else {
- nextState = TaskPartitionState.STOPPED;
- }
-
- paMap.put(pId, new PartitionAssignment(instance, nextState.name()));
- assignedPartitions.add(pId);
- LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName,
- nextState, instance));
- }
- break;
- case COMPLETED: {
- // The task has completed on this partition. Mark as such in the context object.
- donePartitions.add(pId);
- LOG.debug(String
- .format(
- "Task partition %s has completed with state %s. Marking as such in rebalancer context.",
- pName, currState));
- partitionsToDropFromIs.add(pId);
- markPartitionCompleted(jobCtx, pId);
- }
- break;
- case TIMED_OUT:
- case TASK_ERROR:
- case ERROR: {
- donePartitions.add(pId); // The task may be rescheduled on a different instance.
- LOG.debug(String.format(
- "Task partition %s has error state %s. Marking as such in rebalancer context.",
- pName, currState));
- markPartitionError(jobCtx, pId, currState, true);
- // The error policy is to fail the task as soon a single partition fails for a specified
- // maximum number of attempts.
- if (jobCtx.getPartitionNumAttempts(pId) >= jobCfg.getMaxAttemptsPerTask()) {
- // If the user does not require this task to succeed in order for the job to succeed,
- // then we don't have to fail the job right now
- boolean successOptional = false;
- String taskId = jobCtx.getTaskIdForPartition(pId);
- if (taskId != null) {
- TaskConfig taskConfig = jobCfg.getTaskConfig(taskId);
- if (taskConfig != null) {
- successOptional = taskConfig.isSuccessOptional();
- }
- }
-
- // Similarly, if we have some leeway for how many tasks we can fail, then we don't have
- // to fail the job immediately
- if (skippedPartitions.size() < jobCfg.getFailureThreshold()) {
- successOptional = true;
- }
-
- if (!successOptional) {
- long finishTime = currentTime;
- workflowCtx.setJobState(jobResource, TaskState.FAILED);
- if (workflowConfig.isTerminable()) {
- workflowCtx.setWorkflowState(TaskState.FAILED);
- workflowCtx.setFinishTime(finishTime);
- }
- jobCtx.setFinishTime(finishTime);
- markAllPartitionsError(jobCtx, currState, false);
- addAllPartitions(allPartitions, partitionsToDropFromIs);
- return emptyAssignment(jobResource, currStateOutput);
- } else {
- skippedPartitions.add(pId);
- partitionsToDropFromIs.add(pId);
- }
- } else {
- // Mark the task to be started at some later time (if enabled)
- markPartitionDelayed(jobCfg, jobCtx, pId);
- }
- }
- break;
- case INIT:
- case DROPPED: {
- // currState in [INIT, DROPPED]. Do nothing, the partition is eligible to be reassigned.
- donePartitions.add(pId);
- LOG.debug(String.format(
- "Task partition %s has state %s. It will be dropped from the current ideal state.",
- pName, currState));
- }
- break;
- default:
- throw new AssertionError("Unknown enum symbol: " + currState);
- }
- }
-
- // Remove the set of task partitions that are completed or in one of the error states.
- pSet.removeAll(donePartitions);
- }
-
- // For delayed tasks, trigger a rebalance event for the closest upcoming ready time
- scheduleForNextTask(jobResource, jobCtx, currentTime);
-
- if (isJobComplete(jobCtx, allPartitions, skippedPartitions, jobCfg)) {
- workflowCtx.setJobState(jobResource, TaskState.COMPLETED);
- jobCtx.setFinishTime(currentTime);
- if (isWorkflowComplete(workflowCtx, workflowConfig)) {
- workflowCtx.setWorkflowState(TaskState.COMPLETED);
- workflowCtx.setFinishTime(currentTime);
- }
- }
-
- // Make additional task assignments if needed.
- if (jobTgtState == TargetState.START) {
- // Contains the set of task partitions that must be excluded from consideration when making
- // any new assignments.
- // This includes all completed, failed, delayed, and already assigned partitions.
- Set<Integer> excludeSet = Sets.newTreeSet(assignedPartitions);
- addCompletedPartitions(excludeSet, jobCtx, allPartitions);
- addGiveupPartitions(excludeSet, jobCtx, allPartitions, jobCfg);
- excludeSet.addAll(skippedPartitions);
- excludeSet.addAll(getNonReadyPartitions(jobCtx, currentTime));
- // Get instance->[partition, ...] mappings for the target resource.
- Map<String, SortedSet<Integer>> tgtPartitionAssignments =
- getTaskAssignment(currStateOutput, prevAssignment, liveInstances, jobCfg, jobCtx,
- workflowConfig, workflowCtx, allPartitions, cache);
- for (Map.Entry<String, SortedSet<Integer>> entry : taskAssignments.entrySet()) {
- String instance = entry.getKey();
- if (!tgtPartitionAssignments.containsKey(instance) || excludedInstances.contains(instance)) {
- continue;
- }
- // Contains the set of task partitions currently assigned to the instance.
- Set<Integer> pSet = entry.getValue();
- int numToAssign = jobCfg.getNumConcurrentTasksPerInstance() - pSet.size();
- if (numToAssign > 0) {
- List<Integer> nextPartitions =
- getNextPartitions(tgtPartitionAssignments.get(instance), excludeSet, numToAssign);
- for (Integer pId : nextPartitions) {
- String pName = pName(jobResource, pId);
- paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.RUNNING.name()));
- excludeSet.add(pId);
- jobCtx.setAssignedParticipant(pId, instance);
- jobCtx.setPartitionState(pId, TaskPartitionState.INIT);
- LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName,
- TaskPartitionState.RUNNING, instance));
- }
- }
- }
- }
-
- // Construct a ResourceAssignment object from the map of partition assignments.
- ResourceAssignment ra = new ResourceAssignment(jobResource);
- for (Map.Entry<Integer, PartitionAssignment> e : paMap.entrySet()) {
- PartitionAssignment pa = e.getValue();
- ra.addReplicaMap(new Partition(pName(jobResource, e.getKey())),
- ImmutableMap.of(pa._instance, pa._state));
- }
-
- return ra;
- }
-
- /**
- * Check if a workflow is ready to schedule, and schedule a rebalance if it is not
- * @param workflowCfg the workflow to check
- * @param workflowCtx the current workflow context
- * @param workflowResource the Helix resource associated with the workflow
- * @param jobResource a job from the workflow
- * @param cache the current snapshot of the cluster
- * @return true if ready, false if not ready
- */
- private boolean scheduleIfNotReady(WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
- String workflowResource, String jobResource, ClusterDataCache cache) {
- // Ignore non-scheduled workflows
- if (workflowCfg == null || workflowCfg.getScheduleConfig() == null) {
- return true;
- }
-
- // Figure out when this should be run, and if it's ready, then just run it
- ScheduleConfig scheduleConfig = workflowCfg.getScheduleConfig();
- Date startTime = scheduleConfig.getStartTime();
- long currentTime = new Date().getTime();
- long delayFromStart = startTime.getTime() - currentTime;
-
- if (delayFromStart <= 0) {
- // Remove any timers that are past-time for this workflow
- Date scheduledTime = SCHEDULED_TIMES.get(workflowResource);
- if (scheduledTime != null && currentTime > scheduledTime.getTime()) {
- LOG.debug("Remove schedule timer for " + jobResource + " time: " + SCHEDULED_TIMES.get(jobResource));
- SCHEDULED_TIMES.remove(workflowResource);
- }
-
- // Recurring workflows are just templates that spawn new workflows
- if (scheduleConfig.isRecurring()) {
- // Skip scheduling this workflow if it's not in a start state
- if (!workflowCfg.getTargetState().equals(TargetState.START)) {
- LOG.debug(
- "Skip scheduling since the workflow has not been started " + workflowResource);
- return false;
- }
-
- // Skip scheduling this workflow again if the previous run (if any) is still active
- String lastScheduled = workflowCtx.getLastScheduledSingleWorkflow();
- if (lastScheduled != null) {
- WorkflowContext lastWorkflowCtx = TaskUtil.getWorkflowContext(_manager, lastScheduled);
- if (lastWorkflowCtx != null
- && lastWorkflowCtx.getFinishTime() == WorkflowContext.UNFINISHED) {
- LOG.info("Skip scheduling since last schedule has not completed yet " + lastScheduled);
- return false;
- }
- }
-
- // Figure out how many jumps are needed, thus the time to schedule the next workflow
- // The negative of the delay is the amount of time past the start time
- long period =
- scheduleConfig.getRecurrenceUnit().toMillis(scheduleConfig.getRecurrenceInterval());
- long offsetMultiplier = (-delayFromStart) / period;
- long timeToSchedule = period * offsetMultiplier + startTime.getTime();
-
- // Now clone the workflow if this clone has not yet been created
- DateFormat df = new SimpleDateFormat("yyyyMMdd'T'HHmmssZ");
- // Now clone the workflow if this clone has not yet been created
- String newWorkflowName = workflowResource + "_" + df.format(new java.util.Date(timeToSchedule));
- LOG.debug("Ready to start workflow " + newWorkflowName);
- if (!newWorkflowName.equals(lastScheduled)) {
- Workflow clonedWf =
- TaskUtil.cloneWorkflow(_manager, workflowResource, newWorkflowName, new Date(
- timeToSchedule));
- TaskDriver driver = new TaskDriver(_manager);
- try {
- // Start the cloned workflow
- driver.start(clonedWf);
- } catch (Exception e) {
- LOG.error("Failed to schedule cloned workflow " + newWorkflowName, e);
- }
- // Persist workflow start regardless of success to avoid retrying and failing
- workflowCtx.setLastScheduledSingleWorkflow(newWorkflowName);
- TaskUtil.setWorkflowContext(_manager, workflowResource, workflowCtx);
- }
-
- // Change the time to trigger the pipeline to that of the next run
- startTime = new Date(timeToSchedule + period);
- delayFromStart = startTime.getTime() - System.currentTimeMillis();
- } else {
- // This is a one-time workflow and is ready
- return true;
- }
- }
-
- scheduleRebalance(workflowResource, jobResource, startTime, delayFromStart);
- return false;
- }
-
- private void scheduleRebalance(String id, String jobResource, Date startTime, long delayFromStart) {
- // Do nothing if there is already a timer set for the this workflow with the same start time.
- if ((SCHEDULED_TIMES.containsKey(id) && SCHEDULED_TIMES.get(id).equals(startTime))
- || SCHEDULED_TIMES.inverse().containsKey(startTime)) {
- LOG.debug("Schedule timer for" + id + "and job: " + jobResource + " is up to date.");
- return;
- }
- LOG.info(
- "Schedule rebalance with id: " + id + "and job: " + jobResource + " at time: " + startTime
- + " delay from start: " + delayFromStart);
-
- // For workflows not yet scheduled, schedule them and record it
- RebalanceInvoker rebalanceInvoker = new RebalanceInvoker(_manager, jobResource);
- SCHEDULED_TIMES.put(id, startTime);
- SCHEDULED_EXECUTOR.schedule(rebalanceInvoker, delayFromStart, TimeUnit.MILLISECONDS);
- }
-
- private void scheduleForNextTask(String jobResource, JobContext ctx, long now) {
- // Clear current entries if they exist and are expired
- long currentTime = now;
- Date scheduledTime = SCHEDULED_TIMES.get(jobResource);
- if (scheduledTime != null && currentTime > scheduledTime.getTime()) {
- LOG.debug("Remove schedule timer for" + jobResource + " time: " + SCHEDULED_TIMES.get(jobResource));
- SCHEDULED_TIMES.remove(jobResource);
- }
-
- // Figure out the earliest schedulable time in the future of a non-complete job
- boolean shouldSchedule = false;
- long earliestTime = Long.MAX_VALUE;
- for (int p : ctx.getPartitionSet()) {
- long retryTime = ctx.getNextRetryTime(p);
- TaskPartitionState state = ctx.getPartitionState(p);
- state = (state != null) ? state : TaskPartitionState.INIT;
- Set<TaskPartitionState> errorStates =
- Sets.newHashSet(TaskPartitionState.ERROR, TaskPartitionState.TASK_ERROR,
- TaskPartitionState.TIMED_OUT);
- if (errorStates.contains(state) && retryTime > currentTime && retryTime < earliestTime) {
- earliestTime = retryTime;
- shouldSchedule = true;
- }
- }
-
- // If any was found, then schedule it
- if (shouldSchedule) {
- long delay = earliestTime - currentTime;
- Date startTime = new Date(earliestTime);
- scheduleRebalance(jobResource, jobResource, startTime, delay);
- }
- }
-
- /**
- * Checks if the job has completed.
- * @param ctx The rebalancer context.
- * @param allPartitions The set of partitions to check.
- * @param skippedPartitions partitions that failed, but whose failure is acceptable
- * @return true if all task partitions have been marked with status
- * {@link TaskPartitionState#COMPLETED} in the rebalancer
- * context, false otherwise.
- */
- private static boolean isJobComplete(JobContext ctx, Set<Integer> allPartitions,
- Set<Integer> skippedPartitions, JobConfig cfg) {
- for (Integer pId : allPartitions) {
- TaskPartitionState state = ctx.getPartitionState(pId);
- if (!skippedPartitions.contains(pId) && state != TaskPartitionState.COMPLETED
- && !isTaskGivenup(ctx, cfg, pId)) {
- return false;
- }
- }
- return true;
- }
+ @Override public abstract ResourceAssignment computeBestPossiblePartitionState(
+ ClusterDataCache clusterData, IdealState taskIs, Resource resource,
+ CurrentStateOutput currStateOutput);
/**
* Checks if the workflow has completed.
+ *
* @param ctx Workflow context containing job states
* @param cfg Workflow config containing set of jobs
* @return returns true if all tasks are {@link TaskState#COMPLETED}, false otherwise.
*/
- private static boolean isWorkflowComplete(WorkflowContext ctx, WorkflowConfig cfg) {
+ protected boolean isWorkflowComplete(WorkflowContext ctx, WorkflowConfig cfg) {
if (!cfg.isTerminable()) {
return false;
}
@@ -705,149 +82,23 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
/**
* Checks if the workflow has been stopped.
+ * Jobs in state null, COMPLETED, FAILED or STOPPED all count as stopped for this check.
* @param ctx Workflow context containing task states
* @param cfg Workflow config containing set of tasks
* @return returns true if all tasks are {@link TaskState#STOPPED}, false otherwise.
*/
- private static boolean isWorkflowStopped(WorkflowContext ctx, WorkflowConfig cfg) {
+ protected boolean isWorkflowStopped(WorkflowContext ctx, WorkflowConfig cfg) {
for (String job : cfg.getJobDag().getAllNodes()) {
- if (ctx.getJobState(job) != TaskState.STOPPED && ctx.getJobState(job) != null) {
+ TaskState jobState = ctx.getJobState(job);
+ if (jobState != null && jobState != TaskState.COMPLETED && jobState != TaskState.FAILED
+ && jobState != TaskState.STOPPED)
return false;
- }
}
return true;
}
- private static void markForDeletion(HelixManager mgr, String resourceName) {
- mgr.getConfigAccessor().set(
- TaskUtil.getResourceConfigScope(mgr.getClusterName(), resourceName),
- WorkflowConfig.TARGET_STATE, TargetState.DELETE.name());
- }
-
- /**
- * Cleans up all Helix state associated with this job, wiping workflow-level information if this
- * is the last remaining job in its workflow, and the workflow is terminable.
- */
- private static void cleanup(HelixManager mgr, final String resourceName, WorkflowConfig cfg,
- String workflowResource) {
- LOG.info("Cleaning up job: " + resourceName + " in workflow: " + workflowResource);
- HelixDataAccessor accessor = mgr.getHelixDataAccessor();
-
- // Remove any DAG references in workflow
- PropertyKey workflowKey = getConfigPropertyKey(accessor, workflowResource);
- DataUpdater<ZNRecord> dagRemover = new DataUpdater<ZNRecord>() {
- @Override
- public ZNRecord update(ZNRecord currentData) {
- JobDag jobDag = JobDag.fromJson(currentData.getSimpleField(WorkflowConfig.DAG));
- for (String child : jobDag.getDirectChildren(resourceName)) {
- jobDag.getChildrenToParents().get(child).remove(resourceName);
- }
- for (String parent : jobDag.getDirectParents(resourceName)) {
- jobDag.getParentsToChildren().get(parent).remove(resourceName);
- }
- jobDag.getChildrenToParents().remove(resourceName);
- jobDag.getParentsToChildren().remove(resourceName);
- jobDag.getAllNodes().remove(resourceName);
- try {
- currentData.setSimpleField(WorkflowConfig.DAG, jobDag.toJson());
- } catch (Exception e) {
- LOG.equals("Could not update DAG for job: " + resourceName);
- }
- return currentData;
- }
- };
- accessor.getBaseDataAccessor().update(workflowKey.getPath(), dagRemover,
- AccessOption.PERSISTENT);
-
- // Delete resource configs.
- PropertyKey cfgKey = getConfigPropertyKey(accessor, resourceName);
- if (!accessor.removeProperty(cfgKey)) {
- throw new RuntimeException(String.format(
- "Error occurred while trying to clean up job %s. Failed to remove node %s from Helix. Aborting further clean up steps.",
- resourceName,
- cfgKey));
- }
-
- // Delete property store information for this resource.
- // For recurring workflow, it's OK if the node doesn't exist.
- String propStoreKey = getRebalancerPropStoreKey(resourceName);
- mgr.getHelixPropertyStore().remove(propStoreKey, AccessOption.PERSISTENT);
-
- // Delete the ideal state itself.
- PropertyKey isKey = getISPropertyKey(accessor, resourceName);
- if (!accessor.removeProperty(isKey)) {
- throw new RuntimeException(String.format(
- "Error occurred while trying to clean up task %s. Failed to remove node %s from Helix.",
- resourceName, isKey));
- }
-
- // Delete dead external view
- // because job is already completed, there is no more current state change
- // thus dead external views removal will not be triggered
- PropertyKey evKey = accessor.keyBuilder().externalView(resourceName);
- accessor.removeProperty(evKey);
-
- LOG.info(String.format("Successfully cleaned up job resource %s.", resourceName));
-
- boolean lastInWorkflow = true;
- for (String job : cfg.getJobDag().getAllNodes()) {
- // check if property store information or resource configs exist for this job
- if (mgr.getHelixPropertyStore().exists(getRebalancerPropStoreKey(job),
- AccessOption.PERSISTENT)
- || accessor.getProperty(getConfigPropertyKey(accessor, job)) != null
- || accessor.getProperty(getISPropertyKey(accessor, job)) != null) {
- lastInWorkflow = false;
- break;
- }
- }
-
- // clean up workflow-level info if this was the last in workflow
- if (lastInWorkflow && (cfg.isTerminable() || cfg.getTargetState() == TargetState.DELETE)) {
- // delete workflow config
- PropertyKey workflowCfgKey = getConfigPropertyKey(accessor, workflowResource);
- if (!accessor.removeProperty(workflowCfgKey)) {
- throw new RuntimeException(
- String
- .format(
- "Error occurred while trying to clean up workflow %s. Failed to remove node %s from Helix. Aborting further clean up steps.",
- workflowResource, workflowCfgKey));
- }
- // Delete property store information for this workflow
- String workflowPropStoreKey = getRebalancerPropStoreKey(workflowResource);
- if (!mgr.getHelixPropertyStore().remove(workflowPropStoreKey, AccessOption.PERSISTENT)) {
- throw new RuntimeException(
- String
- .format(
- "Error occurred while trying to clean up workflow %s. Failed to remove node %s from Helix. Aborting further clean up steps.",
- workflowResource, workflowPropStoreKey));
- }
- // Remove pending timer for this workflow if exists
- if (SCHEDULED_TIMES.containsKey(workflowResource)) {
- SCHEDULED_TIMES.remove(workflowResource);
- }
- }
-
- }
-
- private static String getRebalancerPropStoreKey(String resource) {
- return Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resource);
- }
-
- private static PropertyKey getISPropertyKey(HelixDataAccessor accessor, String resource) {
- return accessor.keyBuilder().idealStates(resource);
- }
-
- private static PropertyKey getConfigPropertyKey(HelixDataAccessor accessor, String resource) {
- return accessor.keyBuilder().resourceConfig(resource);
- }
-
- private static void addAllPartitions(Set<Integer> toAdd, Set<Integer> destination) {
- for (Integer pId : toAdd) {
- destination.add(pId);
- }
- }
-
- private static ResourceAssignment emptyAssignment(String name, CurrentStateOutput currStateOutput) {
+ protected ResourceAssignment buildEmptyAssignment(String name,
+ CurrentStateOutput currStateOutput) {
ResourceAssignment assignment = new ResourceAssignment(name);
Set<Partition> partitions = currStateOutput.getCurrentStateMappedPartitions(name);
for (Partition partition : partitions) {
@@ -861,164 +112,158 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
return assignment;
}
- private static void addCompletedPartitions(Set<Integer> set, JobContext ctx,
- Iterable<Integer> pIds) {
- for (Integer pId : pIds) {
- TaskPartitionState state = ctx.getPartitionState(pId);
- if (state == TaskPartitionState.COMPLETED) {
- set.add(pId);
- }
- }
- }
-
- private static boolean isTaskGivenup(JobContext ctx, JobConfig cfg, int pId) {
- return ctx.getPartitionNumAttempts(pId) >= cfg.getMaxAttemptsPerTask();
- }
+ /**
+ * Check all the dependencies of a job to determine whether the job is ready to be scheduled.
+ *
+ * @param job the job whose ancestors in the workflow DAG are checked
+ * @param workflowCfg workflow config supplying the job DAG and the parallel-jobs limit
+ * @param workflowCtx workflow context supplying the current state of each ancestor job
+ * @return false if any ancestor is null/NOT_STARTED or IN_PROGRESS/STOPPED ancestors reach getParallelJobs()
+ */
+ protected boolean isJobReadyToSchedule(String job, WorkflowConfig workflowCfg,
+ WorkflowContext workflowCtx) {
+ int notStartedCount = 0;
+ int inCompleteCount = 0;
- // add all partitions that have been tried maxNumberAttempts
- private static void addGiveupPartitions(Set<Integer> set, JobContext ctx, Iterable<Integer> pIds,
- JobConfig cfg) {
- for (Integer pId : pIds) {
- if (isTaskGivenup(ctx, cfg, pId)) {
- set.add(pId);
+ for (String ancestor : workflowCfg.getJobDag().getAncestors(job)) {
+ TaskState jobState = workflowCtx.getJobState(ancestor);
+ if (jobState == null || jobState == TaskState.NOT_STARTED) {
+ ++notStartedCount;
+ } else if (jobState == TaskState.IN_PROGRESS || jobState == TaskState.STOPPED) {
+ ++inCompleteCount;
}
}
- }
-
- private static List<Integer> getNextPartitions(SortedSet<Integer> candidatePartitions,
- Set<Integer> excluded, int n) {
- List<Integer> result = new ArrayList<Integer>();
- for (Integer pId : candidatePartitions) {
- if (result.size() >= n) {
- break;
- }
- if (!excluded.contains(pId)) {
- result.add(pId);
- }
+ if (notStartedCount > 0 || inCompleteCount >= workflowCfg.getParallelJobs()) {
+ LOG.debug(String
+ .format("Job %s is not ready to start, notStartedParent(s)=%d, inCompleteParent(s)=%d.",
+ job, notStartedCount, inCompleteCount));
+ return false;
}
- return result;
- }
-
- private static void markPartitionDelayed(JobConfig cfg, JobContext ctx, int p) {
- long delayInterval = cfg.getTaskRetryDelay();
- if (delayInterval <= 0) {
- return;
- }
- long nextStartTime = ctx.getPartitionFinishTime(p) + delayInterval;
- ctx.setNextRetryTime(p, nextStartTime);
+ return true;
}
- private static void markPartitionCompleted(JobContext ctx, int pId) {
- ctx.setPartitionState(pId, TaskPartitionState.COMPLETED);
- ctx.setPartitionFinishTime(pId, System.currentTimeMillis());
- ctx.incrementNumAttempts(pId);
+ /**
+ * Check if a workflow is ready to schedule.
+ *
+ * @param workflowCfg the workflow to check
+ * @return true if the workflow is ready for schedule, false if not ready
+ */
+ protected boolean isWorkflowReadyForSchedule(WorkflowConfig workflowCfg) {
+ Date startTime = workflowCfg.getStartTime();
+ // Workflow with non-scheduled config or passed start time is ready to schedule.
+ return (startTime == null || startTime.getTime() <= System.currentTimeMillis());
}
- private static void markPartitionError(JobContext ctx, int pId, TaskPartitionState state,
- boolean incrementAttempts) {
- ctx.setPartitionState(pId, state);
- ctx.setPartitionFinishTime(pId, System.currentTimeMillis());
- if (incrementAttempts) {
- ctx.incrementNumAttempts(pId);
- }
+ @Override public IdealState computeNewIdealState(String resourceName,
+ IdealState currentIdealState, CurrentStateOutput currentStateOutput,
+ ClusterDataCache clusterData) {
+ // All of the heavy lifting is in the ResourceAssignment computation,
+ // so this part can just be a no-op.
+ return currentIdealState;
}
- private static void markAllPartitionsError(JobContext ctx, TaskPartitionState state,
- boolean incrementAttempts) {
- for (int pId : ctx.getPartitionSet()) {
- markPartitionError(ctx, pId, state, incrementAttempts);
- }
- }
+ // Management of already-scheduled rebalances across all task entities.
+ protected static class ScheduledRebalancer {
+ private static class ScheduledTask { // a recorded timer: its target start time and its future
+ private final long _startTime; // epoch millis the rebalance was scheduled for
+ private final Future<?> _future; // handle used to check completion or cancel the timer
- /**
- * Return the assignment of task partitions per instance.
- */
- private static Map<String, SortedSet<Integer>> getTaskPartitionAssignments(
- Iterable<String> instanceList, ResourceAssignment assignment, Set<Integer> includeSet) {
- Map<String, SortedSet<Integer>> result = new HashMap<String, SortedSet<Integer>>();
- for (String instance : instanceList) {
- result.put(instance, new TreeSet<Integer>());
- }
+ public ScheduledTask(long startTime, Future<?> future) {
+ this._startTime = startTime;
+ this._future = future;
+ }
- for (Partition partition : assignment.getMappedPartitions()) {
- int pId = pId(partition.getPartitionName());
- if (includeSet.contains(pId)) {
- Map<String, String> replicaMap = assignment.getReplicaMap(partition);
- for (String instance : replicaMap.keySet()) {
- SortedSet<Integer> pList = result.get(instance);
- if (pList != null) {
- pList.add(pId);
- }
- }
+ public long getStartTime() {
+ return _startTime;
}
- }
- return result;
- }
- private static Set<Integer> getNonReadyPartitions(JobContext ctx, long now) {
- Set<Integer> nonReadyPartitions = Sets.newHashSet();
- for (int p : ctx.getPartitionSet()) {
- long toStart = ctx.getNextRetryTime(p);
- if (now < toStart) {
- nonReadyPartitions.add(p);
+ public Future<?> getFuture() {
+ return _future;
}
}
- return nonReadyPartitions;
- }
-
- /**
- * Computes the partition name given the resource name and partition id.
- */
- protected static String pName(String resource, int pId) {
- return resource + "_" + pId;
- }
- /**
- * Extracts the partition id from the given partition name.
- */
- protected static int pId(String pName) {
- String[] tokens = pName.split("_");
- return Integer.valueOf(tokens[tokens.length - 1]);
- }
-
- /**
- * An (instance, state) pair.
- */
- private static class PartitionAssignment {
- private final String _instance;
- private final String _state;
+ private final Map<String, ScheduledTask> _rebalanceTasks = new HashMap<String, ScheduledTask>();
+ private final ScheduledExecutorService _rebalanceExecutor =
+ Executors.newSingleThreadScheduledExecutor();
+
+ /**
+ * Schedule a rebalance of the resource at startTime, replacing (and cancelling) any pending one.
+ * @param manager Helix connection used to trigger the rebalance
+ * @param resource name of the resource to rebalance
+ * @param startTime epoch time in milliseconds; a time in the past triggers the rebalance immediately
+ */
+ public void scheduleRebalance(HelixManager manager, String resource, long startTime) {
+ // Do nothing if a timer is already set for this resource with the same start time.
+ ScheduledTask existTask = _rebalanceTasks.get(resource);
+ if (existTask != null && existTask.getStartTime() == startTime) {
+ LOG.debug("Schedule timer for job: " + resource + " is up to date.");
+ return;
+ }
- private PartitionAssignment(String instance, String state) {
- _instance = instance;
- _state = state;
+ long delay = startTime - System.currentTimeMillis();
+ LOG.info("Schedule rebalance with job: " + resource + " at time: " + startTime + " delay: "
+ + delay);
+
+ // Schedule the new timer, record it, then cancel the timer it replaces (if still pending).
+ RebalanceInvoker rebalanceInvoker = new RebalanceInvoker(manager, resource);
+ ScheduledFuture<?> future =
+ _rebalanceExecutor.schedule(rebalanceInvoker, delay, TimeUnit.MILLISECONDS);
+ ScheduledTask prevTask = _rebalanceTasks.put(resource, new ScheduledTask(startTime, future));
+ if (prevTask != null && !prevTask.getFuture().isDone()) {
+ if (!prevTask.getFuture().cancel(false)) {
+ LOG.warn("Failed to cancel scheduled timer task for " + resource);
+ }
+ }
}
- }
- @Override
- public IdealState computeNewIdealState(String resourceName, IdealState currentIdealState,
- CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) {
- // All of the heavy lifting is in the ResourceAssignment computation,
- // so this part can just be a no-op.
- return currentIdealState;
- }
+ /**
+ * Get the current schedule time for given resource.
+ *
+ * @param resource name of the resource
+ * @return recorded schedule time in milliseconds, or -1 if no task is recorded for this resource
+ */
+ public long getRebalanceTime(String resource) {
+ ScheduledTask task = _rebalanceTasks.get(resource);
+ if (task != null) {
+ return task.getStartTime();
+ }
+ return -1;
+ }
+
+ /**
+ * Forget the recorded rebalance task for the resource and cancel it if it has not completed.
+ *
+ * @param resource name of the resource
+ */
+ public void removeScheduledRebalance(String resource) {
+ ScheduledTask existTask = _rebalanceTasks.remove(resource);
+ if (existTask != null && !existTask.getFuture().isDone()) {
+ if (!existTask.getFuture().cancel(true)) { // true: interrupt the invoker if already running
+ LOG.warn("Failed to cancel scheduled timer task for " + resource);
+ }
+ LOG.info(
+ "Remove scheduled rebalance task at time " + existTask.getStartTime() + " for resource: "
+ + resource);
+ }
+ }
- /**
- * The simplest possible runnable that will trigger a run of the controller pipeline
- */
- private static class RebalanceInvoker implements Runnable {
- private final HelixManager _manager;
- private final String _resource;
+ /**
+ * Runnable that triggers a controller pipeline run for one resource via TaskUtil.invokeRebalance.
+ */
+ private static class RebalanceInvoker implements Runnable {
+ private final HelixManager _manager;
+ private final String _resource;
- public RebalanceInvoker(HelixManager manager, String resource) {
- _manager = manager;
- _resource = resource;
- }
+ public RebalanceInvoker(HelixManager manager, String resource) {
+ _manager = manager;
+ _resource = resource;
+ }
- @Override
- public void run() {
- TaskUtil.invokeRebalance(_manager, _resource);
+ @Override public void run() {
+ TaskUtil.invokeRebalance(_manager.getHelixDataAccessor(), _resource);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/1798e793/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index bb62de5..d804fab 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.helix.AccessOption;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixProperty;
import org.apache.helix.PropertyKey;
@@ -56,14 +57,13 @@ import com.google.common.collect.Maps;
public class TaskUtil {
private static final Logger LOG = Logger.getLogger(TaskUtil.class);
public static final String CONTEXT_NODE = "Context";
- public static final String PREV_RA_NODE = "PreviousResourceAssignment";
-
/**
* Parses job resource configurations in Helix into a {@link JobConfig} object.
- * @param accessor Accessor to access Helix configs
+ *
+ * @param accessor Accessor to access Helix configs
* @param jobResource The name of the job resource
* @return A {@link JobConfig} object if Helix contains valid configurations for the job, null
- * otherwise.
+ * otherwise.
*/
public static JobConfig getJobCfg(HelixDataAccessor accessor, String jobResource) {
HelixProperty jobResourceConfig = getResourceConfig(accessor, jobResource);
@@ -85,10 +85,11 @@ public class TaskUtil {
/**
* Parses job resource configurations in Helix into a {@link JobConfig} object.
- * @param manager HelixManager object used to connect to Helix.
+ *
+ * @param manager HelixManager object used to connect to Helix.
* @param jobResource The name of the job resource.
* @return A {@link JobConfig} object if Helix contains valid configurations for the job, null
- * otherwise.
+ * otherwise.
*/
public static JobConfig getJobCfg(HelixManager manager, String jobResource) {
return getJobCfg(manager.getHelixDataAccessor(), jobResource);
@@ -96,12 +97,13 @@ public class TaskUtil {
/**
* Parses workflow resource configurations in Helix into a {@link WorkflowConfig} object.
- * @param cfgAccessor Config accessor to access Helix configs
- * @param accessor Accessor to access Helix configs
- * @param clusterName Cluster name
+ *
+ * @param cfgAccessor Config accessor to access Helix configs
+ * @param accessor Accessor to access Helix configs
+ * @param clusterName Cluster name
* @param workflowResource The name of the workflow resource.
* @return A {@link WorkflowConfig} object if Helix contains valid configurations for the
- * workflow, null otherwise.
+ * workflow, null otherwise.
*/
public static WorkflowConfig getWorkflowCfg(ConfigAccessor cfgAccessor,
HelixDataAccessor accessor, String clusterName, String workflowResource) {
@@ -117,10 +119,11 @@ public class TaskUtil {
/**
* Parses workflow resource configurations in Helix into a {@link WorkflowConfig} object.
- * @param manager Helix manager object used to connect to Helix.
+ *
+ * @param manager Helix manager object used to connect to Helix.
* @param workflowResource The name of the workflow resource.
* @return A {@link WorkflowConfig} object if Helix contains valid configurations for the
- * workflow, null otherwise.
+ * workflow, null otherwise.
*/
public static WorkflowConfig getWorkflowCfg(HelixManager manager, String workflowResource) {
return getWorkflowCfg(manager.getConfigAccessor(), manager.getHelixDataAccessor(),
@@ -129,18 +132,19 @@ public class TaskUtil {
/**
* Request a state change for a specific task.
- * @param accessor connected Helix data accessor
- * @param instance the instance serving the task
+ *
+ * @param accessor connected Helix data accessor
+ * @param instance the instance serving the task
* @param sessionId the current session of the instance
- * @param resource the job name
+ * @param resource the job name
* @param partition the task partition name
- * @param state the requested state
+ * @param state the requested state
* @return true if the request was persisted, false otherwise
*/
public static boolean setRequestedState(HelixDataAccessor accessor, String instance,
String sessionId, String resource, String partition, TaskPartitionState state) {
- LOG.debug(String.format("Requesting a state transition to %s for partition %s.", state,
- partition));
+ LOG.debug(
+ String.format("Requesting a state transition to %s for partition %s.", state, partition));
try {
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
PropertyKey key = keyBuilder.currentState(instance, sessionId, resource);
@@ -149,16 +153,18 @@ public class TaskUtil {
return accessor.updateProperty(key, currStateDelta);
} catch (Exception e) {
- LOG.error(String.format("Error when requesting a state transition to %s for partition %s.",
- state, partition), e);
+ LOG.error(String
+ .format("Error when requesting a state transition to %s for partition %s.", state,
+ partition), e);
return false;
}
}
/**
* Get a Helix configuration scope at a resource (i.e. job and workflow) level
+ *
* @param clusterName the cluster containing the resource
- * @param resource the resource name
+ * @param resource the resource name
* @return instantiated {@link HelixConfigScope}
*/
public static HelixConfigScope getResourceConfigScope(String clusterName, String resource) {
@@ -167,51 +173,24 @@ public class TaskUtil {
}
/**
- * Get the last task assignment for a given job
- * @param manager a connection to Helix
- * @param resourceName the name of the job
- * @return {@link ResourceAssignment} instance, or null if no assignment is available
- */
- public static ResourceAssignment getPrevResourceAssignment(HelixManager manager,
- String resourceName) {
- ZNRecord r =
- manager.getHelixPropertyStore().get(
- Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName, PREV_RA_NODE),
- null, AccessOption.PERSISTENT);
- return r != null ? new ResourceAssignment(r) : null;
- }
-
- /**
- * Set the last task assignment for a given job
- * @param manager a connection to Helix
- * @param resourceName the name of the job
- * @param ra {@link ResourceAssignment} containing the task assignment
- */
- public static void setPrevResourceAssignment(HelixManager manager, String resourceName,
- ResourceAssignment ra) {
- manager.getHelixPropertyStore().set(
- Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName, PREV_RA_NODE),
- ra.getRecord(), AccessOption.PERSISTENT);
- }
-
- /**
* Get the runtime context of a single job
+ *
* @param propertyStore Property store for the cluster
- * @param jobResource The name of the job
+ * @param jobResource The name of the job
* @return the {@link JobContext}, or null if none is available
*/
public static JobContext getJobContext(HelixPropertyStore<ZNRecord> propertyStore,
String jobResource) {
- ZNRecord r =
- propertyStore.get(
- Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, jobResource, CONTEXT_NODE),
+ ZNRecord r = propertyStore
+ .get(Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, jobResource, CONTEXT_NODE),
null, AccessOption.PERSISTENT);
return r != null ? new JobContext(r) : null;
}
/**
* Get the runtime context of a single job
- * @param manager a connection to Helix
+ *
+ * @param manager a connection to Helix
* @param jobResource the name of the job
* @return the {@link JobContext}, or null if none is available
*/
@@ -221,34 +200,36 @@ public class TaskUtil {
/**
* Set the runtime context of a single job
- * @param manager a connection to Helix
+ *
+ * @param manager a connection to Helix
* @param jobResource the name of the job
- * @param ctx the up-to-date {@link JobContext} for the job
+ * @param ctx the up-to-date {@link JobContext} for the job
*/
public static void setJobContext(HelixManager manager, String jobResource, JobContext ctx) {
- manager.getHelixPropertyStore().set(
- Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, jobResource, CONTEXT_NODE),
- ctx.getRecord(), AccessOption.PERSISTENT);
+ manager.getHelixPropertyStore()
+ .set(Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, jobResource, CONTEXT_NODE),
+ ctx.getRecord(), AccessOption.PERSISTENT);
}
/**
* Get the runtime context of a single workflow
- * @param propertyStore Property store of the cluster
+ *
+ * @param propertyStore Property store of the cluster
* @param workflowResource The name of the workflow
* @return the {@link WorkflowContext}, or null if none is available
*/
public static WorkflowContext getWorkflowContext(HelixPropertyStore<ZNRecord> propertyStore,
String workflowResource) {
- ZNRecord r =
- propertyStore.get(
- Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowResource,
- CONTEXT_NODE), null, AccessOption.PERSISTENT);
+ ZNRecord r = propertyStore.get(
+ Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowResource, CONTEXT_NODE),
+ null, AccessOption.PERSISTENT);
return r != null ? new WorkflowContext(r) : null;
}
/**
* Get the runtime context of a single workflow
- * @param manager a connection to Helix
+ *
+ * @param manager a connection to Helix
* @param workflowResource the name of the workflow
* @return the {@link WorkflowContext}, or null if none is available
*/
@@ -258,9 +239,10 @@ public class TaskUtil {
/**
* Set the runtime context of a single workflow
- * @param manager a connection to Helix
+ *
+ * @param manager a connection to Helix
* @param workflowResource the name of the workflow
- * @param ctx the up-to-date {@link WorkflowContext} for the workflow
+ * @param ctx the up-to-date {@link WorkflowContext} for the workflow
*/
public static void setWorkflowContext(HelixManager manager, String workflowResource,
WorkflowContext ctx) {
@@ -271,6 +253,7 @@ public class TaskUtil {
/**
* Get a workflow-qualified job name for a single-job workflow
+ *
* @param singleJobWorkflow the name of the single-job workflow
* @return The namespaced job name, which is just singleJobWorkflow_singleJobWorkflow
*/
@@ -280,8 +263,9 @@ public class TaskUtil {
/**
* Get a workflow-qualified job name for a job in that workflow
+ *
* @param workflowResource the name of the workflow
- * @param jobName the un-namespaced name of the job
+ * @param jobName the un-namespaced name of the job
* @return The namespaced job name, which is just workflowResource_jobName
*/
public static String getNamespacedJobName(String workflowResource, String jobName) {
@@ -290,8 +274,9 @@ public class TaskUtil {
/**
* Remove the workflow namespace from the job name
+ *
* @param workflowResource the name of the workflow that owns the job
- * @param jobName the namespaced job name
+ * @param jobName the namespaced job name
* @return the denamespaced job name, or the same job name if it is already denamespaced
*/
public static String getDenamespacedJobName(String workflowResource, String jobName) {
@@ -305,6 +290,7 @@ public class TaskUtil {
/**
* Serialize a map of job-level configurations as a single string
+ *
* @param commandConfig map of job config key to config value
* @return serialized string
*/
@@ -321,6 +307,7 @@ public class TaskUtil {
/**
* Deserialize a single string into a map of job-level configurations
+ *
* @param commandConfig the serialized job config map
* @return a map of job config key to config value
*/
@@ -339,22 +326,27 @@ public class TaskUtil {
/**
* Trigger a controller pipeline execution for a given resource.
- * @param manager Helix connection
+ * Only ideal states whose state model is TaskConstants.STATE_MODEL_NAME are touched.
+ * @param accessor Helix data accessor
* @param resource the name of the resource changed to triggering the execution
*/
- public static void invokeRebalance(HelixManager manager, String resource) {
+ public static void invokeRebalance(HelixDataAccessor accessor, String resource) {
// The pipeline is idempotent, so touching an ideal state is enough to trigger a pipeline run
- HelixDataAccessor accessor = manager.getHelixDataAccessor();
+ LOG.info("invoke rebalance for " + resource);
PropertyKey key = accessor.keyBuilder().idealStates(resource);
IdealState is = accessor.getProperty(key);
- if (is != null) {
- accessor.updateProperty(key, is);
- LOG.debug("invoke rebalance for " + key);
+ if (is != null && TaskConstants.STATE_MODEL_NAME.equals(is.getStateModelDefRef())) { // null-safe: state model ref may be unset
+ if (!accessor.updateProperty(key, is)) {
+ LOG.warn("Failed to invoke rebalance on resource " + resource);
+ }
+ } else {
+ LOG.warn("Can't find ideal state or ideal state is not for right type for " + resource);
}
}
/**
* Get a ScheduleConfig from a workflow config string map
+ *
* @param cfg the string map
* @return a ScheduleConfig if one exists, otherwise null
*/
@@ -369,11 +361,11 @@ public class TaskUtil {
return null;
}
}
- if (cfg.containsKey(WorkflowConfig.RECURRENCE_UNIT)
- && cfg.containsKey(WorkflowConfig.RECURRENCE_INTERVAL)) {
- return ScheduleConfig.recurringFromDate(startTime,
- TimeUnit.valueOf(cfg.get(WorkflowConfig.RECURRENCE_UNIT)),
- Long.parseLong(cfg.get(WorkflowConfig.RECURRENCE_INTERVAL)));
+ if (cfg.containsKey(WorkflowConfig.RECURRENCE_UNIT) && cfg
+ .containsKey(WorkflowConfig.RECURRENCE_INTERVAL)) {
+ return ScheduleConfig
+ .recurringFromDate(startTime, TimeUnit.valueOf(cfg.get(WorkflowConfig.RECURRENCE_UNIT)),
+ Long.parseLong(cfg.get(WorkflowConfig.RECURRENCE_INTERVAL)));
} else if (startTime != null) {
return ScheduleConfig.oneTimeDelayedStart(startTime);
}
@@ -382,10 +374,11 @@ public class TaskUtil {
/**
* Create a new workflow based on an existing one
- * @param manager connection to Helix
+ *
+ * @param manager connection to Helix
* @param origWorkflowName the name of the existing workflow
- * @param newWorkflowName the name of the new workflow
- * @param newStartTime a provided start time that deviates from the desired start time
+ * @param newWorkflowName the name of the new workflow
+ * @param newStartTime a provided start time that deviates from the desired start time
* @return the cloned workflow, or null if there was a problem cloning the existing one
*/
public static Workflow cloneWorkflow(HelixManager manager, String origWorkflowName,
@@ -474,4 +467,61 @@ public class TaskUtil {
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
return accessor.getProperty(keyBuilder.resourceConfig(resource));
}
+
+ /**
+ * Removes the IdealState and ExternalView of a job/workflow resource if present; removal failures are logged, not thrown.
+ */
+ public static void cleanupIdealStateExtView(HelixDataAccessor accessor, final String resourceName) {
+ LOG.info("Cleaning up idealstate and externalView for job: " + resourceName);
+
+ // Delete the ideal state itself.
+ PropertyKey isKey = accessor.keyBuilder().idealStates(resourceName);
+ if (accessor.getProperty(isKey) != null) {
+ if (!accessor.removeProperty(isKey)) {
+ LOG.error(String.format(
+ "Error occurred while trying to clean up resource %s. Failed to remove node %s from Helix.",
+ resourceName, isKey));
+ }
+ } else {
+ LOG.warn(String.format("Idealstate for resource %s does not exist.", resourceName));
+ }
+
+ // Delete the dead external view explicitly: the job has already completed, so no further
+ // current state change will happen, and the usual dead-external-view removal
+ // will therefore never be triggered for it.
+ PropertyKey evKey = accessor.keyBuilder().externalView(resourceName);
+ if (accessor.getProperty(evKey) != null) {
+ if (!accessor.removeProperty(evKey)) {
+ LOG.error(String.format(
+ "Error occurred while trying to clean up resource %s. Failed to remove node %s from Helix.",
+ resourceName, evKey));
+ }
+ }
+
+ LOG.info(String // NOTE(review): logged even if one of the removals above failed
+ .format("Successfully clean up idealstate/externalView for resource %s.", resourceName));
+ }
+
+ /**
+ * Extracts the partition id from the given partition name.
+ * Throws HelixException if the name contains no underscore.
+ * @param pName partition name of the form {@code <resource>_<id>}
+ * @return the integer after the last underscore (NumberFormatException if it is not numeric)
+ */
+ public static int getPartitionId(String pName) {
+ int index = pName.lastIndexOf('_');
+ if (index == -1) {
+ throw new HelixException("Invalid partition name " + pName);
+ }
+ return Integer.parseInt(pName.substring(index + 1));
+ }
+
+ public static String getWorkflowContextKey(String resource) {
+ // TODO: fix this to use the keyBuilder.
+ return Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resource);
+ }
+
+ public static PropertyKey getWorkflowConfigKey(HelixDataAccessor accessor, String resource) {
+ return accessor.keyBuilder().resourceConfig(resource);
+ }
}
http://git-wip-us.apache.org/repos/asf/helix/blob/1798e793/helix-core/src/main/java/org/apache/helix/task/Workflow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/Workflow.java b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
index 259b72c..8ea2691 100644
--- a/helix-core/src/main/java/org/apache/helix/task/Workflow.java
+++ b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
@@ -230,7 +230,7 @@ public class Workflow {
protected Map<String, Map<String, String>> _jobConfigs;
protected Map<String, List<TaskConfig>> _taskConfigs;
protected ScheduleConfig _scheduleConfig;
- protected long _expiry;
+ protected long _expiry = -1;
protected Map<String, String> _cfgMap;
protected int _parallelJobs = -1;
@@ -239,7 +239,7 @@ public class Workflow {
_dag = new JobDag();
_jobConfigs = new TreeMap<String, Map<String, String>>();
_taskConfigs = new TreeMap<String, List<TaskConfig>>();
- _expiry = WorkflowConfig.DEFAULT_EXPIRY;
+ _expiry = -1;
}
public Builder addConfig(String job, String key, String val) {
@@ -340,7 +340,7 @@ public class Workflow {
if (_expiry > 0) {
builder.setExpiry(_expiry);
}
- if (_parallelJobs != -1) {
+ if (_parallelJobs > 0) {
builder.setParallelJobs(_parallelJobs);
}
http://git-wip-us.apache.org/repos/asf/helix/blob/1798e793/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
new file mode 100644
index 0000000..912f501
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
@@ -0,0 +1,412 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.I0Itec.zkclient.DataUpdater;
+import org.apache.helix.*;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.*;
+import org.apache.helix.model.builder.CustomModeISBuilder;
+import org.apache.log4j.Logger;
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.*;
+
+/**
+ * Custom rebalancer implementation for the {@code Workflow} in task state model.
+ */
+public class WorkflowRebalancer extends TaskRebalancer {
+ private static final Logger LOG = Logger.getLogger(WorkflowRebalancer.class);
+
+ @Override
+ public ResourceAssignment computeBestPossiblePartitionState(ClusterDataCache clusterData,
+ IdealState taskIs, Resource resource, CurrentStateOutput currStateOutput) {
+ final String workflow = resource.getResourceName();
+ LOG.debug("Computer Best Partition for workflow: " + workflow);
+
+ // Fetch workflow configuration and context
+ WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(_manager, workflow);
+ if (workflowCfg == null) {
+ LOG.warn("Workflow configuration is NULL for " + workflow);
+ return buildEmptyAssignment(workflow, currStateOutput);
+ }
+
+ WorkflowContext workflowCtx = TaskUtil.getWorkflowContext(_manager, workflow);
+ // Initialize workflow context if needed
+ if (workflowCtx == null) {
+ workflowCtx = new WorkflowContext(new ZNRecord("WorkflowContext"));
+ workflowCtx.setStartTime(System.currentTimeMillis());
+ LOG.debug("Workflow context is created for " + workflow);
+ }
+
+ // Clean up if workflow marked for deletion
+ TargetState targetState = workflowCfg.getTargetState();
+ if (targetState == TargetState.DELETE) {
+ LOG.info("Workflow is marked as deleted " + workflow + " cleaning up the workflow context.");
+ cleanupWorkflow(workflow, workflowCfg, workflowCtx);
+ return buildEmptyAssignment(workflow, currStateOutput);
+ }
+
+ if (targetState == TargetState.STOP) {
+ LOG.info("Workflow " + workflow + "is marked as stopped.");
+ // Workflow has been stopped if all jobs are stopped
+ // TODO: what should we do if workflowCtx is not set yet?
+ if (workflowCtx != null && isWorkflowStopped(workflowCtx, workflowCfg)) {
+ workflowCtx.setWorkflowState(TaskState.STOPPED);
+ }
+ return buildEmptyAssignment(workflow, currStateOutput);
+ }
+
+ long currentTime = System.currentTimeMillis();
+ // Check if workflow is completed and mark it if it is completed.
+ if (workflowCtx.getFinishTime() == WorkflowContext.UNFINISHED) {
+ if (isWorkflowComplete(workflowCtx, workflowCfg)) {
+ workflowCtx.setWorkflowState(TaskState.COMPLETED);
+ workflowCtx.setFinishTime(currentTime);
+ TaskUtil.setWorkflowContext(_manager, workflow, workflowCtx);
+ }
+ }
+
+ if (workflowCtx.getFinishTime() != WorkflowContext.UNFINISHED) {
+ LOG.info("Workflow " + workflow + " is completed.");
+ long expiryTime = workflowCfg.getExpiry();
+ // Check if this workflow has been finished past its expiry.
+ if (workflowCtx.getFinishTime() + expiryTime <= currentTime) {
+ LOG.info("Workflow " + workflow + " passed expiry time, cleaning up the workflow context.");
+ cleanupWorkflow(workflow, workflowCfg, workflowCtx);
+ } else {
+ // schedule future cleanup work
+ long cleanupTime = workflowCtx.getFinishTime() + expiryTime;
+ _scheduledRebalancer.scheduleRebalance(_manager, workflow, cleanupTime);
+ }
+ return buildEmptyAssignment(workflow, currStateOutput);
+ }
+
+ if (!isWorkflowReadyForSchedule(workflowCfg)) {
+ LOG.info("Workflow " + workflow + " is not ready to schedule");
+ // set the timer to trigger future schedule
+ _scheduledRebalancer
+ .scheduleRebalance(_manager, workflow, workflowCfg.getStartTime().getTime());
+ return buildEmptyAssignment(workflow, currStateOutput);
+ }
+
+ // Check for readiness, and stop processing if it's not ready
+ boolean isReady =
+ scheduleWorkflowIfReady(workflow, workflowCfg, workflowCtx);
+ if (isReady) {
+ // Schedule jobs from this workflow.
+ scheduleJobs(workflowCfg, workflowCtx);
+ } else {
+ LOG.debug("Workflow " + workflow + " is not ready to be scheduled.");
+ }
+
+ TaskUtil.setWorkflowContext(_manager, workflow, workflowCtx);
+ return buildEmptyAssignment(workflow, currStateOutput);
+ }
+
+ /**
+ * Figure out whether the jobs in the workflow should be run,
+ * and if it's ready, then just schedule it
+ */
+ private void scheduleJobs(WorkflowConfig workflowCfg, WorkflowContext workflowCtx) {
+ ScheduleConfig scheduleConfig = workflowCfg.getScheduleConfig();
+ if (scheduleConfig != null && scheduleConfig.isRecurring()) {
+ LOG.debug("Jobs from recurring workflow are not schedule-able");
+ return;
+ }
+
+ for (String job : workflowCfg.getJobDag().getAllNodes()) {
+ TaskState jobState = workflowCtx.getJobState(job);
+ if (jobState != null && !jobState.equals(TaskState.NOT_STARTED)) {
+ LOG.debug("Job " + job + " is already started or completed.");
+ continue;
+ }
+ // check ancestor job status
+ if (isJobReadyToSchedule(job, workflowCfg, workflowCtx)) {
+ JobConfig jobConfig = TaskUtil.getJobCfg(_manager, job);
+ scheduleSingleJob(job, jobConfig);
+ }
+ }
+ }
+
+ /**
+ * Posts new job to cluster
+ */
+ private void scheduleSingleJob(String jobResource, JobConfig jobConfig) {
+ HelixAdmin admin = _manager.getClusterManagmentTool();
+
+ IdealState jobIS = admin.getResourceIdealState(_manager.getClusterName(), jobResource);
+ if (jobIS != null) {
+ LOG.info("Job " + jobResource + " idealstate already exists!");
+ return;
+ }
+
+ // Set up job resource based on partitions from target resource
+ int numIndependentTasks = jobConfig.getTaskConfigMap().size();
+ int numPartitions = (numIndependentTasks > 0) ?
+ numIndependentTasks :
+ admin.getResourceIdealState(_manager.getClusterName(), jobConfig.getTargetResource())
+ .getPartitionSet().size();
+ admin.addResource(_manager.getClusterName(), jobResource, numPartitions,
+ TaskConstants.STATE_MODEL_NAME);
+
+ HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+
+ // Set the job configuration
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+ HelixProperty resourceConfig = new HelixProperty(jobResource);
+ resourceConfig.getRecord().getSimpleFields().putAll(jobConfig.getResourceConfigMap());
+ Map<String, TaskConfig> taskConfigMap = jobConfig.getTaskConfigMap();
+ if (taskConfigMap != null) {
+ for (TaskConfig taskConfig : taskConfigMap.values()) {
+ resourceConfig.getRecord().setMapField(taskConfig.getId(), taskConfig.getConfigMap());
+ }
+ }
+ accessor.setProperty(keyBuilder.resourceConfig(jobResource), resourceConfig);
+
+ // Push out new ideal state based on number of target partitions
+ CustomModeISBuilder builder = new CustomModeISBuilder(jobResource);
+ builder.setRebalancerMode(IdealState.RebalanceMode.TASK);
+ builder.setNumReplica(1);
+ builder.setNumPartitions(numPartitions);
+ builder.setStateModel(TaskConstants.STATE_MODEL_NAME);
+
+ if (jobConfig.isDisableExternalView()) {
+ builder.setDisableExternalView(true);
+ }
+
+ jobIS = builder.build();
+ for (int i = 0; i < numPartitions; i++) {
+ jobIS.getRecord().setListField(jobResource + "_" + i, new ArrayList<String>());
+ jobIS.getRecord().setMapField(jobResource + "_" + i, new HashMap<String, String>());
+ }
+ jobIS.setRebalancerClassName(JobRebalancer.class.getName());
+ admin.setResourceIdealState(_manager.getClusterName(), jobResource, jobIS);
+ }
+
+ /**
+ * Check if a workflow is ready to schedule, and schedule a rebalance if it is not
+ *
+ * @param workflow the Helix resource associated with the workflow
+ * @param workflowCfg the workflow to check
+ * @param workflowCtx the current workflow context
+ * @return true if the workflow is ready for schedule, false if not ready
+ */
+ private boolean scheduleWorkflowIfReady(String workflow, WorkflowConfig workflowCfg,
+ WorkflowContext workflowCtx) {
+ // non-scheduled workflow is ready to run immediately.
+ if (workflowCfg == null || workflowCfg.getScheduleConfig() == null) {
+ return true;
+ }
+
+ // Figure out when this should be run, and if it's ready, then just run it
+ ScheduleConfig scheduleConfig = workflowCfg.getScheduleConfig();
+ Date startTime = scheduleConfig.getStartTime();
+ long currentTime = new Date().getTime();
+ long delayFromStart = startTime.getTime() - currentTime;
+
+ if (delayFromStart <= 0) {
+ // Recurring workflows are just templates that spawn new workflows
+ if (scheduleConfig.isRecurring()) {
+ // Skip scheduling this workflow if it's not in a start state
+ if (!workflowCfg.getTargetState().equals(TargetState.START)) {
+ LOG.debug("Skip scheduling since the workflow has not been started " + workflow);
+ return false;
+ }
+
+ // Skip scheduling this workflow again if the previous run (if any) is still active
+ String lastScheduled = workflowCtx.getLastScheduledSingleWorkflow();
+ if (lastScheduled != null) {
+ WorkflowContext lastWorkflowCtx = TaskUtil.getWorkflowContext(_manager, lastScheduled);
+ if (lastWorkflowCtx != null
+ && lastWorkflowCtx.getFinishTime() == WorkflowContext.UNFINISHED) {
+ LOG.info("Skip scheduling since last schedule has not completed yet " + lastScheduled);
+ return false;
+ }
+ }
+
+ // Figure out how many jumps are needed, thus the time to schedule the next workflow
+ // The negative of the delay is the amount of time past the start time
+ long period =
+ scheduleConfig.getRecurrenceUnit().toMillis(scheduleConfig.getRecurrenceInterval());
+ long offsetMultiplier = (-delayFromStart) / period;
+ long timeToSchedule = period * offsetMultiplier + startTime.getTime();
+
+ // Now clone the workflow if this clone has not yet been created
+ DateFormat df = new SimpleDateFormat("yyyyMMdd'T'HHmmss");
+ df.setTimeZone(TimeZone.getTimeZone("UTC"));
+ String newWorkflowName = workflow + "_" + df.format(new Date(timeToSchedule));
+ LOG.debug("Ready to start workflow " + newWorkflowName);
+ if (!newWorkflowName.equals(lastScheduled)) {
+ Workflow clonedWf = TaskUtil
+ .cloneWorkflow(_manager, workflow, newWorkflowName, new Date(timeToSchedule));
+ TaskDriver driver = new TaskDriver(_manager);
+ try {
+ // Start the cloned workflow
+ driver.start(clonedWf);
+ } catch (Exception e) {
+ LOG.error("Failed to schedule cloned workflow " + newWorkflowName, e);
+ }
+ // Persist workflow start regardless of success to avoid retrying and failing
+ workflowCtx.setLastScheduledSingleWorkflow(newWorkflowName);
+ TaskUtil.setWorkflowContext(_manager, workflow, workflowCtx);
+ }
+
+ // Change the time to trigger the pipeline to that of the next run
+ _scheduledRebalancer.scheduleRebalance(_manager, workflow, (timeToSchedule + period));
+ } else {
+ // one time workflow.
+ // Remove any timers that are past-time for this workflowg
+ long scheduledTime = _scheduledRebalancer.getRebalanceTime(workflow);
+ if (scheduledTime > 0 && currentTime > scheduledTime) {
+ _scheduledRebalancer.removeScheduledRebalance(workflow);
+ }
+ return true;
+ }
+ } else {
+ // set the timer to trigger future schedule
+ _scheduledRebalancer.scheduleRebalance(_manager, workflow, startTime.getTime());
+ }
+
+ return false;
+ }
+
+ /**
+ * Cleans up workflow configs and workflow contexts associated with this workflow,
+ * including all job-level configs and context, plus workflow-level information.
+ */
+ private void cleanupWorkflow(String workflow, WorkflowConfig workflowcfg,
+ WorkflowContext workflowCtx) {
+ LOG.info("Cleaning up workflow: " + workflow);
+ HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+
+ /*
+ if (workflowCtx != null && workflowCtx.getFinishTime() == WorkflowContext.UNFINISHED) {
+ LOG.error("Workflow " + workflow + " has not completed, abort the clean up task.");
+ return;
+ }*/
+
+ for (String job : workflowcfg.getJobDag().getAllNodes()) {
+ cleanupJob(job, workflow);
+ }
+
+ // clean up workflow-level info if this was the last in workflow
+ if (workflowcfg.isTerminable() || workflowcfg.getTargetState() == TargetState.DELETE) {
+ // clean up IS & EV
+ TaskUtil.cleanupIdealStateExtView(_manager.getHelixDataAccessor(), workflow);
+
+ // delete workflow config
+ PropertyKey workflowCfgKey = TaskUtil.getWorkflowConfigKey(accessor, workflow);
+ if (accessor.getProperty(workflowCfgKey) != null) {
+ if (!accessor.removeProperty(workflowCfgKey)) {
+ LOG.error(String.format(
+ "Error occurred while trying to clean up workflow %s. Failed to remove node %s from Helix.",
+ workflow, workflowCfgKey));
+ }
+ }
+ // Delete workflow context
+ String workflowPropStoreKey = TaskUtil.getWorkflowContextKey(workflow);
+ LOG.info("Removing workflow context: " + workflowPropStoreKey);
+ if (!_manager.getHelixPropertyStore().remove(workflowPropStoreKey, AccessOption.PERSISTENT)) {
+ LOG.error(String.format(
+ "Error occurred while trying to clean up workflow %s. Failed to remove node %s from Helix. Aborting further clean up steps.",
+ workflow, workflowPropStoreKey));
+ }
+
+ // Remove pending timer task for this workflow if exists
+ _scheduledRebalancer.removeScheduledRebalance(workflow);
+ }
+ }
+
+
+ /**
+ * Cleans up workflow configs and workflow contexts associated with this workflow,
+ * including all job-level configs and context, plus workflow-level information.
+ */
+ private void cleanupJob(final String job, String workflow) {
+ LOG.info("Cleaning up job: " + job + " in workflow: " + workflow);
+ HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+
+ // Remove any idealstate and externalView.
+ TaskUtil.cleanupIdealStateExtView(accessor, job);
+
+ // Remove DAG references in workflow
+ PropertyKey workflowKey = TaskUtil.getWorkflowConfigKey(accessor, workflow);
+ DataUpdater<ZNRecord> dagRemover = new DataUpdater<ZNRecord>() {
+ @Override
+ public ZNRecord update(ZNRecord currentData) {
+ if (currentData != null) {
+ JobDag jobDag = JobDag.fromJson(currentData.getSimpleField(WorkflowConfig.DAG));
+ for (String child : jobDag.getDirectChildren(job)) {
+ jobDag.getChildrenToParents().get(child).remove(job);
+ }
+ for (String parent : jobDag.getDirectParents(job)) {
+ jobDag.getParentsToChildren().get(parent).remove(job);
+ }
+ jobDag.getChildrenToParents().remove(job);
+ jobDag.getParentsToChildren().remove(job);
+ jobDag.getAllNodes().remove(job);
+ try {
+ currentData.setSimpleField(WorkflowConfig.DAG, jobDag.toJson());
+ } catch (Exception e) {
+ LOG.error("Could not update DAG for job: " + job, e);
+ }
+ } else {
+ LOG.error("Could not update DAG for job: " + job + " ZNRecord is null.");
+ }
+ return currentData;
+ }
+ };
+ accessor.getBaseDataAccessor().update(workflowKey.getPath(), dagRemover,
+ AccessOption.PERSISTENT);
+
+ // Delete job configs.
+ PropertyKey cfgKey = TaskUtil.getWorkflowConfigKey(accessor, job);
+ if (accessor.getProperty(cfgKey) != null) {
+ if (!accessor.removeProperty(cfgKey)) {
+ LOG.error(String.format(
+ "Error occurred while trying to clean up job %s. Failed to remove node %s from Helix.",
+ job, cfgKey));
+ }
+ }
+
+ // Delete job context
+ // For recurring workflow, it's OK if the node doesn't exist.
+ String propStoreKey = TaskUtil.getWorkflowContextKey(job);
+ if (!_manager.getHelixPropertyStore().remove(propStoreKey, AccessOption.PERSISTENT)) {
+ LOG.warn(String.format(
+ "Error occurred while trying to clean up job %s. Failed to remove node %s from Helix.",
+ job, propStoreKey));
+ }
+
+ LOG.info(String.format("Successfully cleaned up job context %s.", job));
+
+ _scheduledRebalancer.removeScheduledRebalance(job);
+ }
+
+ @Override
+ public IdealState computeNewIdealState(String resourceName, IdealState currentIdealState,
+ CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) {
+ // Nothing to do here with workflow resource.
+ return currentIdealState;
+ }
+}