You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/09/12 20:14:26 UTC

incubator-gobblin git commit: [GOBBLIN-584] Improve the Helix configuration naming.

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 648c2a2ab -> a6ec4a9af


[GOBBLIN-584] Improve the Helix configuration naming.

Closes #2450 from yukuai518/distmetrics


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/a6ec4a9a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/a6ec4a9a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/a6ec4a9a

Branch: refs/heads/master
Commit: a6ec4a9af528039b69f6c7f33f968723214c557b
Parents: 648c2a2
Author: Kuai Yu <ku...@linkedin.com>
Authored: Wed Sep 12 13:14:21 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Wed Sep 12 13:14:21 2018 -0700

----------------------------------------------------------------------
 .../configuration/ConfigurationKeys.java        |  9 ++----
 .../GobblinClusterConfigurationKeys.java        |  7 +++++
 ...blinHelixDistributeJobExecutionLauncher.java | 29 ++++++++++++++------
 .../cluster/GobblinHelixJobLauncher.java        | 24 +++++++++++-----
 .../org/apache/gobblin/cluster/HelixUtils.java  |  3 +-
 .../apache/gobblin/util/PropertiesUtils.java    |  4 +++
 6 files changed, 52 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a6ec4a9a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 47233e8..35a3a45 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -112,13 +112,8 @@ public class ConfigurationKeys {
   public static final String SCHEDULER_WAIT_FOR_JOB_COMPLETION_KEY = "scheduler.wait.for.job.completion";
   public static final String DEFAULT_SCHEDULER_WAIT_FOR_JOB_COMPLETION = Boolean.TRUE.toString();
 
-  public static final String HELIX_JOB_TIMEOUT_ENABLED_KEY = "job.timeout.enabled";
-  public static final String DEFAULT_HELIX_JOB_TIMEOUT_ENABLED = "false";
-  public static final String HELIX_JOB_TIMEOUT_SECONDS = "job.timeout.seconds";
-  public static final String DEFAULT_HELIX_JOB_TIMEOUT_SECONDS = "10800";
-
-  public static final String HELIX_TASK_TIMEOUT_SECONDS = "task.timeout.seconds";
-  public static final long DEFAULT_HELIX_TASK_TIMEOUT_SECONDS = 60 * 60;
+  public static final String TASK_TIMEOUT_SECONDS = "task.timeout.seconds";
+  public static final long DEFAULT_TASK_TIMEOUT_SECONDS = 60 * 60;
 
   /**
    * Task executor and state tracker configuration properties.

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a6ec4a9a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index b6c11e3..2b1ba09 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -118,4 +118,11 @@ public class GobblinClusterConfigurationKeys {
   public static final long DEFAULT_HELIX_WORKFLOW_EXPIRY_TIME_SECONDS = 6 * 60 * 60;
 
   public static final String TASK_RUNNER_SUITE_BUILDER = GOBBLIN_CLUSTER_PREFIX + "taskRunnerSuite.builder";
+
+  public static final String HELIX_JOB_TIMEOUT_ENABLED_KEY = "helix.job.timeout.enabled";
+  public static final String DEFAULT_HELIX_JOB_TIMEOUT_ENABLED = "false";
+  public static final String HELIX_JOB_TIMEOUT_SECONDS = "helix.job.timeout.seconds";
+  public static final String DEFAULT_HELIX_JOB_TIMEOUT_SECONDS = "10800";
+  public static final String HELIX_TASK_TIMEOUT_SECONDS = "helix.task.timeout.seconds";
+  public static final String HELIX_MAX_TASK_RETRIES_KEY = "helix.task.maxretries";
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a6ec4a9a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
index b5f8928..eb78938 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
@@ -184,6 +184,14 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher
 
   /**
    * Create a job config builder which has a single task that wraps the original jobProps.
+   *
+   * The planning job (which runs the original {@link GobblinHelixJobLauncher}) will be
+   * executed on one of the Helix participants.
+   *
+   * We rely on the underlying {@link GobblinHelixJobLauncher} to correctly handle the task
+   * execution timeout so that the planning job itself is relieved of the timeout constraint.
+   *
+   * In short, the planning job will run once and requires no timeout.
    */
   private JobConfig.Builder createPlanningJob (Properties jobProps) {
     // Create a single task for job planning
@@ -199,10 +207,15 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher
     taskConfigMap.put(planningId, TaskConfig.Builder.from(rawConfigMap));
     JobConfig.Builder jobConfigBuilder = new JobConfig.Builder();
 
-    jobConfigBuilder.setTimeoutPerTask(PropertiesUtils.getPropAsLong(
-        jobProps,
-        ConfigurationKeys.HELIX_TASK_TIMEOUT_SECONDS,
-        ConfigurationKeys.DEFAULT_HELIX_TASK_TIMEOUT_SECONDS) * 1000);
+    // We want GobblinHelixJobLauncher to run only once.
+    jobConfigBuilder.setMaxAttemptsPerTask(1);
+
+    // The planning job should never time out (Helix defaults to a 1h timeout, so set a large value: '1 month')
+    jobConfigBuilder.setTimeoutPerTask(JobConfig.DEFAULT_TIMEOUT_PER_TASK * 24 * 30);
+
+    jobConfigBuilder.setNumConcurrentTasksPerInstance(PropertiesUtils.getPropAsInt(jobProps,
+        GobblinClusterConfigurationKeys.HELIX_CLUSTER_TASK_CONCURRENCY,
+        GobblinClusterConfigurationKeys.HELIX_CLUSTER_TASK_CONCURRENCY_DEFAULT));
 
     jobConfigBuilder.setFailureThreshold(1);
     jobConfigBuilder.addTaskConfigMap(taskConfigMap).setCommand(GobblinTaskRunner.GOBBLIN_JOB_FACTORY_NAME);
@@ -252,10 +265,10 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher
   }
 
   private DistributeJobResult waitForJobCompletion(String workFlowName, String jobName) throws InterruptedException {
-    boolean timeoutEnabled = Boolean.parseBoolean(this.jobProperties.getProperty(ConfigurationKeys.HELIX_JOB_TIMEOUT_ENABLED_KEY,
-        ConfigurationKeys.DEFAULT_HELIX_JOB_TIMEOUT_ENABLED));
-    long timeoutInSeconds = Long.parseLong(this.jobProperties.getProperty(ConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS,
-        ConfigurationKeys.DEFAULT_HELIX_JOB_TIMEOUT_SECONDS));
+    boolean timeoutEnabled = Boolean.parseBoolean(this.jobProperties.getProperty(GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_ENABLED_KEY,
+        GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_TIMEOUT_ENABLED));
+    long timeoutInSeconds = Long.parseLong(this.jobProperties.getProperty(GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS,
+        GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_TIMEOUT_SECONDS));
 
     try {
       HelixUtils.waitJobCompletion(

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a6ec4a9a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index 6672923..8523e21 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -269,12 +269,20 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
     }
 
     JobConfig.Builder jobConfigBuilder = new JobConfig.Builder();
+
+    // Helix task attempts = retries + 1 (falls back to the general task retry setting for backward compatibility)
     jobConfigBuilder.setMaxAttemptsPerTask(this.jobContext.getJobState().getPropAsInt(
-        ConfigurationKeys.MAX_TASK_RETRIES_KEY, ConfigurationKeys.DEFAULT_MAX_TASK_RETRIES));
+        GobblinClusterConfigurationKeys.HELIX_MAX_TASK_RETRIES_KEY,
+        this.jobContext.getJobState().getPropAsInt(
+            ConfigurationKeys.MAX_TASK_RETRIES_KEY,
+            ConfigurationKeys.DEFAULT_MAX_TASK_RETRIES)) + 1);
 
+    // Helix task timeout (falls back to the general task timeout for backward compatibility)
     jobConfigBuilder.setTimeoutPerTask(this.jobContext.getJobState().getPropAsLong(
-        ConfigurationKeys.HELIX_TASK_TIMEOUT_SECONDS,
-        ConfigurationKeys.DEFAULT_HELIX_TASK_TIMEOUT_SECONDS) * 1000);
+        GobblinClusterConfigurationKeys.HELIX_TASK_TIMEOUT_SECONDS,
+        this.jobContext.getJobState().getPropAsLong(
+            ConfigurationKeys.TASK_TIMEOUT_SECONDS,
+            ConfigurationKeys.DEFAULT_TASK_TIMEOUT_SECONDS)) * 1000);
 
     jobConfigBuilder.setFailureThreshold(workUnits.size());
     jobConfigBuilder.addTaskConfigMap(taskConfigMap).setCommand(GobblinTaskRunner.GOBBLIN_TASK_FACTORY_NAME);
@@ -377,10 +385,12 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
   }
 
   private void waitForJobCompletion()  throws InterruptedException {
-    boolean timeoutEnabled = Boolean.parseBoolean(this.jobProps.getProperty(ConfigurationKeys.HELIX_JOB_TIMEOUT_ENABLED_KEY,
-        ConfigurationKeys.DEFAULT_HELIX_JOB_TIMEOUT_ENABLED));
-    long timeoutInSeconds = Long.parseLong(this.jobProps.getProperty(ConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS,
-        ConfigurationKeys.DEFAULT_HELIX_JOB_TIMEOUT_SECONDS));
+    boolean timeoutEnabled = Boolean.parseBoolean(this.jobProps.getProperty(
+        GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_ENABLED_KEY,
+        GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_TIMEOUT_ENABLED));
+    long timeoutInSeconds = Long.parseLong(this.jobProps.getProperty(
+        GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS,
+        GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_TIMEOUT_SECONDS));
 
     try {
       HelixUtils.waitJobCompletion(

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a6ec4a9a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
index 452e421..6fbc7c5 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
@@ -176,12 +176,11 @@ public class HelixUtils {
   static void handleJobTimeout(String workFlowName, String jobName, HelixManager helixManager, Object jobLauncher,
       JobListener jobListener) throws InterruptedException {
     try {
+      log.warn("Timeout occurred for job launcher {} with job {}", jobLauncher.getClass(), jobName);
       if (jobLauncher instanceof GobblinHelixJobLauncher) {
         ((GobblinHelixJobLauncher) jobLauncher).cancelJob(jobListener);
       } else if (jobLauncher instanceof GobblinHelixDistributeJobExecutionLauncher) {
         ((GobblinHelixDistributeJobExecutionLauncher) jobLauncher).cancel();
-      } else {
-        log.warn("Timeout occured for unknown job launcher {}", jobLauncher.getClass());
       }
     } catch (JobException e) {
       throw new RuntimeException("Unable to cancel job " + jobName + ": ", e);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a6ec4a9a/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
index c41273c..51f2019 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
@@ -61,6 +61,10 @@ public class PropertiesUtils {
     return Boolean.valueOf(properties.getProperty(key, defaultValue));
   }
 
+  public static int getPropAsInt(Properties properties, String key, int defaultValue) {
+    return Integer.parseInt(properties.getProperty(key, Integer.toString(defaultValue)));
+  }
+
   public static long getPropAsLong(Properties properties, String key, long defaultValue) {
     return Long.parseLong(properties.getProperty(key, Long.toString(defaultValue)));
   }