You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/05/20 22:07:20 UTC
[2/3] [HELIX-353] Write an independent task rebalancer
http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index 06e6e4f..ada2f99 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -1,8 +1,27 @@
package org.apache.helix.task;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
import java.io.File;
-import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
@@ -22,10 +41,13 @@ import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.HelixProperty;
import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyKey;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.builder.CustomModeISBuilder;
import org.apache.log4j.Logger;
+import com.beust.jcommander.internal.Lists;
+
/**
* CLI for scheduling/canceling workflows
*/
@@ -132,56 +154,77 @@ public class TaskDriver {
flow.getResourceConfigMap());
// then schedule tasks
- for (String task : flow.getTaskConfigs().keySet()) {
- scheduleTask(task, TaskConfig.Builder.fromMap(flow.getTaskConfigs().get(task)).build());
+ for (String job : flow.getJobConfigs().keySet()) {
+ JobConfig.Builder builder = JobConfig.Builder.fromMap(flow.getJobConfigs().get(job));
+ if (flow.getTaskConfigs() != null && flow.getTaskConfigs().containsKey(job)) {
+ builder.addTaskConfigs(flow.getTaskConfigs().get(job));
+ }
+ scheduleJob(job, builder.build());
}
}
- /** Posts new task to cluster */
- private void scheduleTask(String taskResource, TaskConfig taskConfig) throws Exception {
- // Set up task resource based on partitions from target resource
+ /** Posts new job to cluster */
+ private void scheduleJob(String jobResource, JobConfig jobConfig) throws Exception {
+ // Set up job resource based on partitions from target resource
+ int numIndependentTasks = jobConfig.getTaskConfigMap().size();
int numPartitions =
- _admin.getResourceIdealState(_clusterName, taskConfig.getTargetResource())
- .getPartitionSet().size();
- _admin.addResource(_clusterName, taskResource, numPartitions, TaskConstants.STATE_MODEL_NAME);
- _admin.setConfig(TaskUtil.getResourceConfigScope(_clusterName, taskResource),
- taskConfig.getResourceConfigMap());
+ (numIndependentTasks > 0) ? numIndependentTasks : _admin
+ .getResourceIdealState(_clusterName, jobConfig.getTargetResource()).getPartitionSet()
+ .size();
+ _admin.addResource(_clusterName, jobResource, numPartitions, TaskConstants.STATE_MODEL_NAME);
+
+ // Set the job configuration
+ HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+ HelixProperty resourceConfig = new HelixProperty(jobResource);
+ resourceConfig.getRecord().getSimpleFields().putAll(jobConfig.getResourceConfigMap());
+ Map<String, TaskConfig> taskConfigMap = jobConfig.getTaskConfigMap();
+ if (taskConfigMap != null) {
+ for (TaskConfig taskConfig : taskConfigMap.values()) {
+ resourceConfig.getRecord().setMapField(taskConfig.getId(), taskConfig.getConfigMap());
+ }
+ }
+ accessor.setProperty(keyBuilder.resourceConfig(jobResource), resourceConfig);
// Push out new ideal state based on number of target partitions
- CustomModeISBuilder builder = new CustomModeISBuilder(taskResource);
+ CustomModeISBuilder builder = new CustomModeISBuilder(jobResource);
builder.setRebalancerMode(IdealState.RebalanceMode.USER_DEFINED);
builder.setNumReplica(1);
builder.setNumPartitions(numPartitions);
builder.setStateModel(TaskConstants.STATE_MODEL_NAME);
for (int i = 0; i < numPartitions; i++) {
- builder.add(taskResource + "_" + i);
+ builder.add(jobResource + "_" + i);
}
IdealState is = builder.build();
- is.setRebalancerClassName(TaskRebalancer.class.getName());
- _admin.setResourceIdealState(_clusterName, taskResource, is);
+ if (taskConfigMap != null && !taskConfigMap.isEmpty()) {
+ is.setRebalancerClassName(GenericTaskRebalancer.class.getName());
+ } else {
+ is.setRebalancerClassName(FixedTargetTaskRebalancer.class.getName());
+ }
+ _admin.setResourceIdealState(_clusterName, jobResource, is);
}
- /** Public method to resume a task/workflow */
+ /** Public method to resume a job/workflow */
public void resume(String resource) {
setTaskTargetState(resource, TargetState.START);
}
- /** Public method to stop a task/workflow */
+ /** Public method to stop a job/workflow */
public void stop(String resource) {
setTaskTargetState(resource, TargetState.STOP);
}
- /** Public method to delete a task/workflow */
+ /** Public method to delete a job/workflow */
public void delete(String resource) {
setTaskTargetState(resource, TargetState.DELETE);
}
/** Helper function to change target state for a given task */
- private void setTaskTargetState(String taskResource, TargetState state) {
+ private void setTaskTargetState(String jobResource, TargetState state) {
HelixDataAccessor accessor = _manager.getHelixDataAccessor();
- HelixProperty p = new HelixProperty(taskResource);
+ HelixProperty p = new HelixProperty(jobResource);
p.getRecord().setSimpleField(WorkflowConfig.TARGET_STATE, state.name());
- accessor.updateProperty(accessor.keyBuilder().resourceConfig(taskResource), p);
+ accessor.updateProperty(accessor.keyBuilder().resourceConfig(jobResource), p);
invokeRebalance();
}
@@ -191,34 +234,24 @@ public class TaskDriver {
WorkflowContext wCtx = TaskUtil.getWorkflowContext(_manager, resource);
LOG.info("Workflow " + resource + " consists of the following tasks: "
- + wCfg.getTaskDag().getAllNodes());
+ + wCfg.getJobDag().getAllNodes());
LOG.info("Current state of workflow is " + wCtx.getWorkflowState().name());
- LOG.info("Task states are: ");
+ LOG.info("Job states are: ");
LOG.info("-------");
- for (String task : wCfg.getTaskDag().getAllNodes()) {
- LOG.info("Task " + task + " is " + wCtx.getTaskState(task));
+ for (String job : wCfg.getJobDag().getAllNodes()) {
+ LOG.info("Task " + job + " is " + wCtx.getJobState(job));
// fetch task information
- TaskContext tCtx = TaskUtil.getTaskContext(_manager, task);
- TaskConfig tCfg = TaskUtil.getTaskCfg(_manager, task);
+ JobContext jCtx = TaskUtil.getJobContext(_manager, job);
// calculate taskPartitions
- List<Integer> partitions;
- if (tCfg.getTargetPartitions() != null) {
- partitions = tCfg.getTargetPartitions();
- } else {
- partitions = new ArrayList<Integer>();
- for (String pStr : _admin.getResourceIdealState(_clusterName, tCfg.getTargetResource())
- .getPartitionSet()) {
- partitions
- .add(Integer.parseInt(pStr.substring(pStr.lastIndexOf("_") + 1, pStr.length())));
- }
- }
+ List<Integer> partitions = Lists.newArrayList(jCtx.getPartitionSet());
+ Collections.sort(partitions);
// group partitions by status
Map<TaskPartitionState, Integer> statusCount = new TreeMap<TaskPartitionState, Integer>();
for (Integer i : partitions) {
- TaskPartitionState s = tCtx.getPartitionState(i);
+ TaskPartitionState s = jCtx.getPartitionState(i);
if (!statusCount.containsKey(s)) {
statusCount.put(s, 0);
}
@@ -257,23 +290,24 @@ public class TaskDriver {
return options;
}
- /** Constructs option group containing options required by all drivable tasks */
+ /** Constructs option group containing options required by all drivable jobs */
+ @SuppressWarnings("static-access")
private static OptionGroup contructGenericRequiredOptionGroup() {
Option zkAddressOption =
OptionBuilder.isRequired().withLongOpt(ZK_ADDRESS)
- .withDescription("ZK address managing target cluster").create();
+ .withDescription("ZK address managing cluster").create();
zkAddressOption.setArgs(1);
zkAddressOption.setArgName("zkAddress");
Option clusterNameOption =
- OptionBuilder.isRequired().withLongOpt(CLUSTER_NAME_OPTION)
- .withDescription("Target cluster name").create();
+ OptionBuilder.isRequired().withLongOpt(CLUSTER_NAME_OPTION).withDescription("Cluster name")
+ .create();
clusterNameOption.setArgs(1);
clusterNameOption.setArgName("clusterName");
Option taskResourceOption =
OptionBuilder.isRequired().withLongOpt(RESOURCE_OPTION)
- .withDescription("Target workflow or task").create();
+ .withDescription("Workflow or job name").create();
taskResourceOption.setArgs(1);
taskResourceOption.setArgName("resourceName");
@@ -284,8 +318,9 @@ public class TaskDriver {
return group;
}
- /** Constructs option group containing options required by all drivable tasks */
+ /** Constructs option group containing options required by all drivable jobs */
private static OptionGroup constructStartOptionGroup() {
+ @SuppressWarnings("static-access")
Option workflowFileOption =
OptionBuilder.withLongOpt(WORKFLOW_FILE_OPTION)
.withDescription("Local file describing workflow").create();
http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/task/TaskFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskFactory.java b/helix-core/src/main/java/org/apache/helix/task/TaskFactory.java
index 5133b74..31fddc7 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskFactory.java
@@ -1,7 +1,24 @@
+package org.apache.helix.task;
+
/*
- * $Id$
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
-package org.apache.helix.task;
+
/**
* A factory for {@link Task} objects.
@@ -9,8 +26,8 @@ package org.apache.helix.task;
public interface TaskFactory {
/**
* Returns a {@link Task} instance.
- * @param config Configuration information for the task.
+ * @param context Contextual information for the task, including task and job configurations
* @return A {@link Task} instance.
*/
- Task createNewTask(String config);
+ Task createNewTask(TaskCallbackContext context);
}
http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/task/TaskPartitionState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskPartitionState.java b/helix-core/src/main/java/org/apache/helix/task/TaskPartitionState.java
index f3e182d..d41668d 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskPartitionState.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskPartitionState.java
@@ -1,7 +1,23 @@
+package org.apache.helix.task;
+
/*
- * $Id$
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
-package org.apache.helix.task;
/**
* Enumeration of the states in the "Task" state model.
http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index 55eca7c..457f0e0 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -1,7 +1,23 @@
+package org.apache.helix.task;
+
/*
- * $Id$
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
-package org.apache.helix.task;
import java.util.ArrayList;
import java.util.HashMap;
@@ -35,10 +51,41 @@ import com.google.common.collect.Sets;
/**
* Custom rebalancer implementation for the {@code Task} state model.
*/
-public class TaskRebalancer implements Rebalancer, MappingCalculator {
+public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
private static final Logger LOG = Logger.getLogger(TaskRebalancer.class);
private HelixManager _manager;
+ /**
+ * Get all the partitions that should be created by this job
+ * @param jobCfg the job configuration
+ * @param jobCtx the job context
+ * @param workflowCfg the workflow configuration
+ * @param workflowCtx the workflow context
+ * @param cache cluster snapshot
+ * @return set of partition numbers
+ */
+ public abstract Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
+ WorkflowConfig workflowCfg, WorkflowContext workflowCtx, ClusterDataCache cache);
+
+ /**
+ * Compute an assignment of tasks to instances
+ * @param currStateOutput the current state of the instances
+ * @param prevAssignment the previous task partition assignment
+ * @param instanceList the instances
+ * @param jobCfg the job configuration
+ * @param jobContext the job context
+ * @param workflowCfg the workflow configuration
+ * @param workflowCtx the workflow context
+ * @param partitionSet the partitions to assign
+ * @param cache cluster snapshot
+ * @return map of instances to set of partition numbers
+ */
+ public abstract Map<String, SortedSet<Integer>> getTaskAssignment(
+ CurrentStateOutput currStateOutput, ResourceAssignment prevAssignment,
+ Iterable<String> instanceList, JobConfig jobCfg, JobContext jobContext,
+ WorkflowConfig workflowCfg, WorkflowContext workflowCtx, Set<Integer> partitionSet,
+ ClusterDataCache cache);
+
@Override
public void init(HelixManager manager) {
_manager = manager;
@@ -49,9 +96,9 @@ public class TaskRebalancer implements Rebalancer, MappingCalculator {
IdealState taskIs, Resource resource, CurrentStateOutput currStateOutput) {
final String resourceName = resource.getResourceName();
- // Fetch task configuration
- TaskConfig taskCfg = TaskUtil.getTaskCfg(_manager, resourceName);
- String workflowResource = taskCfg.getWorkflow();
+ // Fetch job configuration
+ JobConfig jobCfg = TaskUtil.getJobCfg(_manager, resourceName);
+ String workflowResource = jobCfg.getWorkflow();
// Fetch workflow configuration and context
WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(_manager, workflowResource);
@@ -64,9 +111,9 @@ public class TaskRebalancer implements Rebalancer, MappingCalculator {
}
// Check parent dependencies
- for (String parent : workflowCfg.getTaskDag().getDirectParents(resourceName)) {
- if (workflowCtx.getTaskState(parent) == null
- || !workflowCtx.getTaskState(parent).equals(TaskState.COMPLETED)) {
+ for (String parent : workflowCfg.getJobDag().getDirectParents(resourceName)) {
+ if (workflowCtx.getJobState(parent) == null
+ || !workflowCtx.getJobState(parent).equals(TaskState.COMPLETED)) {
return emptyAssignment(resourceName);
}
}
@@ -87,15 +134,15 @@ public class TaskRebalancer implements Rebalancer, MappingCalculator {
}
// Fetch any existing context information from the property store.
- TaskContext taskCtx = TaskUtil.getTaskContext(_manager, resourceName);
- if (taskCtx == null) {
- taskCtx = new TaskContext(new ZNRecord("TaskContext"));
- taskCtx.setStartTime(System.currentTimeMillis());
+ JobContext jobCtx = TaskUtil.getJobContext(_manager, resourceName);
+ if (jobCtx == null) {
+ jobCtx = new JobContext(new ZNRecord("TaskContext"));
+ jobCtx.setStartTime(System.currentTimeMillis());
}
- // The task is already in a final state (completed/failed).
- if (workflowCtx.getTaskState(resourceName) == TaskState.FAILED
- || workflowCtx.getTaskState(resourceName) == TaskState.COMPLETED) {
+ // The job is already in a final state (completed/failed).
+ if (workflowCtx.getJobState(resourceName) == TaskState.FAILED
+ || workflowCtx.getJobState(resourceName) == TaskState.COMPLETED) {
return emptyAssignment(resourceName);
}
@@ -111,9 +158,9 @@ public class TaskRebalancer implements Rebalancer, MappingCalculator {
Set<Integer> partitionsToDrop = new TreeSet<Integer>();
ResourceAssignment newAssignment =
- computeResourceMapping(resourceName, workflowCfg, taskCfg, prevAssignment,
- clusterData.getIdealState(taskCfg.getTargetResource()), clusterData.getLiveInstances()
- .keySet(), currStateOutput, workflowCtx, taskCtx, partitionsToDrop);
+ computeResourceMapping(resourceName, workflowCfg, jobCfg, prevAssignment, clusterData
+ .getLiveInstances().keySet(), currStateOutput, workflowCtx, jobCtx, partitionsToDrop,
+ clusterData);
if (!partitionsToDrop.isEmpty()) {
for (Integer pId : partitionsToDrop) {
@@ -125,40 +172,42 @@ public class TaskRebalancer implements Rebalancer, MappingCalculator {
}
// Update rebalancer context, previous ideal state.
- TaskUtil.setTaskContext(_manager, resourceName, taskCtx);
+ TaskUtil.setJobContext(_manager, resourceName, jobCtx);
TaskUtil.setWorkflowContext(_manager, workflowResource, workflowCtx);
TaskUtil.setPrevResourceAssignment(_manager, resourceName, newAssignment);
return newAssignment;
}
- private static ResourceAssignment computeResourceMapping(String taskResource,
- WorkflowConfig workflowConfig, TaskConfig taskCfg, ResourceAssignment prevAssignment,
- IdealState tgtResourceIs, Iterable<String> liveInstances, CurrentStateOutput currStateOutput,
- WorkflowContext workflowCtx, TaskContext taskCtx, Set<Integer> partitionsToDropFromIs) {
- TargetState taskTgtState = workflowConfig.getTargetState();
+ private ResourceAssignment computeResourceMapping(String jobResource,
+ WorkflowConfig workflowConfig, JobConfig jobCfg, ResourceAssignment prevAssignment,
+ Iterable<String> liveInstances, CurrentStateOutput currStateOutput,
+ WorkflowContext workflowCtx, JobContext jobCtx, Set<Integer> partitionsToDropFromIs,
+ ClusterDataCache cache) {
+ TargetState jobTgtState = workflowConfig.getTargetState();
// Update running status in workflow context
- if (taskTgtState == TargetState.STOP) {
- workflowCtx.setTaskState(taskResource, TaskState.STOPPED);
- // Workflow has been stopped if all tasks are stopped
+ if (jobTgtState == TargetState.STOP) {
+ workflowCtx.setJobState(jobResource, TaskState.STOPPED);
+ // Workflow has been stopped if all jobs are stopped
if (isWorkflowStopped(workflowCtx, workflowConfig)) {
workflowCtx.setWorkflowState(TaskState.STOPPED);
}
} else {
- workflowCtx.setTaskState(taskResource, TaskState.IN_PROGRESS);
+ workflowCtx.setJobState(jobResource, TaskState.IN_PROGRESS);
// Workflow is in progress if any task is in progress
workflowCtx.setWorkflowState(TaskState.IN_PROGRESS);
}
- // Used to keep track of task partitions that have already been assigned to instances.
+ // Used to keep track of tasks that have already been assigned to instances.
Set<Integer> assignedPartitions = new HashSet<Integer>();
// Keeps a mapping of (partition) -> (instance, state)
Map<Integer, PartitionAssignment> paMap = new TreeMap<Integer, PartitionAssignment>();
- // Process all the current assignments of task partitions.
- Set<Integer> allPartitions = getAllTaskPartitions(tgtResourceIs, taskCfg);
+ // Process all the current assignments of tasks.
+ Set<Integer> allPartitions =
+ getAllTaskPartitions(jobCfg, jobCtx, workflowConfig, workflowCtx, cache);
Map<String, SortedSet<Integer>> taskAssignments =
getTaskPartitionAssignments(liveInstances, prevAssignment, allPartitions);
for (String instance : taskAssignments.keySet()) {
@@ -167,11 +216,11 @@ public class TaskRebalancer implements Rebalancer, MappingCalculator {
// TASK_ERROR, ERROR.
Set<Integer> donePartitions = new TreeSet<Integer>();
for (int pId : pSet) {
- final String pName = pName(taskResource, pId);
+ final String pName = pName(jobResource, pId);
// Check for pending state transitions on this (partition, instance).
String pendingState =
- currStateOutput.getPendingState(taskResource, new Partition(pName), instance);
+ currStateOutput.getPendingState(jobResource, new Partition(pName), instance);
if (pendingState != null) {
// There is a pending state transition for this (partition, instance). Just copy forward
// the state
@@ -191,12 +240,12 @@ public class TaskRebalancer implements Rebalancer, MappingCalculator {
}
TaskPartitionState currState =
- TaskPartitionState.valueOf(currStateOutput.getCurrentState(taskResource, new Partition(
+ TaskPartitionState.valueOf(currStateOutput.getCurrentState(jobResource, new Partition(
pName), instance));
// Process any requested state transitions.
String requestedStateStr =
- currStateOutput.getRequestedState(taskResource, new Partition(pName), instance);
+ currStateOutput.getRequestedState(jobResource, new Partition(pName), instance);
if (requestedStateStr != null && !requestedStateStr.isEmpty()) {
TaskPartitionState requestedState = TaskPartitionState.valueOf(requestedStateStr);
if (requestedState.equals(currState)) {
@@ -217,7 +266,7 @@ public class TaskRebalancer implements Rebalancer, MappingCalculator {
case RUNNING:
case STOPPED: {
TaskPartitionState nextState;
- if (taskTgtState == TargetState.START) {
+ if (jobTgtState == TargetState.START) {
nextState = TaskPartitionState.RUNNING;
} else {
nextState = TaskPartitionState.STOPPED;
@@ -237,7 +286,7 @@ public class TaskRebalancer implements Rebalancer, MappingCalculator {
"Task partition %s has completed with state %s. Marking as such in rebalancer context.",
pName, currState));
partitionsToDropFromIs.add(pId);
- markPartitionCompleted(taskCtx, pId);
+ markPartitionCompleted(jobCtx, pId);
}
break;
case TIMED_OUT:
@@ -247,15 +296,15 @@ public class TaskRebalancer implements Rebalancer, MappingCalculator {
LOG.debug(String.format(
"Task partition %s has error state %s. Marking as such in rebalancer context.",
pName, currState));
- markPartitionError(taskCtx, pId, currState);
+ markPartitionError(jobCtx, pId, currState);
// The error policy is to fail the task as soon a single partition fails for a specified
// maximum number of
// attempts.
- if (taskCtx.getPartitionNumAttempts(pId) >= taskCfg.getMaxAttemptsPerPartition()) {
- workflowCtx.setTaskState(taskResource, TaskState.FAILED);
+ if (jobCtx.getPartitionNumAttempts(pId) >= jobCfg.getMaxAttemptsPerTask()) {
+ workflowCtx.setJobState(jobResource, TaskState.FAILED);
workflowCtx.setWorkflowState(TaskState.FAILED);
- addAllPartitions(tgtResourceIs.getPartitionSet(), partitionsToDropFromIs);
- return emptyAssignment(taskResource);
+ addAllPartitions(allPartitions, partitionsToDropFromIs);
+ return emptyAssignment(jobResource);
}
}
break;
@@ -277,8 +326,8 @@ public class TaskRebalancer implements Rebalancer, MappingCalculator {
pSet.removeAll(donePartitions);
}
- if (isTaskComplete(taskCtx, allPartitions)) {
- workflowCtx.setTaskState(taskResource, TaskState.COMPLETED);
+ if (isJobComplete(jobCtx, allPartitions)) {
+ workflowCtx.setJobState(jobResource, TaskState.COMPLETED);
if (isWorkflowComplete(workflowCtx, workflowConfig)) {
workflowCtx.setWorkflowState(TaskState.COMPLETED);
workflowCtx.setFinishTime(System.currentTimeMillis());
@@ -286,26 +335,29 @@ public class TaskRebalancer implements Rebalancer, MappingCalculator {
}
// Make additional task assignments if needed.
- if (taskTgtState == TargetState.START) {
+ if (jobTgtState == TargetState.START) {
// Contains the set of task partitions that must be excluded from consideration when making
// any new assignments.
// This includes all completed, failed, already assigned partitions.
Set<Integer> excludeSet = Sets.newTreeSet(assignedPartitions);
- addCompletedPartitions(excludeSet, taskCtx, allPartitions);
+ addCompletedPartitions(excludeSet, jobCtx, allPartitions);
// Get instance->[partition, ...] mappings for the target resource.
Map<String, SortedSet<Integer>> tgtPartitionAssignments =
- getTgtPartitionAssignment(currStateOutput, liveInstances, tgtResourceIs,
- taskCfg.getTargetPartitionStates(), allPartitions);
+ getTaskAssignment(currStateOutput, prevAssignment, liveInstances, jobCfg, jobCtx,
+ workflowConfig, workflowCtx, allPartitions, cache);
for (Map.Entry<String, SortedSet<Integer>> entry : taskAssignments.entrySet()) {
String instance = entry.getKey();
+ if (!tgtPartitionAssignments.containsKey(instance)) {
+ continue;
+ }
// Contains the set of task partitions currently assigned to the instance.
Set<Integer> pSet = entry.getValue();
- int numToAssign = taskCfg.getNumConcurrentTasksPerInstance() - pSet.size();
+ int numToAssign = jobCfg.getNumConcurrentTasksPerInstance() - pSet.size();
if (numToAssign > 0) {
List<Integer> nextPartitions =
getNextPartitions(tgtPartitionAssignments.get(instance), excludeSet, numToAssign);
for (Integer pId : nextPartitions) {
- String pName = pName(taskResource, pId);
+ String pName = pName(jobResource, pId);
paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.RUNNING.name()));
excludeSet.add(pId);
LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName,
@@ -316,10 +368,10 @@ public class TaskRebalancer implements Rebalancer, MappingCalculator {
}
// Construct a ResourceAssignment object from the map of partition assignments.
- ResourceAssignment ra = new ResourceAssignment(taskResource);
+ ResourceAssignment ra = new ResourceAssignment(jobResource);
for (Map.Entry<Integer, PartitionAssignment> e : paMap.entrySet()) {
PartitionAssignment pa = e.getValue();
- ra.addReplicaMap(new Partition(pName(taskResource, e.getKey())),
+ ra.addReplicaMap(new Partition(pName(jobResource, e.getKey())),
ImmutableMap.of(pa._instance, pa._state));
}
@@ -327,14 +379,14 @@ public class TaskRebalancer implements Rebalancer, MappingCalculator {
}
/**
- * Checks if the task has completed.
+ * Checks if the job has completed.
* @param ctx The rebalancer context.
* @param allPartitions The set of partitions to check.
* @return true if all task partitions have been marked with status
* {@link TaskPartitionState#COMPLETED} in the rebalancer
* context, false otherwise.
*/
- private static boolean isTaskComplete(TaskContext ctx, Set<Integer> allPartitions) {
+ private static boolean isJobComplete(JobContext ctx, Set<Integer> allPartitions) {
for (Integer pId : allPartitions) {
TaskPartitionState state = ctx.getPartitionState(pId);
if (state != TaskPartitionState.COMPLETED) {
@@ -346,13 +398,13 @@ public class TaskRebalancer implements Rebalancer, MappingCalculator {
/**
* Checks if the workflow has completed.
- * @param ctx Workflow context containing task states
- * @param cfg Workflow config containing set of tasks
+ * @param ctx Workflow context containing job states
+ * @param cfg Workflow config containing set of jobs
* @return returns true if all tasks are {@link TaskState#COMPLETED}, false otherwise.
*/
private static boolean isWorkflowComplete(WorkflowContext ctx, WorkflowConfig cfg) {
- for (String task : cfg.getTaskDag().getAllNodes()) {
- if (ctx.getTaskState(task) != TaskState.COMPLETED) {
+ for (String job : cfg.getJobDag().getAllNodes()) {
+ if (ctx.getJobState(job) != TaskState.COMPLETED) {
return false;
}
}
@@ -366,8 +418,8 @@ public class TaskRebalancer implements Rebalancer, MappingCalculator {
* @return returns true if all tasks are {@link TaskState#STOPPED}, false otherwise.
*/
private static boolean isWorkflowStopped(WorkflowContext ctx, WorkflowConfig cfg) {
- for (String task : cfg.getTaskDag().getAllNodes()) {
- if (ctx.getTaskState(task) != TaskState.STOPPED && ctx.getTaskState(task) != null) {
+ for (String job : cfg.getJobDag().getAllNodes()) {
+ if (ctx.getJobState(job) != TaskState.STOPPED && ctx.getJobState(job) != null) {
return false;
}
}
@@ -381,9 +433,8 @@ public class TaskRebalancer implements Rebalancer, MappingCalculator {
}
/**
- * Cleans up all Helix state associated with this task, wiping workflow-level information if this
- * is the last
- * remaining task in its workflow.
+ * Cleans up all Helix state associated with this job, wiping workflow-level information if this
+ * is the last remaining job in its workflow.
*/
private static void cleanup(HelixManager mgr, String resourceName, WorkflowConfig cfg,
String workflowResource) {
@@ -416,17 +467,17 @@ public class TaskRebalancer implements Rebalancer, MappingCalculator {
LOG.info(String.format("Successfully cleaned up task resource %s.", resourceName));
boolean lastInWorkflow = true;
- for (String task : cfg.getTaskDag().getAllNodes()) {
- // check if property store information or resource configs exist for this task
- if (mgr.getHelixPropertyStore().exists(getRebalancerPropStoreKey(task),
+ for (String job : cfg.getJobDag().getAllNodes()) {
+ // check if property store information or resource configs exist for this job
+ if (mgr.getHelixPropertyStore().exists(getRebalancerPropStoreKey(job),
AccessOption.PERSISTENT)
- || accessor.getProperty(getConfigPropertyKey(accessor, task)) != null
- || accessor.getProperty(getISPropertyKey(accessor, task)) != null) {
+ || accessor.getProperty(getConfigPropertyKey(accessor, job)) != null
+ || accessor.getProperty(getISPropertyKey(accessor, job)) != null) {
lastInWorkflow = false;
}
}
- // clean up task-level info if this was the last in workflow
+ // clean up job-level info if this was the last in workflow
if (lastInWorkflow) {
// delete workflow config
PropertyKey workflowCfgKey = getConfigPropertyKey(accessor, workflowResource);
@@ -462,9 +513,9 @@ public class TaskRebalancer implements Rebalancer, MappingCalculator {
return accessor.keyBuilder().resourceConfig(resource);
}
- private static void addAllPartitions(Set<String> pNames, Set<Integer> pIds) {
- for (String pName : pNames) {
- pIds.add(pId(pName));
+ private static void addAllPartitions(Set<Integer> toAdd, Set<Integer> destination) {
+ for (Integer pId : toAdd) {
+ destination.add(pId);
}
}
@@ -472,7 +523,7 @@ public class TaskRebalancer implements Rebalancer, MappingCalculator {
return new ResourceAssignment(name);
}
- private static void addCompletedPartitions(Set<Integer> set, TaskContext ctx,
+ private static void addCompletedPartitions(Set<Integer> set, JobContext ctx,
Iterable<Integer> pIds) {
for (Integer pId : pIds) {
TaskPartitionState state = ctx.getPartitionState(pId);
@@ -482,30 +533,9 @@ public class TaskRebalancer implements Rebalancer, MappingCalculator {
}
}
- /**
- * Returns the set of all partition ids for a task.
- * <p/>
- * If a set of partition ids was explicitly specified in the config, that is used. Otherwise, we
- * use the list of all partition ids from the target resource.
- */
- private static Set<Integer> getAllTaskPartitions(IdealState tgtResourceIs, TaskConfig taskCfg) {
- Set<Integer> taskPartitions = new HashSet<Integer>();
- if (taskCfg.getTargetPartitions() != null) {
- for (Integer pId : taskCfg.getTargetPartitions()) {
- taskPartitions.add(pId);
- }
- } else {
- for (String pName : tgtResourceIs.getPartitionSet()) {
- taskPartitions.add(pId(pName));
- }
- }
-
- return taskPartitions;
- }
-
private static List<Integer> getNextPartitions(SortedSet<Integer> candidatePartitions,
Set<Integer> excluded, int n) {
- List<Integer> result = new ArrayList<Integer>(n);
+ List<Integer> result = new ArrayList<Integer>();
for (Integer pId : candidatePartitions) {
if (result.size() >= n) {
break;
@@ -519,55 +549,19 @@ public class TaskRebalancer implements Rebalancer, MappingCalculator {
return result;
}
- private static void markPartitionCompleted(TaskContext ctx, int pId) {
+ private static void markPartitionCompleted(JobContext ctx, int pId) {
ctx.setPartitionState(pId, TaskPartitionState.COMPLETED);
ctx.setPartitionFinishTime(pId, System.currentTimeMillis());
ctx.incrementNumAttempts(pId);
}
- private static void markPartitionError(TaskContext ctx, int pId, TaskPartitionState state) {
+ private static void markPartitionError(JobContext ctx, int pId, TaskPartitionState state) {
ctx.setPartitionState(pId, state);
ctx.setPartitionFinishTime(pId, System.currentTimeMillis());
ctx.incrementNumAttempts(pId);
}
/**
- * Get partition assignments for the target resource, but only for the partitions of interest.
- * @param currStateOutput The current state of the instances in the cluster.
- * @param instanceList The set of instances.
- * @param tgtIs The ideal state of the target resource.
- * @param tgtStates Only partitions in this set of states will be considered. If null, partitions
- * do not need to
- * be in any specific state to be considered.
- * @param includeSet The set of partitions to consider.
- * @return A map of instance vs set of partition ids assigned to that instance.
- */
- private static Map<String, SortedSet<Integer>> getTgtPartitionAssignment(
- CurrentStateOutput currStateOutput, Iterable<String> instanceList, IdealState tgtIs,
- Set<String> tgtStates, Set<Integer> includeSet) {
- Map<String, SortedSet<Integer>> result = new HashMap<String, SortedSet<Integer>>();
- for (String instance : instanceList) {
- result.put(instance, new TreeSet<Integer>());
- }
-
- for (String pName : tgtIs.getPartitionSet()) {
- int pId = pId(pName);
- if (includeSet.contains(pId)) {
- for (String instance : instanceList) {
- String state =
- currStateOutput.getCurrentState(tgtIs.getResourceName(), new Partition(pName),
- instance);
- if (tgtStates == null || tgtStates.contains(state)) {
- result.get(instance).add(pId);
- }
- }
- }
- }
-
- return result;
- }
-
- /**
* Return the assignment of task partitions per instance.
*/
private static Map<String, SortedSet<Integer>> getTaskPartitionAssignments(
@@ -596,14 +590,14 @@ public class TaskRebalancer implements Rebalancer, MappingCalculator {
/**
* Computes the partition name given the resource name and partition id.
*/
- private static String pName(String resource, int pId) {
+ protected static String pName(String resource, int pId) {
return resource + "_" + pId;
}
/**
* Extracts the partition id from the given partition name.
*/
- private static int pId(String pName) {
+ protected static int pId(String pName) {
String[] tokens = pName.split("_");
return Integer.valueOf(tokens[tokens.length - 1]);
}
@@ -624,6 +618,8 @@ public class TaskRebalancer implements Rebalancer, MappingCalculator {
@Override
public IdealState computeNewIdealState(String resourceName, IdealState currentIdealState,
CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) {
+ // All of the heavy lifting is in the ResourceAssignment computation,
+ // so this part can just be a no-op.
return currentIdealState;
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/task/TaskResult.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskResult.java b/helix-core/src/main/java/org/apache/helix/task/TaskResult.java
index 8c6629d..95b8d72 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskResult.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskResult.java
@@ -1,7 +1,23 @@
+package org.apache.helix.task;
+
/*
- * $Id$
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
-package org.apache.helix.task;
/**
* The result of a task execution.
http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java b/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
index 97bf52b..dea383b 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
@@ -1,7 +1,23 @@
+package org.apache.helix.task;
+
/*
- * $Id$
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
-package org.apache.helix.task;
import org.apache.helix.HelixManager;
import org.apache.log4j.Logger;
http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/task/TaskState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskState.java b/helix-core/src/main/java/org/apache/helix/task/TaskState.java
index 5efb01f..2cc6d6c 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskState.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskState.java
@@ -1,7 +1,23 @@
+package org.apache.helix.task;
+
/*
- * $Id$
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
-package org.apache.helix.task;
/**
* Enumeration of current task states. This value is stored in the rebalancer context.
http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
index 69a3a4e..78f27df 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
@@ -1,7 +1,23 @@
+package org.apache.helix.task;
+
/*
- * $Id$
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
-package org.apache.helix.task;
import java.util.Map;
import java.util.Timer;
@@ -196,10 +212,37 @@ public class TaskStateModel extends StateModel {
}
private void startTask(Message msg, String taskPartition) {
- TaskConfig cfg = TaskUtil.getTaskCfg(_manager, msg.getResourceName());
- TaskFactory taskFactory = _taskFactoryRegistry.get(cfg.getCommand());
- Task task = taskFactory.createNewTask(cfg.getCommandConfig());
+ JobConfig cfg = TaskUtil.getJobCfg(_manager, msg.getResourceName());
+ TaskConfig taskConfig = null;
+ String command = cfg.getCommand();
+
+ // Get a task-specific command if specified
+ JobContext ctx = TaskUtil.getJobContext(_manager, msg.getResourceName());
+ int pId = Integer.parseInt(taskPartition.substring(taskPartition.lastIndexOf('_') + 1));
+ if (ctx.getTaskIdForPartition(pId) != null) {
+ taskConfig = cfg.getTaskConfig(ctx.getTaskIdForPartition(pId));
+ if (taskConfig != null) {
+ if (taskConfig.getCommand() != null) {
+ command = taskConfig.getCommand();
+ }
+ }
+ }
+
+ // Populate a task callback context
+ TaskCallbackContext callbackContext = new TaskCallbackContext();
+ callbackContext.setManager(_manager);
+ callbackContext.setJobConfig(cfg);
+ callbackContext.setTaskConfig(taskConfig);
+
+ // Create a task instance with this command
+ if (command == null || _taskFactoryRegistry == null
+ || !_taskFactoryRegistry.containsKey(command)) {
+ throw new IllegalStateException("No callback implemented for task " + command);
+ }
+ TaskFactory taskFactory = _taskFactoryRegistry.get(command);
+ Task task = taskFactory.createNewTask(callbackContext);
+ // Submit the task for execution
_taskRunner =
new TaskRunner(task, msg.getResourceName(), taskPartition, msg.getTgtName(), _manager,
msg.getTgtSessionId());
@@ -214,6 +257,6 @@ public class TaskStateModel extends StateModel {
_taskRunner.timeout();
}
}
- }, cfg.getTimeoutPerPartition());
+ }, cfg.getTimeoutPerTask());
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java b/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
index 4cd85d1..51e8c95 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
@@ -1,7 +1,23 @@
+package org.apache.helix.task;
+
/*
- * $Id$
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
-package org.apache.helix.task;
import java.util.Map;
http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index 741ed4d..a5c97ac 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -1,8 +1,26 @@
+package org.apache.helix.task;
+
/*
- * $Id$
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
-package org.apache.helix.task;
+import java.io.IOException;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -11,6 +29,7 @@ import org.apache.helix.AccessOption;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
+import org.apache.helix.HelixProperty;
import org.apache.helix.PropertyKey;
import org.apache.helix.ZNRecord;
import org.apache.helix.model.CurrentState;
@@ -18,8 +37,11 @@ import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.log4j.Logger;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
import com.google.common.base.Joiner;
+import com.google.common.collect.Maps;
/**
* Static utility methods.
@@ -30,16 +52,24 @@ public class TaskUtil {
private static final String PREV_RA_NODE = "PreviousResourceAssignment";
/**
- * Parses task resource configurations in Helix into a {@link TaskConfig} object.
+ * Parses job resource configurations in Helix into a {@link JobConfig} object.
* @param manager HelixManager object used to connect to Helix.
- * @param taskResource The name of the task resource.
- * @return A {@link TaskConfig} object if Helix contains valid configurations for the task, null
+ * @param jobResource The name of the job resource.
+ * @return A {@link JobConfig} object if Helix contains valid configurations for the job, null
* otherwise.
*/
- public static TaskConfig getTaskCfg(HelixManager manager, String taskResource) {
- Map<String, String> taskCfg = getResourceConfigMap(manager, taskResource);
- TaskConfig.Builder b = TaskConfig.Builder.fromMap(taskCfg);
-
+ public static JobConfig getJobCfg(HelixManager manager, String jobResource) {
+ HelixProperty jobResourceConfig = getResourceConfig(manager, jobResource);
+ JobConfig.Builder b =
+ JobConfig.Builder.fromMap(jobResourceConfig.getRecord().getSimpleFields());
+ Map<String, Map<String, String>> rawTaskConfigMap =
+ jobResourceConfig.getRecord().getMapFields();
+ Map<String, TaskConfig> taskConfigMap = Maps.newHashMap();
+ for (Map<String, String> rawTaskConfig : rawTaskConfigMap.values()) {
+ TaskConfig taskConfig = TaskConfig.from(rawTaskConfig);
+ taskConfigMap.put(taskConfig.getId(), taskConfig);
+ }
+ b.addTaskConfigMap(taskConfigMap);
return b.build();
}
@@ -89,17 +119,17 @@ public class TaskUtil {
ra.getRecord(), AccessOption.PERSISTENT);
}
- public static TaskContext getTaskContext(HelixManager manager, String taskResource) {
+ public static JobContext getJobContext(HelixManager manager, String jobResource) {
ZNRecord r =
manager.getHelixPropertyStore().get(
- Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, taskResource, CONTEXT_NODE),
+ Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, jobResource, CONTEXT_NODE),
null, AccessOption.PERSISTENT);
- return r != null ? new TaskContext(r) : null;
+ return r != null ? new JobContext(r) : null;
}
- public static void setTaskContext(HelixManager manager, String taskResource, TaskContext ctx) {
+ public static void setJobContext(HelixManager manager, String jobResource, JobContext ctx) {
manager.getHelixPropertyStore().set(
- Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, taskResource, CONTEXT_NODE),
+ Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, jobResource, CONTEXT_NODE),
ctx.getRecord(), AccessOption.PERSISTENT);
}
@@ -118,12 +148,36 @@ public class TaskUtil {
ctx.getRecord(), AccessOption.PERSISTENT);
}
- public static String getNamespacedTaskName(String singleTaskWorkflow) {
- return getNamespacedTaskName(singleTaskWorkflow, singleTaskWorkflow);
+ public static String getNamespacedJobName(String singleJobWorkflow) {
+ return getNamespacedJobName(singleJobWorkflow, singleJobWorkflow);
}
- public static String getNamespacedTaskName(String workflowResource, String taskName) {
- return workflowResource + "_" + taskName;
+ public static String getNamespacedJobName(String workflowResource, String jobName) {
+ return workflowResource + "_" + jobName;
+ }
+
+ public static String serializeJobConfigMap(Map<String, String> commandConfig) {
+ ObjectMapper mapper = new ObjectMapper();
+ try {
+ String serializedMap = mapper.writeValueAsString(commandConfig);
+ return serializedMap;
+ } catch (IOException e) {
+ LOG.error("Error serializing " + commandConfig, e);
+ }
+ return null;
+ }
+
+ public static Map<String, String> deserializeJobConfigMap(String commandConfig) {
+ ObjectMapper mapper = new ObjectMapper();
+ try {
+ Map<String, String> commandConfigMap =
+ mapper.readValue(commandConfig, new TypeReference<HashMap<String, String>>() {
+ });
+ return commandConfigMap;
+ } catch (IOException e) {
+ LOG.error("Error deserializing " + commandConfig, e);
+ }
+ return Collections.emptyMap();
}
private static Map<String, String> getResourceConfigMap(HelixManager manager, String resource) {
@@ -140,6 +194,12 @@ public class TaskUtil {
taskCfg.put(cfgKey, configAccessor.get(scope, cfgKey));
}
- return taskCfg;
+ return getResourceConfig(manager, resource).getRecord().getSimpleFields();
+ }
+
+ private static HelixProperty getResourceConfig(HelixManager manager, String resource) {
+ HelixDataAccessor accessor = manager.getHelixDataAccessor();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+ return accessor.getProperty(keyBuilder.resourceConfig(resource));
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/task/Workflow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/Workflow.java b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
index 7bc8d73..5b27fb6 100644
--- a/helix-core/src/main/java/org/apache/helix/task/Workflow.java
+++ b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
@@ -1,23 +1,47 @@
package org.apache.helix.task;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.Reader;
import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import org.apache.helix.task.beans.JobBean;
import org.apache.helix.task.beans.TaskBean;
import org.apache.helix.task.beans.WorkflowBean;
import org.yaml.snakeyaml.Yaml;
import org.yaml.snakeyaml.constructor.Constructor;
import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
/**
- * Houses a task dag and config set to fully describe a task workflow
+ * Houses a job dag and config set to fully describe a job workflow
*/
public class Workflow {
/** Default workflow name, useful constant for single-node workflows */
@@ -29,16 +53,19 @@ public class Workflow {
/** Holds workflow-level configurations */
private WorkflowConfig _workflowConfig;
- /** Contains the per-task configurations for all tasks specified in the provided dag */
- private Map<String, Map<String, String>> _taskConfigs;
+ /** Contains the per-job configurations for all jobs specified in the provided dag */
+ private Map<String, Map<String, String>> _jobConfigs;
+
+ /** Contains the per-job configurations of all individually-specified tasks */
+ private Map<String, List<TaskConfig>> _taskConfigs;
/** Constructs and validates a workflow against a provided dag and config set */
private Workflow(String name, WorkflowConfig workflowConfig,
- Map<String, Map<String, String>> taskConfigs) {
+ Map<String, Map<String, String>> jobConfigs, Map<String, List<TaskConfig>> taskConfigs) {
_name = name;
_workflowConfig = workflowConfig;
+ _jobConfigs = jobConfigs;
_taskConfigs = taskConfigs;
-
validate();
}
@@ -46,13 +73,17 @@ public class Workflow {
return _name;
}
- public Map<String, Map<String, String>> getTaskConfigs() {
+ public Map<String, Map<String, String>> getJobConfigs() {
+ return _jobConfigs;
+ }
+
+ public Map<String, List<TaskConfig>> getTaskConfigs() {
return _taskConfigs;
}
public Map<String, String> getResourceConfigMap() throws Exception {
Map<String, String> cfgMap = new HashMap<String, String>();
- cfgMap.put(WorkflowConfig.DAG, _workflowConfig.getTaskDag().toJson());
+ cfgMap.put(WorkflowConfig.DAG, _workflowConfig.getJobDag().toJson());
cfgMap.put(WorkflowConfig.EXPIRY, String.valueOf(_workflowConfig.getExpiry()));
cfgMap.put(WorkflowConfig.TARGET_STATE, _workflowConfig.getTargetState().name());
@@ -78,19 +109,19 @@ public class Workflow {
*
* <pre>
* name: MyFlow
- * tasks:
- * - name : TaskA
+ * jobs:
+ * - name : JobA
* command : SomeTask
* ...
- * - name : TaskB
- * parents : [TaskA]
+ * - name : JobB
+ * parents : [JobA]
* command : SomeOtherTask
* ...
- * - name : TaskC
+ * - name : JobC
* command : AnotherTask
* ...
- * - name : TaskD
- * parents : [TaskB, TaskC]
+ * - name : JobD
+ * parents : [JobB, JobC]
* command : AnotherTask
* ...
* </pre>
@@ -107,37 +138,44 @@ public class Workflow {
WorkflowBean wf = (WorkflowBean) yaml.load(reader);
Builder builder = new Builder(wf.name);
- for (TaskBean task : wf.tasks) {
- if (task.name == null) {
- throw new IllegalArgumentException("A task must have a name.");
+ for (JobBean job : wf.jobs) {
+ if (job.name == null) {
+ throw new IllegalArgumentException("A job must have a name.");
}
- if (task.parents != null) {
- for (String parent : task.parents) {
- builder.addParentChildDependency(parent, task.name);
+ if (job.parents != null) {
+ for (String parent : job.parents) {
+ builder.addParentChildDependency(parent, job.name);
}
}
- builder.addConfig(task.name, TaskConfig.WORKFLOW_ID, wf.name);
- builder.addConfig(task.name, TaskConfig.COMMAND, task.command);
- if (task.commandConfig != null) {
- builder.addConfig(task.name, TaskConfig.COMMAND_CONFIG, task.commandConfig.toString());
+ builder.addConfig(job.name, JobConfig.WORKFLOW_ID, wf.name);
+ builder.addConfig(job.name, JobConfig.COMMAND, job.command);
+ if (job.jobConfigMap != null) {
+ builder.addConfig(job.name, JobConfig.JOB_CONFIG_MAP, job.jobConfigMap.toString());
}
- builder.addConfig(task.name, TaskConfig.TARGET_RESOURCE, task.targetResource);
- if (task.targetPartitionStates != null) {
- builder.addConfig(task.name, TaskConfig.TARGET_PARTITION_STATES,
- Joiner.on(",").join(task.targetPartitionStates));
+ builder.addConfig(job.name, JobConfig.TARGET_RESOURCE, job.targetResource);
+ if (job.targetPartitionStates != null) {
+ builder.addConfig(job.name, JobConfig.TARGET_PARTITION_STATES,
+ Joiner.on(",").join(job.targetPartitionStates));
}
- if (task.targetPartitions != null) {
- builder.addConfig(task.name, TaskConfig.TARGET_PARTITIONS,
- Joiner.on(",").join(task.targetPartitions));
+ if (job.targetPartitions != null) {
+ builder.addConfig(job.name, JobConfig.TARGET_PARTITIONS,
+ Joiner.on(",").join(job.targetPartitions));
+ }
+ builder.addConfig(job.name, JobConfig.MAX_ATTEMPTS_PER_TASK,
+ String.valueOf(job.maxAttemptsPerPartition));
+ builder.addConfig(job.name, JobConfig.NUM_CONCURRENT_TASKS_PER_INSTANCE,
+ String.valueOf(job.numConcurrentTasksPerInstance));
+ builder.addConfig(job.name, JobConfig.TIMEOUT_PER_TASK,
+ String.valueOf(job.timeoutPerPartition));
+ if (job.tasks != null) {
+ List<TaskConfig> taskConfigs = Lists.newArrayList();
+ for (TaskBean task : job.tasks) {
+ taskConfigs.add(TaskConfig.from(task));
+ }
+ builder.addTaskConfigs(job.name, taskConfigs);
}
- builder.addConfig(task.name, TaskConfig.MAX_ATTEMPTS_PER_PARTITION,
- String.valueOf(task.maxAttemptsPerPartition));
- builder.addConfig(task.name, TaskConfig.NUM_CONCURRENT_TASKS_PER_INSTANCE,
- String.valueOf(task.numConcurrentTasksPerInstance));
- builder.addConfig(task.name, TaskConfig.TIMEOUT_PER_PARTITION,
- String.valueOf(task.timeoutPerPartition));
}
return builder.build();
@@ -149,47 +187,78 @@ public class Workflow {
*/
public void validate() {
// validate dag and configs
- if (!_taskConfigs.keySet().containsAll(_workflowConfig.getTaskDag().getAllNodes())) {
+ if (!_jobConfigs.keySet().containsAll(_workflowConfig.getJobDag().getAllNodes())) {
throw new IllegalArgumentException("Nodes specified in DAG missing from config");
- } else if (!_workflowConfig.getTaskDag().getAllNodes().containsAll(_taskConfigs.keySet())) {
+ } else if (!_workflowConfig.getJobDag().getAllNodes().containsAll(_jobConfigs.keySet())) {
throw new IllegalArgumentException("Given DAG lacks nodes with supplied configs");
}
- _workflowConfig.getTaskDag().validate();
+ _workflowConfig.getJobDag().validate();
- for (String node : _taskConfigs.keySet()) {
+ for (String node : _jobConfigs.keySet()) {
buildConfig(node);
}
}
- /** Builds a TaskConfig from config map. Useful for validating configs */
- private TaskConfig buildConfig(String task) {
- return TaskConfig.Builder.fromMap(_taskConfigs.get(task)).build();
+ /** Builds a JobConfig from config map. Useful for validating configs */
+ private JobConfig buildConfig(String job) {
+ JobConfig.Builder b = JobConfig.Builder.fromMap(_jobConfigs.get(job));
+ if (_taskConfigs != null && _taskConfigs.containsKey(job)) {
+ b.addTaskConfigs(_taskConfigs.get(job));
+ }
+ return b.build();
}
/** Build a workflow incrementally from dependencies and single configs, validate at build time */
public static class Builder {
private String _name;
- private TaskDag _dag;
- private Map<String, Map<String, String>> _taskConfigs;
+ private JobDag _dag;
+ private Map<String, Map<String, String>> _jobConfigs;
+ private Map<String, List<TaskConfig>> _taskConfigs;
private long _expiry;
public Builder(String name) {
_name = name;
- _dag = new TaskDag();
- _taskConfigs = new TreeMap<String, Map<String, String>>();
+ _dag = new JobDag();
+ _jobConfigs = new TreeMap<String, Map<String, String>>();
+ _taskConfigs = new TreeMap<String, List<TaskConfig>>();
_expiry = -1;
}
public Builder addConfig(String node, String key, String val) {
node = namespacify(node);
_dag.addNode(node);
+ if (!_jobConfigs.containsKey(node)) {
+ _jobConfigs.put(node, new TreeMap<String, String>());
+ }
+ _jobConfigs.get(node).put(key, val);
+ return this;
+ }
- if (!_taskConfigs.containsKey(node)) {
- _taskConfigs.put(node, new TreeMap<String, String>());
+ public Builder addJobConfigMap(String node, Map<String, String> jobConfigMap) {
+ return addConfig(node, JobConfig.JOB_CONFIG_MAP, TaskUtil.serializeJobConfigMap(jobConfigMap));
+ }
+
+ public Builder addJobConfig(String node, JobConfig jobConfig) {
+ for (Map.Entry<String, String> e : jobConfig.getResourceConfigMap().entrySet()) {
+ String key = e.getKey();
+ String val = e.getValue();
+ addConfig(node, key, val);
}
- _taskConfigs.get(node).put(key, val);
+ addTaskConfigs(node, jobConfig.getTaskConfigMap().values());
+ return this;
+ }
+ public Builder addTaskConfigs(String node, Collection<TaskConfig> taskConfigs) {
+ node = namespacify(node);
+ _dag.addNode(node);
+ if (!_taskConfigs.containsKey(node)) {
+ _taskConfigs.put(node, new ArrayList<TaskConfig>());
+ }
+ if (!_jobConfigs.containsKey(node)) {
+ _jobConfigs.put(node, new TreeMap<String, String>());
+ }
+ _taskConfigs.get(node).addAll(taskConfigs);
return this;
}
@@ -207,13 +276,13 @@ public class Workflow {
}
public String namespacify(String task) {
- return TaskUtil.getNamespacedTaskName(_name, task);
+ return TaskUtil.getNamespacedJobName(_name, task);
}
public Workflow build() {
- for (String task : _taskConfigs.keySet()) {
+ for (String task : _jobConfigs.keySet()) {
// addConfig(task, TaskConfig.WORKFLOW_ID, _name);
- _taskConfigs.get(task).put(TaskConfig.WORKFLOW_ID, _name);
+ _jobConfigs.get(task).put(JobConfig.WORKFLOW_ID, _name);
}
WorkflowConfig.Builder builder = new WorkflowConfig.Builder();
@@ -223,7 +292,8 @@ public class Workflow {
builder.setExpiry(_expiry);
}
- return new Workflow(_name, builder.build(), _taskConfigs); // calls validate internally
+ return new Workflow(_name, builder.build(), _jobConfigs, _taskConfigs); // calls validate
+ // internally
}
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
index 322deb7..6f10955 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
@@ -1,5 +1,24 @@
package org.apache.helix.task;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
import java.util.Map;
/**
@@ -15,18 +34,18 @@ public class WorkflowConfig {
public static final long DEFAULT_EXPIRY = 24 * 60 * 60 * 1000;
/* Member variables */
- private TaskDag _taskDag;
+ private JobDag _jobDag;
private TargetState _targetState;
private long _expiry;
- private WorkflowConfig(TaskDag taskDag, TargetState targetState, long expiry) {
- _taskDag = taskDag;
+ private WorkflowConfig(JobDag jobDag, TargetState targetState, long expiry) {
+ _jobDag = jobDag;
_targetState = targetState;
_expiry = expiry;
}
- public TaskDag getTaskDag() {
- return _taskDag;
+ public JobDag getJobDag() {
+ return _jobDag;
}
public TargetState getTargetState() {
@@ -38,7 +57,7 @@ public class WorkflowConfig {
}
public static class Builder {
- private TaskDag _taskDag = TaskDag.EMPTY_DAG;
+ private JobDag _taskDag = JobDag.EMPTY_DAG;
private TargetState _targetState = TargetState.START;
private long _expiry = DEFAULT_EXPIRY;
@@ -52,7 +71,7 @@ public class WorkflowConfig {
return new WorkflowConfig(_taskDag, _targetState, _expiry);
}
- public Builder setTaskDag(TaskDag v) {
+ public Builder setTaskDag(JobDag v) {
_taskDag = v;
return this;
}
@@ -74,7 +93,7 @@ public class WorkflowConfig {
b.setExpiry(Long.parseLong(cfg.get(EXPIRY)));
}
if (cfg.containsKey(DAG)) {
- b.setTaskDag(TaskDag.fromJson(cfg.get(DAG)));
+ b.setTaskDag(JobDag.fromJson(cfg.get(DAG)));
}
if (cfg.containsKey(TARGET_STATE)) {
b.setTargetState(TargetState.valueOf(cfg.get(TARGET_STATE)));
http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
index 0c9a9b3..4feda1b 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
@@ -1,5 +1,24 @@
package org.apache.helix.task;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
import java.util.Map;
import java.util.TreeMap;
@@ -39,22 +58,22 @@ public class WorkflowContext extends HelixProperty {
return TaskState.valueOf(s);
}
- public void setTaskState(String taskResource, TaskState s) {
+ public void setJobState(String jobResource, TaskState s) {
Map<String, String> states = _record.getMapField(TASK_STATES);
if (states == null) {
states = new TreeMap<String, String>();
_record.setMapField(TASK_STATES, states);
}
- states.put(taskResource, s.name());
+ states.put(jobResource, s.name());
}
- public TaskState getTaskState(String taskResource) {
+ public TaskState getJobState(String jobResource) {
Map<String, String> states = _record.getMapField(TASK_STATES);
if (states == null) {
return null;
}
- String s = states.get(taskResource);
+ String s = states.get(jobResource);
if (s == null) {
return null;
}
http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
new file mode 100644
index 0000000..5e12f19
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
@@ -0,0 +1,42 @@
+package org.apache.helix.task.beans;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.task.JobConfig;
+
+/**
+ * Bean class used for parsing job definitions from YAML.
+ */
+public class JobBean {
+ public String name;
+ public List<String> parents;
+ public String targetResource;
+ public List<String> targetPartitionStates;
+ public List<String> targetPartitions;
+ public String command;
+ public Map<String, String> jobConfigMap;
+ public List<TaskBean> tasks;
+ public long timeoutPerPartition = JobConfig.DEFAULT_TIMEOUT_PER_TASK;
+ public int numConcurrentTasksPerInstance = JobConfig.DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
+ public int maxAttemptsPerPartition = JobConfig.DEFAULT_MAX_ATTEMPTS_PER_TASK;
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/task/beans/TaskBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/beans/TaskBean.java b/helix-core/src/main/java/org/apache/helix/task/beans/TaskBean.java
index 0efb608..eedccb5 100644
--- a/helix-core/src/main/java/org/apache/helix/task/beans/TaskBean.java
+++ b/helix-core/src/main/java/org/apache/helix/task/beans/TaskBean.java
@@ -1,25 +1,32 @@
+package org.apache.helix.task.beans;
+
/*
- * $Id$
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
-package org.apache.helix.task.beans;
-import java.util.List;
import java.util.Map;
-import org.apache.helix.task.TaskConfig;
-
/**
- * Bean class used for parsing task definitions from YAML.
+ * Describes task-specific configuration, including an arbitrary map of
+ * key-value pairs to pass to the task
*/
+
public class TaskBean {
- public String name;
- public List<String> parents;
- public String targetResource;
- public List<String> targetPartitionStates;
- public List<Integer> targetPartitions;
public String command;
- public Map<String, Object> commandConfig;
- public long timeoutPerPartition = TaskConfig.DEFAULT_TIMEOUT_PER_PARTITION;
- public int numConcurrentTasksPerInstance = TaskConfig.DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
- public int maxAttemptsPerPartition = TaskConfig.DEFAULT_MAX_ATTEMPTS_PER_PARTITION;
+ public Map<String, String> taskConfigMap;
}
http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java b/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java
index 984f0f4..76da4c8 100644
--- a/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java
+++ b/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java
@@ -1,7 +1,23 @@
+package org.apache.helix.task.beans;
+
/*
- * $Id$
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
-package org.apache.helix.task.beans;
import java.util.List;
@@ -11,5 +27,5 @@ import java.util.List;
public class WorkflowBean {
public String name;
public String expiry;
- public List<TaskBean> tasks;
+ public List<JobBean> jobs;
}
http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
index 17722f1..38903c7 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
@@ -20,6 +20,7 @@ package org.apache.helix.integration;
*/
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import org.apache.helix.HelixDataAccessor;
@@ -41,6 +42,8 @@ import org.apache.helix.tools.ClusterStateVerifier.ZkVerifier;
import org.testng.Assert;
import org.testng.annotations.Test;
+import com.beust.jcommander.internal.Lists;
+
public class TestCustomizedIdealStateRebalancer extends
ZkStandAloneCMTestBaseWithPropertyServerCheck {
String db2 = TEST_DB + "2";
@@ -58,8 +61,11 @@ public class TestCustomizedIdealStateRebalancer extends
public IdealState computeNewIdealState(String resourceName, IdealState currentIdealState,
CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) {
testRebalancerInvoked = true;
+ List<String> liveNodes = Lists.newArrayList(clusterData.getLiveInstances().keySet());
+ int i = 0;
for (String partition : currentIdealState.getPartitionSet()) {
- String instance = currentIdealState.getPreferenceList(partition).get(0);
+ int index = i++ % liveNodes.size();
+ String instance = liveNodes.get(index);
currentIdealState.getPreferenceList(partition).clear();
currentIdealState.getPreferenceList(partition).add(instance);
@@ -97,8 +103,8 @@ public class TestCustomizedIdealStateRebalancer extends
}
IdealState is = accessor.getProperty(keyBuilder.idealStates(db2));
for (String partition : is.getPartitionSet()) {
- Assert.assertEquals(is.getPreferenceList(partition).size(), 3);
- Assert.assertEquals(is.getInstanceStateMap(partition).size(), 3);
+ Assert.assertEquals(is.getPreferenceList(partition).size(), 0);
+ Assert.assertEquals(is.getInstanceStateMap(partition).size(), 0);
}
Assert.assertTrue(testRebalancerCreated);
Assert.assertTrue(testRebalancerInvoked);