You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2017/10/04 01:49:46 UTC

[5/7] helix git commit: Add timeout in JobConfig

Add timeout in JobConfig

To support a job-level timeout in the task framework, add a Timeout configuration field to JobConfig. Corresponding changes are made in JobConfig.Builder and JobBean, and a TIMED_OUT value is added to TaskState.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/177d5bdc
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/177d5bdc
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/177d5bdc

Branch: refs/heads/master
Commit: 177d5bdc29fc2011e12ca82d7bdf5456ef31a956
Parents: 408082a
Author: Junkai Xue <jx...@linkedin.com>
Authored: Tue Oct 3 15:18:32 2017 -0700
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Tue Oct 3 15:18:32 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/helix/task/JobConfig.java   | 30 ++++++++++++++++++--
 .../java/org/apache/helix/task/TaskState.java   |  7 ++++-
 .../org/apache/helix/task/beans/JobBean.java    |  1 +
 3 files changed, 34 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/177d5bdc/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
index 12aa058..6c3aed4 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
@@ -80,6 +80,10 @@ public class JobConfig extends ResourceConfig {
      */
     JobCommandConfig,
     /**
+     * The allowed execution time of the job.
+     */
+    Timeout,
+    /**
      * The timeout for a task.
      */
     TimeoutPerPartition,
@@ -151,6 +155,7 @@ public class JobConfig extends ResourceConfig {
   }
 
   //Default property values
+  public static final long DEFAULT_TIMEOUT = Long.MAX_VALUE;
   public static final long DEFAULT_TIMEOUT_PER_TASK = 60 * 60 * 1000; // 1 hr.
   public static final long DEFAULT_TASK_RETRY_DELAY = -1; // no delay
   public static final int DEFAULT_MAX_ATTEMPTS_PER_TASK = 10;
@@ -171,7 +176,7 @@ public class JobConfig extends ResourceConfig {
   public JobConfig(String jobId, JobConfig jobConfig) {
     this(jobConfig.getWorkflow(), jobConfig.getTargetResource(), jobConfig.getTargetPartitions(),
         jobConfig.getTargetPartitionStates(), jobConfig.getCommand(),
-        jobConfig.getJobCommandConfigMap(), jobConfig.getTimeoutPerTask(),
+        jobConfig.getJobCommandConfigMap(), jobConfig.getTimeout(), jobConfig.getTimeoutPerTask(),
         jobConfig.getNumConcurrentTasksPerInstance(), jobConfig.getMaxAttemptsPerTask(),
         jobConfig.getMaxAttemptsPerTask(), jobConfig.getFailureThreshold(),
         jobConfig.getTaskRetryDelay(), jobConfig.isDisableExternalView(),
@@ -183,7 +188,7 @@ public class JobConfig extends ResourceConfig {
 
   private JobConfig(String workflow, String targetResource, List<String> targetPartitions,
       Set<String> targetPartitionStates, String command, Map<String, String> jobCommandConfigMap,
-      long timeoutPerTask, int numConcurrentTasksPerInstance, int maxAttemptsPerTask,
+      long timeout, long timeoutPerTask, int numConcurrentTasksPerInstance, int maxAttemptsPerTask,
       int maxForcedReassignmentsPerTask, int failureThreshold, long retryDelay,
       boolean disableExternalView, boolean ignoreDependentJobFailure,
       Map<String, TaskConfig> taskConfigMap, String jobType, String instanceGroupTag,
@@ -221,6 +226,7 @@ public class JobConfig extends ResourceConfig {
     if (executionStart > 0) {
       getRecord().setLongField(JobConfigProperty.StartTime.name(), executionStart);
     }
+    getRecord().setLongField(JobConfigProperty.Timeout.name(), timeout);
     getRecord().setLongField(JobConfigProperty.TimeoutPerPartition.name(), timeoutPerTask);
     getRecord().setIntField(JobConfigProperty.MaxAttemptsPerTask.name(), maxAttemptsPerTask);
     getRecord().setIntField(JobConfigProperty.MaxForcedReassignmentsPerTask.name(),
@@ -289,6 +295,10 @@ public class JobConfig extends ResourceConfig {
         : null;
   }
 
+  public long getTimeout() {
+    return getRecord().getLongField(JobConfigProperty.Timeout.name(), DEFAULT_TIMEOUT);
+  }
+
   public long getTimeoutPerTask() {
     return getRecord()
         .getLongField(JobConfigProperty.TimeoutPerPartition.name(), DEFAULT_TIMEOUT_PER_TASK);
@@ -389,6 +399,7 @@ public class JobConfig extends ResourceConfig {
     private String _command;
     private Map<String, String> _commandConfig;
     private Map<String, TaskConfig> _taskConfigMap = Maps.newHashMap();
+    private long _timeout = DEFAULT_TIMEOUT;
     private long _timeoutPerTask = DEFAULT_TIMEOUT_PER_TASK;
     private int _numConcurrentTasksPerInstance = DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
     private int _maxAttemptsPerTask = DEFAULT_MAX_ATTEMPTS_PER_TASK;
@@ -417,7 +428,7 @@ public class JobConfig extends ResourceConfig {
       validate();
 
       return new JobConfig(_workflow, _targetResource, _targetPartitions, _targetPartitionStates,
-          _command, _commandConfig, _timeoutPerTask, _numConcurrentTasksPerInstance,
+          _command, _commandConfig, _timeout, _timeoutPerTask, _numConcurrentTasksPerInstance,
           _maxAttemptsPerTask, _maxForcedReassignmentsPerTask, _failureThreshold, _retryDelay,
           _disableExternalView, _ignoreDependentJobFailure, _taskConfigMap, _jobType,
           _instanceGroupTag, _executionDelay, _executionStart, _jobId, _expiry,
@@ -456,6 +467,9 @@ public class JobConfig extends ResourceConfig {
             cfg.get(JobConfigProperty.JobCommandConfig.name()));
         b.setJobCommandConfigMap(commandConfigMap);
       }
+      if (cfg.containsKey(JobConfigProperty.Timeout.name())) {
+        b.setTimeout(Long.parseLong(cfg.get(JobConfigProperty.Timeout.name())));
+      }
       if (cfg.containsKey(JobConfigProperty.TimeoutPerPartition.name())) {
         b.setTimeoutPerTask(Long.parseLong(cfg.get(JobConfigProperty.TimeoutPerPartition.name())));
       }
@@ -544,6 +558,11 @@ public class JobConfig extends ResourceConfig {
       return this;
     }
 
+    public Builder setTimeout(long v) {
+      _timeout = v;
+      return this;
+    }
+
     public Builder setTimeoutPerTask(long v) {
       _timeoutPerTask = v;
       return this;
@@ -660,6 +679,10 @@ public class JobConfig extends ResourceConfig {
           }
         }
       }
+      if (_timeout < 0) {
+        throw new IllegalArgumentException(String
+            .format("%s has invalid value %s", JobConfigProperty.Timeout, _timeout));
+      }
       if (_timeoutPerTask < 0) {
         throw new IllegalArgumentException(String
             .format("%s has invalid value %s", JobConfigProperty.TimeoutPerPartition,
@@ -696,6 +719,7 @@ public class JobConfig extends ResourceConfig {
 
       b.setMaxAttemptsPerTask(jobBean.maxAttemptsPerTask)
           .setNumConcurrentTasksPerInstance(jobBean.numConcurrentTasksPerInstance)
+          .setTimeout(jobBean.timeout)
           .setTimeoutPerTask(jobBean.timeoutPerPartition)
           .setFailureThreshold(jobBean.failureThreshold).setTaskRetryDelay(jobBean.taskRetryDelay)
           .setDisableExternalView(jobBean.disableExternalView)

http://git-wip-us.apache.org/repos/asf/helix/blob/177d5bdc/helix-core/src/main/java/org/apache/helix/task/TaskState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskState.java b/helix-core/src/main/java/org/apache/helix/task/TaskState.java
index 4e12f2d..3713c40 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskState.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskState.java
@@ -50,5 +50,10 @@ public enum TaskState {
   /**
    * The task are aborted due to workflow fail
    */
-  ABORTED
+  ABORTED,
+  /**
+   * The job ran longer than its allowed execution time (the JobConfig Timeout).
+   * TODO: also use this for the task
+   */
+  TIMED_OUT
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/177d5bdc/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
index 7b42ad2..8d2f259 100644
--- a/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
+++ b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
@@ -38,6 +38,7 @@ public class JobBean {
   public String command;
   public Map<String, String> jobCommandConfigMap;
   public List<TaskBean> tasks;
+  public long timeout = JobConfig.DEFAULT_TIMEOUT;
   public long timeoutPerPartition = JobConfig.DEFAULT_TIMEOUT_PER_TASK;
   public int numConcurrentTasksPerInstance = JobConfig.DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
   public int maxAttemptsPerTask = JobConfig.DEFAULT_MAX_ATTEMPTS_PER_TASK;