Posted to commits@helix.apache.org by lx...@apache.org on 2016/08/17 04:27:04 UTC
[08/33] helix git commit: Add new job option to allow continuing a job even if its direct dependent job fails.
Add new job option to allow continuing a job even if its direct dependent job fails.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/be660245
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/be660245
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/be660245
Branch: refs/heads/helix-0.6.x
Commit: be660245fc1a9f4b22fba58c4b25a1af19555066
Parents: 579d82f
Author: Lei Xia <lx...@linkedin.com>
Authored: Wed Jan 27 10:10:31 2016 -0800
Committer: Lei Xia <lx...@linkedin.com>
Committed: Tue Jul 5 14:44:56 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/helix/task/JobConfig.java | 26 +-
.../org/apache/helix/task/JobRebalancer.java | 26 +-
.../org/apache/helix/task/TaskRebalancer.java | 79 +++++-
.../org/apache/helix/task/WorkflowConfig.java | 23 +-
.../apache/helix/task/WorkflowRebalancer.java | 28 +-
.../apache/helix/integration/task/MockTask.java | 2 +-
.../helix/integration/task/TaskTestUtil.java | 7 +-
.../task/TestJobFailureDependence.java | 283 +++++++++++++++++++
.../task/TestRunJobsWithMissingTarget.java | 41 ++-
9 files changed, 469 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
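At a glance, the commit adds a per-job flag (IgnoreDependentJobFailure) and a per-workflow failure threshold. Below is a minimal sketch of how a client could opt into both, using only builder and driver calls that appear in the diff and tests further down; the class, queue, command, and resource names are hypothetical.

import java.util.Collections;
import com.google.common.collect.Sets;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.JobQueue;
import org.apache.helix.task.TaskDriver;
import org.apache.helix.task.WorkflowConfig;

public class FailureTolerantQueueExample {
  // Builds and starts a queue that keeps going past a limited number of job failures.
  public static void submit(TaskDriver driver) {
    // Workflow level: tolerate up to two FAILED jobs before the queue itself fails.
    JobQueue.Builder queueBuilder = new JobQueue.Builder("failureTolerantQueue")
        .fromMap(Collections.singletonMap(WorkflowConfig.FAILURE_THRESHOLD, "2"));

    // Job level: let this job run even if the job it depends on has failed.
    JobConfig.Builder jobConfig = new JobConfig.Builder()
        .setCommand("Reindex")                               // hypothetical task command
        .setTargetResource("TestDB0")                        // hypothetical target resource
        .setTargetPartitionStates(Sets.newHashSet("SLAVE"))
        .setIgnoreDependentJobFailure(true);
    queueBuilder.enqueueJob("reindexTestDB0", jobConfig);

    driver.start(queueBuilder.build());
  }
}

With this configuration a failed job no longer blocks the jobs queued behind it, and the queue itself is only marked FAILED once more than two of its jobs have failed.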
http://git-wip-us.apache.org/repos/asf/helix/blob/be660245/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
index 37a2f35..65a9caf 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
@@ -96,6 +96,11 @@ public class JobConfig {
TASK_RETRY_DELAY("TaskRetryDelay"),
/**
+ * Whether failure of directly dependent jobs should fail this job.
+ */
+ IGNORE_DEPENDENT_JOB_FAILURE("IgnoreDependentJobFailure"),
+
+ /**
* The individual task configurations, if any *
*/
TASK_CONFIGS("TaskConfigs"),
@@ -124,6 +129,7 @@ public class JobConfig {
public static final int DEFAULT_FAILURE_THRESHOLD = 0;
public static final int DEFAULT_MAX_FORCED_REASSIGNMENTS_PER_TASK = 0;
public static final boolean DEFAULT_DISABLE_EXTERNALVIEW = false;
+ public static final boolean DEFAULT_IGNORE_DEPENDENT_JOB_FAILURE = false;
private final String _workflow;
private final String _targetResource;
@@ -138,13 +144,14 @@ public class JobConfig {
private final int _failureThreshold;
private final long _retryDelay;
private final boolean _disableExternalView;
+ private final boolean _ignoreDependentJobFailure;
private final Map<String, TaskConfig> _taskConfigMap;
private JobConfig(String workflow, String targetResource, List<String> targetPartitions,
Set<String> targetPartitionStates, String command, Map<String, String> jobCommandConfigMap,
long timeoutPerTask, int numConcurrentTasksPerInstance, int maxAttemptsPerTask,
int maxForcedReassignmentsPerTask, int failureThreshold, long retryDelay,
- boolean disableExternalView, Map<String, TaskConfig> taskConfigMap) {
+ boolean disableExternalView, boolean ignoreDependentJobFailure, Map<String, TaskConfig> taskConfigMap) {
_workflow = workflow;
_targetResource = targetResource;
_targetPartitions = targetPartitions;
@@ -158,6 +165,7 @@ public class JobConfig {
_failureThreshold = failureThreshold;
_retryDelay = retryDelay;
_disableExternalView = disableExternalView;
+ _ignoreDependentJobFailure = ignoreDependentJobFailure;
if (taskConfigMap != null) {
_taskConfigMap = taskConfigMap;
} else {
@@ -217,6 +225,8 @@ public class JobConfig {
return _disableExternalView;
}
+ public boolean isIgnoreDependentJobFailure() { return _ignoreDependentJobFailure; }
+
public Map<String, TaskConfig> getTaskConfigMap() {
return _taskConfigMap;
}
@@ -260,6 +270,8 @@ public class JobConfig {
Boolean.toString(_disableExternalView));
cfgMap.put(JobConfigProperty.NUM_CONCURRENT_TASKS_PER_INSTANCE.value(),
"" + _numConcurrentTasksPerInstance);
+ cfgMap.put(JobConfigProperty.IGNORE_DEPENDENT_JOB_FAILURE.value(),
+ Boolean.toString(_ignoreDependentJobFailure));
return cfgMap;
}
@@ -281,6 +293,7 @@ public class JobConfig {
private int _failureThreshold = DEFAULT_FAILURE_THRESHOLD;
private long _retryDelay = DEFAULT_TASK_RETRY_DELAY;
private boolean _disableExternalView = DEFAULT_DISABLE_EXTERNALVIEW;
+ private boolean _ignoreDependentJobFailure = DEFAULT_IGNORE_DEPENDENT_JOB_FAILURE;
public JobConfig build() {
validate();
@@ -288,7 +301,7 @@ public class JobConfig {
return new JobConfig(_workflow, _targetResource, _targetPartitions, _targetPartitionStates,
_command, _commandConfig, _timeoutPerTask, _numConcurrentTasksPerInstance,
_maxAttemptsPerTask, _maxForcedReassignmentsPerTask, _failureThreshold, _retryDelay,
- _disableExternalView, _taskConfigMap);
+ _disableExternalView, _ignoreDependentJobFailure, _taskConfigMap);
}
/**
@@ -346,6 +359,10 @@ public class JobConfig {
b.setDisableExternalView(
Boolean.valueOf(cfg.get(JobConfigProperty.DISABLE_EXTERNALVIEW.value())));
}
+ if (cfg.containsKey(JobConfigProperty.IGNORE_DEPENDENT_JOB_FAILURE.value())) {
+ b.setIgnoreDependentJobFailure(
+ Boolean.valueOf(cfg.get(JobConfigProperty.IGNORE_DEPENDENT_JOB_FAILURE.value())));
+ }
return b;
}
@@ -414,6 +431,11 @@ public class JobConfig {
return this;
}
+ public Builder setIgnoreDependentJobFailure(boolean ignoreDependentJobFailure) {
+ _ignoreDependentJobFailure = ignoreDependentJobFailure;
+ return this;
+ }
+
public Builder addTaskConfigs(List<TaskConfig> taskConfigs) {
if (taskConfigs != null) {
for (TaskConfig taskConfig : taskConfigs) {
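The flag is persisted under the key "IgnoreDependentJobFailure" and read back with the containsKey/Boolean.valueOf pattern shown in the two hunks above. An illustrative standalone reader with the same fallback to the default (a sketch only, not the committed method; the class name is made up):

import java.util.Map;

class IgnoreDependentJobFailureReader {
  // Reads the new flag from a job's raw config map, falling back to the default (false).
  static boolean ignoreDependentJobFailure(Map<String, String> rawJobCfg) {
    return rawJobCfg.containsKey("IgnoreDependentJobFailure")
        ? Boolean.valueOf(rawJobCfg.get("IgnoreDependentJobFailure"))
        : false;
  }
}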
http://git-wip-us.apache.org/repos/asf/helix/blob/be660245/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
index 7eeafc7..5b41773 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
@@ -86,22 +86,28 @@ public class JobRebalancer extends TaskRebalancer {
return buildEmptyAssignment(jobName, currStateOutput);
}
+ // Stop current run of the job if workflow or job is already in final state (failed or completed)
+ TaskState workflowState = workflowCtx.getWorkflowState();
TaskState jobState = workflowCtx.getJobState(jobName);
// The job is already in a final state (completed/failed).
- if (jobState == TaskState.FAILED || jobState == TaskState.COMPLETED) {
- LOG.info("Job " + jobName + " is failed or already completed, clean up IS.");
+ if (workflowState == TaskState.FAILED || workflowState == TaskState.COMPLETED ||
+ jobState == TaskState.FAILED || jobState == TaskState.COMPLETED) {
+ LOG.info(String.format(
+ "Workflow %s or job %s is already failed or completed, workflow state (%s), job state (%s), clean up job IS.",
+ workflowResource, jobName, workflowState, jobState));
cleanupIdealStateExtView(_manager.getHelixDataAccessor(), jobName);
_scheduledRebalancer.removeScheduledRebalance(jobName);
return buildEmptyAssignment(jobName, currStateOutput);
}
if (!isWorkflowReadyForSchedule(workflowCfg)) {
- LOG.info("Job is not ready to be scheduled since workflow is not ready " + jobName);
+ LOG.info("Job is not ready to be run since workflow is not ready " + jobName);
return buildEmptyAssignment(jobName, currStateOutput);
}
- if (!isJobReadyToSchedule(jobName, workflowCfg, workflowCtx)) {
- LOG.info("Job is not ready to be scheduled " + jobName);
+ if (!isJobStarted(jobName, workflowCtx) && !isJobReadyToSchedule(jobName, workflowCfg,
+ workflowCtx)) {
+ LOG.info("Job is not ready to run " + jobName);
return buildEmptyAssignment(jobName, currStateOutput);
}
@@ -429,16 +435,6 @@ public class JobRebalancer extends TaskRebalancer {
return ra;
}
- private void markJobFailed(String jobName, JobContext jobContext, WorkflowConfig workflowConfig,
- WorkflowContext workflowContext) {
- long currentTime = System.currentTimeMillis();
- workflowContext.setJobState(jobName, TaskState.FAILED);
- jobContext.setFinishTime(currentTime);
- if (isWorkflowFinished(workflowContext, workflowConfig)) {
- workflowContext.setFinishTime(currentTime);
- }
- }
-
private void markJobComplete(String jobName, JobContext jobContext,
WorkflowConfig workflowConfig, WorkflowContext workflowContext) {
long currentTime = System.currentTimeMillis();
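The net change in this file is an earlier exit: the job-level rebalance now stops as soon as either the workflow or the job itself is terminal, and a job that has already started skips the readiness check. A compact illustration of the terminal-state predicate, assuming the same TaskState values used above (illustrative only, not the committed code):

import org.apache.helix.task.TaskState;

class FinalStateCheck {
  // True when either the workflow or the job has reached a terminal state, in which
  // case the rebalancer cleans up the job's IdealState and returns an empty assignment.
  static boolean isTerminal(TaskState workflowState, TaskState jobState) {
    return workflowState == TaskState.FAILED || workflowState == TaskState.COMPLETED
        || jobState == TaskState.FAILED || jobState == TaskState.COMPLETED;
  }
}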
http://git-wip-us.apache.org/repos/asf/helix/blob/be660245/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index b006efc..6aaeb5f 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -74,13 +74,17 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
*/
protected boolean isWorkflowFinished(WorkflowContext ctx, WorkflowConfig cfg) {
boolean incomplete = false;
+ int failedJobs = 0;
for (String job : cfg.getJobDag().getAllNodes()) {
TaskState jobState = ctx.getJobState(job);
if (jobState == TaskState.FAILED) {
- ctx.setWorkflowState(TaskState.FAILED);
- return true;
+ failedJobs ++;
+ if (failedJobs > cfg.getFailureThreshold()) {
+ ctx.setWorkflowState(TaskState.FAILED);
+ return true;
+ }
}
- if (jobState != TaskState.COMPLETED) {
+ if (jobState != TaskState.COMPLETED && jobState != TaskState.FAILED) {
incomplete = true;
}
}
@@ -136,31 +140,78 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
protected boolean isJobReadyToSchedule(String job, WorkflowConfig workflowCfg,
WorkflowContext workflowCtx) {
int notStartedCount = 0;
- int inCompleteCount = 0;
int failedCount = 0;
- for (String ancestor : workflowCfg.getJobDag().getAncestors(job)) {
- TaskState jobState = workflowCtx.getJobState(ancestor);
+ for (String parent : workflowCfg.getJobDag().getDirectParents(job)) {
+ TaskState jobState = workflowCtx.getJobState(parent);
if (jobState == null || jobState == TaskState.NOT_STARTED) {
++notStartedCount;
- } else if (jobState == TaskState.IN_PROGRESS || jobState == TaskState.STOPPED) {
- ++inCompleteCount;
- } else if (jobState == TaskState.FAILED) {
+ }
+ if (jobState == TaskState.FAILED) {
++failedCount;
}
}
- if (notStartedCount > 0 || inCompleteCount >= workflowCfg.getParallelJobs()
- || failedCount > 0) {
- LOG.debug(String.format(
- "Job %s is not ready to start, notStartedParent(s)=%d, inCompleteParent(s)=%d, failedParent(s)=%d.",
- job, notStartedCount, inCompleteCount, failedCount));
+ if (notStartedCount > 0) {
+ LOG.debug(String
+ .format("Job %s is not ready to start, notStartedParent(s)=%d.", job, notStartedCount));
+ return false;
+ }
+
+ JobConfig jobConfig = TaskUtil.getJobCfg(_manager, job);
+ if (failedCount > 0 && !jobConfig.isIgnoreDependentJobFailure()) {
+ markJobFailed(job, null, workflowCfg, workflowCtx);
+ LOG.debug(
+ String.format("Job %s is not ready to start, failedCount(s)=%d.", job, failedCount));
+ return false;
+ }
+
+ int inCompleteCount = getInCompleteJobCount(workflowCfg, workflowCtx);
+ if (inCompleteCount >= workflowCfg.getParallelJobs()) {
+ LOG.debug(String
+ .format("Job %s is not ready to schedule, inCompleteJobs(s)=%d.", job, inCompleteCount));
return false;
}
return true;
}
+ protected boolean isJobStarted(String job, WorkflowContext workflowContext) {
+ TaskState jobState = workflowContext.getJobState(job);
+ return (jobState != null && jobState != TaskState.NOT_STARTED);
+ }
+
+ /**
+ * Count the number of jobs in a workflow that are in progress.
+ *
+ * @param workflowCfg
+ * @param workflowCtx
+ * @return
+ */
+ protected int getInCompleteJobCount(WorkflowConfig workflowCfg, WorkflowContext workflowCtx) {
+ int inCompleteCount = 0;
+ for (String jobName : workflowCfg.getJobDag().getAllNodes()) {
+ TaskState jobState = workflowCtx.getJobState(jobName);
+ if (jobState == TaskState.IN_PROGRESS || jobState == TaskState.STOPPED) {
+ ++inCompleteCount;
+ }
+ }
+
+ return inCompleteCount;
+ }
+
+ protected void markJobFailed(String jobName, JobContext jobContext, WorkflowConfig workflowConfig,
+ WorkflowContext workflowContext) {
+ long currentTime = System.currentTimeMillis();
+ workflowContext.setJobState(jobName, TaskState.FAILED);
+ if (jobContext != null) {
+ jobContext.setFinishTime(currentTime);
+ }
+ if (isWorkflowFinished(workflowContext, workflowConfig)) {
+ workflowContext.setFinishTime(currentTime);
+ }
+ }
+
/**
* Check if a workflow is ready to schedule.
*
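Two behavioral changes stand out in this file. First, isWorkflowFinished no longer fails the workflow on the first FAILED job; it fails it only once the count of FAILED jobs exceeds WorkflowConfig.getFailureThreshold(), and a FAILED job no longer prevents the workflow from completing. Second, isJobReadyToSchedule inspects only a job's direct parents instead of all ancestors, and a failed parent fails the child unless the child's isIgnoreDependentJobFailure() is set. For example, with a threshold of 1 and five jobs, a single failure still lets the rest finish, while a second failure marks the workflow FAILED. An illustrative counter for the threshold rule, written against the same config/context API (not the committed method; the class name is made up):

import org.apache.helix.task.TaskState;
import org.apache.helix.task.WorkflowConfig;
import org.apache.helix.task.WorkflowContext;

class FailureThresholdCheck {
  // Counts FAILED jobs across the whole DAG and applies the new threshold rule.
  static boolean exceedsFailureThreshold(WorkflowConfig cfg, WorkflowContext ctx) {
    int failed = 0;
    for (String job : cfg.getJobDag().getAllNodes()) {
      if (ctx.getJobState(job) == TaskState.FAILED) {
        failed++;
      }
    }
    return failed > cfg.getFailureThreshold();
  }
}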
http://git-wip-us.apache.org/repos/asf/helix/blob/be660245/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
index 4c81654..955cb77 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
@@ -40,9 +40,11 @@ public class WorkflowConfig {
public static final String RECURRENCE_UNIT = "RecurrenceUnit";
public static final String RECURRENCE_INTERVAL = "RecurrenceInterval";
public static final String TERMINABLE = "Terminable";
+ public static final String FAILURE_THRESHOLD = "FailureThreshold";
/* Default values */
public static final long DEFAULT_EXPIRY = 24 * 60 * 60 * 1000;
+ public static final int DEFAULT_FAILURE_THRESHOLD = 0;
/* Member variables */
// TODO: jobDag should not be in the workflowConfig.
@@ -56,13 +58,15 @@ public class WorkflowConfig {
private final long _expiry;
private final boolean _terminable;
private final ScheduleConfig _scheduleConfig;
+ private final int _failureThreshold;
protected WorkflowConfig(JobDag jobDag, int parallelJobs, TargetState targetState, long expiry,
- boolean terminable, ScheduleConfig scheduleConfig) {
+ int failureThreshold, boolean terminable, ScheduleConfig scheduleConfig) {
_jobDag = jobDag;
_parallelJobs = parallelJobs;
_targetState = targetState;
_expiry = expiry;
+ _failureThreshold = failureThreshold;
_terminable = terminable;
_scheduleConfig = scheduleConfig;
}
@@ -83,6 +87,10 @@ public class WorkflowConfig {
return _expiry;
}
+ public int getFailureThreshold() {
+ return _failureThreshold;
+ }
+
public boolean isTerminable() {
return _terminable;
}
@@ -128,6 +136,7 @@ public class WorkflowConfig {
cfgMap.put(WorkflowConfig.EXPIRY, String.valueOf(getExpiry()));
cfgMap.put(WorkflowConfig.TARGET_STATE, getTargetState().name());
cfgMap.put(WorkflowConfig.TERMINABLE, String.valueOf(isTerminable()));
+ cfgMap.put(WorkflowConfig.FAILURE_THRESHOLD, String.valueOf(getFailureThreshold()));
// Populate schedule if present
ScheduleConfig scheduleConfig = getScheduleConfig();
@@ -151,13 +160,15 @@ public class WorkflowConfig {
private int _parallelJobs = 1;
private TargetState _targetState = TargetState.START;
private long _expiry = DEFAULT_EXPIRY;
+ private int _failureThreshold = DEFAULT_FAILURE_THRESHOLD;
private boolean _isTerminable = true;
private ScheduleConfig _scheduleConfig;
public WorkflowConfig build() {
validate();
- return new WorkflowConfig(_taskDag, _parallelJobs, _targetState, _expiry, _isTerminable, _scheduleConfig);
+ return new WorkflowConfig(_taskDag, _parallelJobs, _targetState, _expiry, _failureThreshold,
+ _isTerminable, _scheduleConfig);
}
public Builder() {}
@@ -191,6 +202,11 @@ public class WorkflowConfig {
return this;
}
+ public Builder setFailureThreshold(int failureThreshold) {
+ _failureThreshold = failureThreshold;
+ return this;
+ }
+
public Builder setTerminable(boolean isTerminable) {
_isTerminable = isTerminable;
return this;
@@ -211,6 +227,9 @@ public class WorkflowConfig {
if (cfg.containsKey(EXPIRY)) {
b.setExpiry(Long.parseLong(cfg.get(EXPIRY)));
}
+ if (cfg.containsKey(FAILURE_THRESHOLD)) {
+ b.setFailureThreshold(Integer.parseInt(cfg.get(FAILURE_THRESHOLD)));
+ }
if (cfg.containsKey(DAG)) {
b.setJobDag(JobDag.fromJson(cfg.get(DAG)));
}
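The threshold can also be changed on an existing workflow with the new builder method; the sketch below mirrors the update pattern used in the new TestJobFailureDependence test further down (the workflow name and threshold value are arbitrary):

import org.apache.helix.task.TaskDriver;
import org.apache.helix.task.WorkflowConfig;

class RaiseFailureThreshold {
  // Raises an existing workflow's failure threshold to 2.
  static void raise(TaskDriver driver, String workflowName) {
    WorkflowConfig current = driver.getWorkflowConfig(workflowName);
    WorkflowConfig updated = new WorkflowConfig.Builder(current)
        .setFailureThreshold(2)
        .build();
    driver.updateWorkflow(workflowName, updated);
  }
}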
http://git-wip-us.apache.org/repos/asf/helix/blob/be660245/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
index 05b6dc6..682ac77 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
@@ -30,7 +30,13 @@ import org.apache.log4j.Logger;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TimeZone;
/**
* Custom rebalancer implementation for the {@code Workflow} in task state model.
@@ -63,7 +69,7 @@ public class WorkflowRebalancer extends TaskRebalancer {
TargetState targetState = workflowCfg.getTargetState();
if (targetState == TargetState.DELETE) {
LOG.info("Workflow is marked as deleted " + workflow + " cleaning up the workflow context.");
- cleanupWorkflow(workflow, workflowCfg, workflowCtx);
+ cleanupWorkflow(workflow, workflowCfg);
return buildEmptyAssignment(workflow, currStateOutput);
}
@@ -91,7 +97,7 @@ public class WorkflowRebalancer extends TaskRebalancer {
// Check if this workflow has been finished past its expiry.
if (workflowCtx.getFinishTime() + expiryTime <= currentTime) {
LOG.info("Workflow " + workflow + " passed expiry time, cleaning up the workflow context.");
- cleanupWorkflow(workflow, workflowCfg, workflowCtx);
+ cleanupWorkflow(workflow, workflowCfg);
} else {
// schedule future cleanup work
long cleanupTime = workflowCtx.getFinishTime() + expiryTime;
@@ -113,7 +119,7 @@ public class WorkflowRebalancer extends TaskRebalancer {
scheduleWorkflowIfReady(workflow, workflowCfg, workflowCtx);
if (isReady) {
// Schedule jobs from this workflow.
- scheduleJobs(workflowCfg, workflowCtx);
+ scheduleJobs(workflow, workflowCfg, workflowCtx);
} else {
LOG.debug("Workflow " + workflow + " is not ready to be scheduled.");
}
@@ -126,23 +132,32 @@ public class WorkflowRebalancer extends TaskRebalancer {
* Figure out whether the jobs in the workflow should be run,
* and if it's ready, then just schedule it
*/
- private void scheduleJobs(WorkflowConfig workflowCfg, WorkflowContext workflowCtx) {
+ private void scheduleJobs(String workflow, WorkflowConfig workflowCfg, WorkflowContext workflowCtx) {
ScheduleConfig scheduleConfig = workflowCfg.getScheduleConfig();
if (scheduleConfig != null && scheduleConfig.isRecurring()) {
LOG.debug("Jobs from recurring workflow are not schedule-able");
return;
}
+ int scheduledJobs = 0;
for (String job : workflowCfg.getJobDag().getAllNodes()) {
TaskState jobState = workflowCtx.getJobState(job);
if (jobState != null && !jobState.equals(TaskState.NOT_STARTED)) {
LOG.debug("Job " + job + " is already started or completed.");
continue;
}
+
+ if (scheduledJobs >= workflowCfg.getParallelJobs()) {
+ LOG.debug(String.format("Workflow %s already have enough job in progress, "
+ + "scheduledJobs(s)=%d, stop scheduling more jobs", workflow, scheduledJobs));
+ break;
+ }
+
// check ancestor job status
if (isJobReadyToSchedule(job, workflowCfg, workflowCtx)) {
JobConfig jobConfig = TaskUtil.getJobCfg(_manager, job);
scheduleSingleJob(job, jobConfig);
+ scheduledJobs++;
}
}
}
@@ -382,8 +397,7 @@ public class WorkflowRebalancer extends TaskRebalancer {
* Cleans up workflow configs and workflow contexts associated with this workflow,
* including all job-level configs and context, plus workflow-level information.
*/
- private void cleanupWorkflow(String workflow, WorkflowConfig workflowcfg,
- WorkflowContext workflowCtx) {
+ private void cleanupWorkflow(String workflow, WorkflowConfig workflowcfg) {
LOG.info("Cleaning up workflow: " + workflow);
HelixDataAccessor accessor = _manager.getHelixDataAccessor();
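scheduleJobs now also caps how many new jobs a single rebalance pass may start at WorkflowConfig.getParallelJobs(), complementing the in-flight limit enforced through getInCompleteJobCount in isJobReadyToSchedule. An illustrative predicate for the combined gate (not the committed code; the class and parameter names are made up):

import org.apache.helix.task.WorkflowConfig;

class ParallelJobGate {
  // True if one more job may be started: neither the jobs already in progress/stopped
  // (see getInCompleteJobCount above) nor the jobs started in this pass may have
  // reached the configured parallelism.
  static boolean canStartAnotherJob(WorkflowConfig cfg, int inCompleteJobs, int startedThisPass) {
    return inCompleteJobs < cfg.getParallelJobs() && startedThisPass < cfg.getParallelJobs();
  }
}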
http://git-wip-us.apache.org/repos/asf/helix/blob/be660245/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java b/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java
index 71fa12d..dad9949 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java
@@ -39,7 +39,7 @@ public class MockTask implements Task {
if (cfg == null) {
cfg = Collections.emptyMap();
}
- _delay = cfg.containsKey(TIMEOUT_CONFIG) ? Long.parseLong(cfg.get(TIMEOUT_CONFIG)) : 200L;
+ _delay = cfg.containsKey(TIMEOUT_CONFIG) ? Long.parseLong(cfg.get(TIMEOUT_CONFIG)) : 100L;
}
@Override
http://git-wip-us.apache.org/repos/asf/helix/blob/be660245/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
index 9796497..011f532 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
@@ -258,7 +258,7 @@ public class TaskTestUtil {
return buildRecurrentJobQueue(jobQueueName, 0);
}
- public static JobQueue.Builder buildJobQueue(String jobQueueName, int delayStart) {
+ public static JobQueue.Builder buildJobQueue(String jobQueueName, int delayStart, int failureThreshold) {
Map<String, String> cfgMap = new HashMap<String, String>();
cfgMap.put(WorkflowConfig.EXPIRY, String.valueOf(120000));
Calendar cal = Calendar.getInstance();
@@ -267,10 +267,13 @@ public class TaskTestUtil {
cal.set(Calendar.MILLISECOND, 0);
cfgMap.put(WorkflowConfig.START_TIME,
WorkflowConfig.getDefaultDateFormat().format(cal.getTime()));
+ if (failureThreshold > 0) {
+ cfgMap.put(WorkflowConfig.FAILURE_THRESHOLD, String.valueOf(failureThreshold));
+ }
return new JobQueue.Builder(jobQueueName).fromMap(cfgMap);
}
public static JobQueue.Builder buildJobQueue(String jobQueueName) {
- return buildJobQueue(jobQueueName, 0);
+ return buildJobQueue(jobQueueName, 0, 0);
}
}
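With the extra parameter, a test can create a queue that carries a workflow-level failure threshold in a single call; a small usage snippet (queue name and threshold are arbitrary, and the call assumes test code in the same package):

// No start delay, failure threshold of 2: the queue is marked FAILED only after
// more than two of its jobs fail.
JobQueue.Builder queueBuilder = TaskTestUtil.buildJobQueue("failureTolerantTestQueue", 0, 2);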
http://git-wip-us.apache.org/repos/asf/helix/blob/be660245/helix-core/src/test/java/org/apache/helix/integration/task/TestJobFailureDependence.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobFailureDependence.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobFailureDependence.java
new file mode 100644
index 0000000..9e2456c
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobFailureDependence.java
@@ -0,0 +1,283 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.collect.Sets;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.integration.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskFactory;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskStateModelFactory;
+import org.apache.helix.task.TaskUtil;
+import org.apache.helix.task.WorkflowConfig;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class TestJobFailureDependence extends ZkIntegrationTestBase {
+ private static final Logger LOG = Logger.getLogger(TestJobFailureDependence.class);
+ private static final int num_nodes = 5;
+ private static final int num_dbs = 5;
+ private static final int START_PORT = 12918;
+ private static final String MASTER_SLAVE_STATE_MODEL = "MasterSlave";
+ private static final int NUM_PARTITIONS = 20;
+ private static final int NUM_REPLICAS = 3;
+ private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName();
+ private final MockParticipantManager[] _participants = new MockParticipantManager[num_nodes];
+ private ClusterControllerManager _controller;
+ private ClusterSetup _setupTool;
+
+ private List<String> _test_dbs = new ArrayList<String>();
+
+ private HelixManager _manager;
+ private TaskDriver _driver;
+
+ @BeforeClass
+ public void beforeClass() throws Exception {
+ String namespace = "/" + CLUSTER_NAME;
+ if (_gZkClient.exists(namespace)) {
+ _gZkClient.deleteRecursive(namespace);
+ }
+
+ _setupTool = new ClusterSetup(ZK_ADDR);
+ _setupTool.addCluster(CLUSTER_NAME, true);
+ for (int i = 0; i < num_nodes; i++) {
+ String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+ _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+ }
+
+ // Set up target dbs
+ for (int i = 0; i < num_dbs; i++) {
+ String db = "TestDB" + i;
+ _setupTool
+ .addResourceToCluster(CLUSTER_NAME, db, NUM_PARTITIONS + 10 * i, MASTER_SLAVE_STATE_MODEL,
+ IdealState.RebalanceMode.FULL_AUTO.toString());
+ _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db, NUM_REPLICAS);
+ _test_dbs.add(db);
+ }
+
+ Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
+ taskFactoryReg.put(MockTask.TASK_COMMAND, new TaskFactory() {
+ @Override public Task createNewTask(TaskCallbackContext context) {
+ return new MockTask(context);
+ }
+ });
+
+ // start dummy participants
+ for (int i = 0; i < num_nodes; i++) {
+ String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+ _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+
+ // Register a Task state model factory.
+ StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
+ stateMachine.registerStateModelFactory("Task",
+ new TaskStateModelFactory(_participants[i], taskFactoryReg));
+
+ _participants[i].syncStart();
+ }
+
+ // start controller
+ String controllerName = CONTROLLER_PREFIX + "_0";
+ _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+ _controller.syncStart();
+
+ // create cluster manager
+ _manager = HelixManagerFactory
+ .getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
+ _manager.connect();
+
+ _driver = new TaskDriver(_manager);
+
+ boolean result = ClusterStateVerifier.verifyByZkCallback(
+ new ClusterStateVerifier.MasterNbInExtViewVerifier(ZK_ADDR, CLUSTER_NAME));
+ Assert.assertTrue(result);
+
+ result = ClusterStateVerifier.verifyByZkCallback(
+ new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
+ Assert.assertTrue(result);
+ }
+
+ @AfterClass
+ public void afterClass() throws Exception {
+ _manager.disconnect();
+ _controller.syncStop();
+ for (int i = 0; i < num_nodes; i++) {
+ _participants[i].syncStop();
+ }
+ }
+
+ @Test
+ public void testJobDependantFailure() throws Exception {
+ String queueName = TestHelper.getTestMethodName();
+
+ // Create a queue
+ LOG.info("Starting job-queue: " + queueName);
+ JobQueue.Builder queueBuilder = TaskTestUtil.buildJobQueue(queueName, 0, 100);
+ // Create and Enqueue jobs
+ List<String> currentJobNames = new ArrayList<String>();
+ for (int i = 0; i < num_dbs; i++) {
+ JobConfig.Builder jobConfig =
+ new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND).setTargetResource(_test_dbs.get(i))
+ .setTargetPartitionStates(Sets.newHashSet("SLAVE"));
+ String jobName = "job" + _test_dbs.get(i);
+ queueBuilder.enqueueJob(jobName, jobConfig);
+ currentJobNames.add(jobName);
+ }
+
+ _driver.start(queueBuilder.build());
+ _setupTool.dropResourceFromCluster(CLUSTER_NAME, _test_dbs.get(2));
+
+ // all jobs after failed job should fail too.
+ for (int i = 2; i < num_dbs; i++) {
+ String namedSpaceJob = String.format("%s_%s", queueName, currentJobNames.get(i));
+ TaskTestUtil.pollForJobState(_driver, queueName, namedSpaceJob, TaskState.FAILED);
+ }
+ }
+
+ @Test
+ public void testJobDependantWorkflowFailure() throws Exception {
+ String queueName = TestHelper.getTestMethodName();
+
+ // Create a queue
+ LOG.info("Starting job-queue: " + queueName);
+ JobQueue.Builder queueBuilder = TaskTestUtil.buildJobQueue(queueName);
+ // Create and Enqueue jobs
+ List<String> currentJobNames = new ArrayList<String>();
+ for (int i = 0; i < num_dbs; i++) {
+ JobConfig.Builder jobConfig =
+ new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND).setTargetResource(_test_dbs.get(i))
+ .setTargetPartitionStates(Sets.newHashSet("SLAVE"));
+ String jobName = "job" + _test_dbs.get(i);
+ queueBuilder.enqueueJob(jobName, jobConfig);
+ currentJobNames.add(jobName);
+ }
+
+ _driver.start(queueBuilder.build());
+ _setupTool.dropResourceFromCluster(CLUSTER_NAME, _test_dbs.get(2));
+
+ String namedSpaceJob1 = String.format("%s_%s", queueName, currentJobNames.get(2));
+ TaskTestUtil.pollForJobState(_driver, queueName, namedSpaceJob1, TaskState.FAILED);
+ TaskTestUtil.pollForWorkflowState(_driver, queueName, TaskState.FAILED);
+ }
+
+ @Test
+ public void testIgnoreJobDependantFailure() throws Exception {
+ String queueName = TestHelper.getTestMethodName();
+
+ // Create a queue
+ LOG.info("Starting job-queue: " + queueName);
+ JobQueue.Builder queueBuilder = TaskTestUtil.buildJobQueue(queueName, 0, 100);
+ // Create and Enqueue jobs
+ List<String> currentJobNames = new ArrayList<String>();
+ for (int i = 0; i < num_dbs; i++) {
+ JobConfig.Builder jobConfig =
+ new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND).setTargetResource(_test_dbs.get(i))
+ .setTargetPartitionStates(Sets.newHashSet("SLAVE")).setIgnoreDependentJobFailure(true);
+ String jobName = "job" + _test_dbs.get(i);
+ queueBuilder.enqueueJob(jobName, jobConfig);
+ currentJobNames.add(jobName);
+ }
+
+ _driver.start(queueBuilder.build());
+ _setupTool.dropResourceFromCluster(CLUSTER_NAME, _test_dbs.get(2));
+ String namedSpaceJob2 = String.format("%s_%s", queueName, currentJobNames.get(2));
+ TaskTestUtil.pollForJobState(_driver, queueName, namedSpaceJob2, TaskState.FAILED);
+
+ // all jobs after failed job should complete.
+ for (int i = 3; i < num_dbs; i++) {
+ String namedSpaceJob = String.format("%s_%s", queueName, currentJobNames.get(i));
+ TaskTestUtil.pollForJobState(_driver, queueName, namedSpaceJob, TaskState.COMPLETED);
+ }
+ }
+
+ @Test
+ public void testWorkflowFailureJobThreshold() throws Exception {
+ String queueName = TestHelper.getTestMethodName();
+
+ // Create a queue
+ LOG.info("Starting job-queue: " + queueName);
+ JobQueue.Builder queueBuilder = TaskTestUtil.buildJobQueue(queueName, 0, 3);
+ // Create and Enqueue jobs
+ List<String> currentJobNames = new ArrayList<String>();
+ for (int i = 0; i < num_dbs; i++) {
+ JobConfig.Builder jobConfig =
+ new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND).setTargetResource(_test_dbs.get(i))
+ .setTargetPartitionStates(Sets.newHashSet("SLAVE")).setIgnoreDependentJobFailure(true);
+ String jobName = "job" + _test_dbs.get(i);
+ queueBuilder.enqueueJob(jobName, jobConfig);
+ currentJobNames.add(jobName);
+ }
+
+ _driver.start(queueBuilder.build());
+ _setupTool.dropResourceFromCluster(CLUSTER_NAME, _test_dbs.get(1));
+
+ String namedSpaceJob1 = String.format("%s_%s", queueName, currentJobNames.get(1));
+ TaskTestUtil.pollForJobState(_driver, queueName, namedSpaceJob1, TaskState.FAILED);
+ String lastJob =
+ String.format("%s_%s", queueName, currentJobNames.get(currentJobNames.size() - 1));
+ TaskTestUtil.pollForJobState(_driver, queueName, lastJob, TaskState.COMPLETED);
+
+ _driver.flushQueue(queueName);
+
+ WorkflowConfig currentWorkflowConfig = _driver.getWorkflowConfig(queueName);
+ WorkflowConfig.Builder configBuilder = new WorkflowConfig.Builder(currentWorkflowConfig);
+
+ configBuilder.setFailureThreshold(0);
+ _driver.updateWorkflow(queueName, configBuilder.build());
+ _driver.stop(queueName);
+
+ for (int i = 0; i < num_dbs; i++) {
+ JobConfig.Builder jobConfig =
+ new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND).setTargetResource(_test_dbs.get(i))
+ .setTargetPartitionStates(Sets.newHashSet("SLAVE")).setIgnoreDependentJobFailure(true);
+ String jobName = "job" + _test_dbs.get(i);
+ queueBuilder.enqueueJob(jobName, jobConfig);
+ _driver.enqueueJob(queueName, jobName, jobConfig);
+ }
+
+ _driver.resume(queueName);
+
+ namedSpaceJob1 = String.format("%s_%s", queueName, currentJobNames.get(1));
+ TaskTestUtil.pollForJobState(_driver, queueName, namedSpaceJob1, TaskState.FAILED);
+ TaskTestUtil.pollForWorkflowState(_driver, queueName, TaskState.FAILED);
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/helix/blob/be660245/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java
index 101604b..7eeb3f4 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java
@@ -140,6 +140,7 @@ public class TestRunJobsWithMissingTarget extends ZkIntegrationTestBase {
for (int i = 0; i < num_nodes; i++) {
_participants[i].syncStop();
}
+ _setupTool.deleteCluster(CLUSTER_NAME);
}
@Test
@@ -161,12 +162,44 @@ public class TestRunJobsWithMissingTarget extends ZkIntegrationTestBase {
currentJobNames.add(jobName);
}
- _driver.start(queueBuilder.build());
_setupTool.dropResourceFromCluster(CLUSTER_NAME, _test_dbs.get(2));
+ _driver.start(queueBuilder.build());
- String namedSpaceJob1 = String.format("%s_%s", queueName, currentJobNames.get(2));
- TaskTestUtil.pollForJobState(_driver, queueName, namedSpaceJob1, TaskState.FAILED);
+ String namedSpaceJob = String.format("%s_%s", queueName, currentJobNames.get(2));
+ TaskTestUtil.pollForJobState(_driver, queueName, namedSpaceJob, TaskState.FAILED);
TaskTestUtil.pollForWorkflowState(_driver, queueName, TaskState.FAILED);
+
+ _driver.delete(queueName);
+ }
+
+ @Test
+ public void testJobContinueUponParentJobFailure() throws Exception {
+ String queueName = TestHelper.getTestMethodName();
+
+ // Create a queue
+ LOG.info("Starting job-queue: " + queueName);
+ JobQueue.Builder queueBuilder = TaskTestUtil.buildJobQueue(queueName, 0, 3);
+ // Create and Enqueue jobs
+ List<String> currentJobNames = new ArrayList<String>();
+ for (int i = 0; i < num_dbs; i++) {
+ JobConfig.Builder jobConfig =
+ new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND).setTargetResource(_test_dbs.get(i))
+ .setTargetPartitionStates(Sets.newHashSet("SLAVE")).setIgnoreDependentJobFailure(true);
+ String jobName = "job" + _test_dbs.get(i);
+ queueBuilder.enqueueJob(jobName, jobConfig);
+ currentJobNames.add(jobName);
+ }
+
+ _driver.start(queueBuilder.build());
+ _setupTool.dropResourceFromCluster(CLUSTER_NAME, _test_dbs.get(1));
+
+ String namedSpaceJob1 = String.format("%s_%s", queueName, currentJobNames.get(1));
+ TaskTestUtil.pollForJobState(_driver, queueName, namedSpaceJob1, TaskState.FAILED);
+ String lastJob =
+ String.format("%s_%s", queueName, currentJobNames.get(currentJobNames.size() - 1));
+ TaskTestUtil.pollForJobState(_driver, queueName, lastJob, TaskState.COMPLETED);
+
+ _driver.delete(queueName);
}
@Test
@@ -193,5 +226,7 @@ public class TestRunJobsWithMissingTarget extends ZkIntegrationTestBase {
String namedSpaceJob1 = String.format("%s_%s", queueName, currentJobNames.get(0));
TaskTestUtil.pollForJobState(_driver, queueName, namedSpaceJob1, TaskState.FAILED);
TaskTestUtil.pollForWorkflowState(_driver, queueName, TaskState.FAILED);
+
+ _driver.delete(queueName);
}
}