You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2017/10/04 01:49:46 UTC
[5/7] helix git commit: Add timeout in JobConfig
Add timeout in JobConfig
To support a job-level timeout in the task framework, add a Timeout configuration field to JobConfig. Associated changes are made in the JobConfig builder and in JobBean.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/177d5bdc
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/177d5bdc
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/177d5bdc
Branch: refs/heads/master
Commit: 177d5bdc29fc2011e12ca82d7bdf5456ef31a956
Parents: 408082a
Author: Junkai Xue <jx...@linkedin.com>
Authored: Tue Oct 3 15:18:32 2017 -0700
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Tue Oct 3 15:18:32 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/helix/task/JobConfig.java | 30 ++++++++++++++++++--
.../java/org/apache/helix/task/TaskState.java | 7 ++++-
.../org/apache/helix/task/beans/JobBean.java | 1 +
3 files changed, 34 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/177d5bdc/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
index 12aa058..6c3aed4 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
@@ -80,6 +80,10 @@ public class JobConfig extends ResourceConfig {
*/
JobCommandConfig,
/**
+ * The allowed execution time of the job.
+ */
+ Timeout,
+ /**
* The timeout for a task.
*/
TimeoutPerPartition,
@@ -151,6 +155,7 @@ public class JobConfig extends ResourceConfig {
}
//Default property values
+ public static final long DEFAULT_TIMEOUT = Long.MAX_VALUE;
public static final long DEFAULT_TIMEOUT_PER_TASK = 60 * 60 * 1000; // 1 hr.
public static final long DEFAULT_TASK_RETRY_DELAY = -1; // no delay
public static final int DEFAULT_MAX_ATTEMPTS_PER_TASK = 10;
@@ -171,7 +176,7 @@ public class JobConfig extends ResourceConfig {
public JobConfig(String jobId, JobConfig jobConfig) {
this(jobConfig.getWorkflow(), jobConfig.getTargetResource(), jobConfig.getTargetPartitions(),
jobConfig.getTargetPartitionStates(), jobConfig.getCommand(),
- jobConfig.getJobCommandConfigMap(), jobConfig.getTimeoutPerTask(),
+ jobConfig.getJobCommandConfigMap(), jobConfig.getTimeout(), jobConfig.getTimeoutPerTask(),
jobConfig.getNumConcurrentTasksPerInstance(), jobConfig.getMaxAttemptsPerTask(),
jobConfig.getMaxAttemptsPerTask(), jobConfig.getFailureThreshold(),
jobConfig.getTaskRetryDelay(), jobConfig.isDisableExternalView(),
@@ -183,7 +188,7 @@ public class JobConfig extends ResourceConfig {
private JobConfig(String workflow, String targetResource, List<String> targetPartitions,
Set<String> targetPartitionStates, String command, Map<String, String> jobCommandConfigMap,
- long timeoutPerTask, int numConcurrentTasksPerInstance, int maxAttemptsPerTask,
+ long timeout, long timeoutPerTask, int numConcurrentTasksPerInstance, int maxAttemptsPerTask,
int maxForcedReassignmentsPerTask, int failureThreshold, long retryDelay,
boolean disableExternalView, boolean ignoreDependentJobFailure,
Map<String, TaskConfig> taskConfigMap, String jobType, String instanceGroupTag,
@@ -221,6 +226,7 @@ public class JobConfig extends ResourceConfig {
if (executionStart > 0) {
getRecord().setLongField(JobConfigProperty.StartTime.name(), executionStart);
}
+ getRecord().setLongField(JobConfigProperty.Timeout.name(), timeout);
getRecord().setLongField(JobConfigProperty.TimeoutPerPartition.name(), timeoutPerTask);
getRecord().setIntField(JobConfigProperty.MaxAttemptsPerTask.name(), maxAttemptsPerTask);
getRecord().setIntField(JobConfigProperty.MaxForcedReassignmentsPerTask.name(),
@@ -289,6 +295,10 @@ public class JobConfig extends ResourceConfig {
: null;
}
+ public long getTimeout() {
+ return getRecord().getLongField(JobConfigProperty.Timeout.name(), DEFAULT_TIMEOUT);
+ }
+
public long getTimeoutPerTask() {
return getRecord()
.getLongField(JobConfigProperty.TimeoutPerPartition.name(), DEFAULT_TIMEOUT_PER_TASK);
@@ -389,6 +399,7 @@ public class JobConfig extends ResourceConfig {
private String _command;
private Map<String, String> _commandConfig;
private Map<String, TaskConfig> _taskConfigMap = Maps.newHashMap();
+ private long _timeout = DEFAULT_TIMEOUT;
private long _timeoutPerTask = DEFAULT_TIMEOUT_PER_TASK;
private int _numConcurrentTasksPerInstance = DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
private int _maxAttemptsPerTask = DEFAULT_MAX_ATTEMPTS_PER_TASK;
@@ -417,7 +428,7 @@ public class JobConfig extends ResourceConfig {
validate();
return new JobConfig(_workflow, _targetResource, _targetPartitions, _targetPartitionStates,
- _command, _commandConfig, _timeoutPerTask, _numConcurrentTasksPerInstance,
+ _command, _commandConfig, _timeout, _timeoutPerTask, _numConcurrentTasksPerInstance,
_maxAttemptsPerTask, _maxForcedReassignmentsPerTask, _failureThreshold, _retryDelay,
_disableExternalView, _ignoreDependentJobFailure, _taskConfigMap, _jobType,
_instanceGroupTag, _executionDelay, _executionStart, _jobId, _expiry,
@@ -456,6 +467,9 @@ public class JobConfig extends ResourceConfig {
cfg.get(JobConfigProperty.JobCommandConfig.name()));
b.setJobCommandConfigMap(commandConfigMap);
}
+ if (cfg.containsKey(JobConfigProperty.Timeout.name())) {
+ b.setTimeout(Long.parseLong(cfg.get(JobConfigProperty.Timeout.name())));
+ }
if (cfg.containsKey(JobConfigProperty.TimeoutPerPartition.name())) {
b.setTimeoutPerTask(Long.parseLong(cfg.get(JobConfigProperty.TimeoutPerPartition.name())));
}
@@ -544,6 +558,11 @@ public class JobConfig extends ResourceConfig {
return this;
}
+ public Builder setTimeout(long v) {
+ _timeout = v;
+ return this;
+ }
+
public Builder setTimeoutPerTask(long v) {
_timeoutPerTask = v;
return this;
@@ -660,6 +679,10 @@ public class JobConfig extends ResourceConfig {
}
}
}
+ if (_timeout < 0) {
+ throw new IllegalArgumentException(String
+ .format("%s has invalid value %s", JobConfigProperty.Timeout, _timeout));
+ }
if (_timeoutPerTask < 0) {
throw new IllegalArgumentException(String
.format("%s has invalid value %s", JobConfigProperty.TimeoutPerPartition,
@@ -696,6 +719,7 @@ public class JobConfig extends ResourceConfig {
b.setMaxAttemptsPerTask(jobBean.maxAttemptsPerTask)
.setNumConcurrentTasksPerInstance(jobBean.numConcurrentTasksPerInstance)
+ .setTimeout(jobBean.timeout)
.setTimeoutPerTask(jobBean.timeoutPerPartition)
.setFailureThreshold(jobBean.failureThreshold).setTaskRetryDelay(jobBean.taskRetryDelay)
.setDisableExternalView(jobBean.disableExternalView)
http://git-wip-us.apache.org/repos/asf/helix/blob/177d5bdc/helix-core/src/main/java/org/apache/helix/task/TaskState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskState.java b/helix-core/src/main/java/org/apache/helix/task/TaskState.java
index 4e12f2d..3713c40 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskState.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskState.java
@@ -50,5 +50,10 @@ public enum TaskState {
/**
* The task are aborted due to workflow fail
*/
- ABORTED
+ ABORTED,
+ /**
+ * The job did not finish within its allowed execution time.
+ * TODO: also use this for the task
+ */
+ TIMED_OUT
}
http://git-wip-us.apache.org/repos/asf/helix/blob/177d5bdc/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
index 7b42ad2..8d2f259 100644
--- a/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
+++ b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
@@ -38,6 +38,7 @@ public class JobBean {
public String command;
public Map<String, String> jobCommandConfigMap;
public List<TaskBean> tasks;
+ public long timeout = JobConfig.DEFAULT_TIMEOUT;
public long timeoutPerPartition = JobConfig.DEFAULT_TIMEOUT_PER_TASK;
public int numConcurrentTasksPerInstance = JobConfig.DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
public int maxAttemptsPerTask = JobConfig.DEFAULT_MAX_ATTEMPTS_PER_TASK;