You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/09/12 20:14:26 UTC
incubator-gobblin git commit: [GOBBLIN-584] Improve the Helix
configuration naming.
Repository: incubator-gobblin
Updated Branches:
refs/heads/master 648c2a2ab -> a6ec4a9af
[GOBBLIN-584] Improve the Helix configuration naming.
Closes #2450 from yukuai518/distmetrics
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/a6ec4a9a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/a6ec4a9a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/a6ec4a9a
Branch: refs/heads/master
Commit: a6ec4a9af528039b69f6c7f33f968723214c557b
Parents: 648c2a2
Author: Kuai Yu <ku...@linkedin.com>
Authored: Wed Sep 12 13:14:21 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Wed Sep 12 13:14:21 2018 -0700
----------------------------------------------------------------------
.../configuration/ConfigurationKeys.java | 9 ++----
.../GobblinClusterConfigurationKeys.java | 7 +++++
...blinHelixDistributeJobExecutionLauncher.java | 29 ++++++++++++++------
.../cluster/GobblinHelixJobLauncher.java | 24 +++++++++++-----
.../org/apache/gobblin/cluster/HelixUtils.java | 3 +-
.../apache/gobblin/util/PropertiesUtils.java | 4 +++
6 files changed, 52 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a6ec4a9a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 47233e8..35a3a45 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -112,13 +112,8 @@ public class ConfigurationKeys {
public static final String SCHEDULER_WAIT_FOR_JOB_COMPLETION_KEY = "scheduler.wait.for.job.completion";
public static final String DEFAULT_SCHEDULER_WAIT_FOR_JOB_COMPLETION = Boolean.TRUE.toString();
- public static final String HELIX_JOB_TIMEOUT_ENABLED_KEY = "job.timeout.enabled";
- public static final String DEFAULT_HELIX_JOB_TIMEOUT_ENABLED = "false";
- public static final String HELIX_JOB_TIMEOUT_SECONDS = "job.timeout.seconds";
- public static final String DEFAULT_HELIX_JOB_TIMEOUT_SECONDS = "10800";
-
- public static final String HELIX_TASK_TIMEOUT_SECONDS = "task.timeout.seconds";
- public static final long DEFAULT_HELIX_TASK_TIMEOUT_SECONDS = 60 * 60;
+ public static final String TASK_TIMEOUT_SECONDS = "task.timeout.seconds";
+ public static final long DEFAULT_TASK_TIMEOUT_SECONDS = 60 * 60;
/**
* Task executor and state tracker configuration properties.
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a6ec4a9a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index b6c11e3..2b1ba09 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -118,4 +118,11 @@ public class GobblinClusterConfigurationKeys {
public static final long DEFAULT_HELIX_WORKFLOW_EXPIRY_TIME_SECONDS = 6 * 60 * 60;
public static final String TASK_RUNNER_SUITE_BUILDER = GOBBLIN_CLUSTER_PREFIX + "taskRunnerSuite.builder";
+
+ public static final String HELIX_JOB_TIMEOUT_ENABLED_KEY = "helix.job.timeout.enabled";
+ public static final String DEFAULT_HELIX_JOB_TIMEOUT_ENABLED = "false";
+ public static final String HELIX_JOB_TIMEOUT_SECONDS = "helix.job.timeout.seconds";
+ public static final String DEFAULT_HELIX_JOB_TIMEOUT_SECONDS = "10800";
+ public static final String HELIX_TASK_TIMEOUT_SECONDS = "helix.task.timeout.seconds";
+ public static final String HELIX_MAX_TASK_RETRIES_KEY = "helix.task.maxretries";
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a6ec4a9a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
index b5f8928..eb78938 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
@@ -184,6 +184,14 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher
/**
* Create a job config builder which has a single task that wraps the original jobProps.
+ *
+ * The planning job (which runs the original {@link GobblinHelixJobLauncher}) will be
+ * executed on one of the Helix participants.
+ *
+ * We rely on the underlying {@link GobblinHelixJobLauncher} to correctly handle the task
+ * execution timeout so that the planning job itself is relieved of the timeout constraint.
+ *
+ * In short, the planning job will run once and requires no timeout.
*/
private JobConfig.Builder createPlanningJob (Properties jobProps) {
// Create a single task for job planning
@@ -199,10 +207,15 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher
taskConfigMap.put(planningId, TaskConfig.Builder.from(rawConfigMap));
JobConfig.Builder jobConfigBuilder = new JobConfig.Builder();
- jobConfigBuilder.setTimeoutPerTask(PropertiesUtils.getPropAsLong(
- jobProps,
- ConfigurationKeys.HELIX_TASK_TIMEOUT_SECONDS,
- ConfigurationKeys.DEFAULT_HELIX_TASK_TIMEOUT_SECONDS) * 1000);
+ // We want GobblinHelixJobLauncher to run only once.
+ jobConfigBuilder.setMaxAttemptsPerTask(1);
+
+ // The planning job should never time out (Helix defaults to a 1h timeout, so set a large value of '1 month')
+ jobConfigBuilder.setTimeoutPerTask(JobConfig.DEFAULT_TIMEOUT_PER_TASK * 24 * 30);
+
+ jobConfigBuilder.setNumConcurrentTasksPerInstance(PropertiesUtils.getPropAsInt(jobProps,
+ GobblinClusterConfigurationKeys.HELIX_CLUSTER_TASK_CONCURRENCY,
+ GobblinClusterConfigurationKeys.HELIX_CLUSTER_TASK_CONCURRENCY_DEFAULT));
jobConfigBuilder.setFailureThreshold(1);
jobConfigBuilder.addTaskConfigMap(taskConfigMap).setCommand(GobblinTaskRunner.GOBBLIN_JOB_FACTORY_NAME);
@@ -252,10 +265,10 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher
}
private DistributeJobResult waitForJobCompletion(String workFlowName, String jobName) throws InterruptedException {
- boolean timeoutEnabled = Boolean.parseBoolean(this.jobProperties.getProperty(ConfigurationKeys.HELIX_JOB_TIMEOUT_ENABLED_KEY,
- ConfigurationKeys.DEFAULT_HELIX_JOB_TIMEOUT_ENABLED));
- long timeoutInSeconds = Long.parseLong(this.jobProperties.getProperty(ConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS,
- ConfigurationKeys.DEFAULT_HELIX_JOB_TIMEOUT_SECONDS));
+ boolean timeoutEnabled = Boolean.parseBoolean(this.jobProperties.getProperty(GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_ENABLED_KEY,
+ GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_TIMEOUT_ENABLED));
+ long timeoutInSeconds = Long.parseLong(this.jobProperties.getProperty(GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS,
+ GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_TIMEOUT_SECONDS));
try {
HelixUtils.waitJobCompletion(
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a6ec4a9a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index 6672923..8523e21 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -269,12 +269,20 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
}
JobConfig.Builder jobConfigBuilder = new JobConfig.Builder();
+
+ // Helix task attempts = retries + 1 (fallback to general task retry for backward compatibility)
jobConfigBuilder.setMaxAttemptsPerTask(this.jobContext.getJobState().getPropAsInt(
- ConfigurationKeys.MAX_TASK_RETRIES_KEY, ConfigurationKeys.DEFAULT_MAX_TASK_RETRIES));
+ GobblinClusterConfigurationKeys.HELIX_MAX_TASK_RETRIES_KEY,
+ this.jobContext.getJobState().getPropAsInt(
+ ConfigurationKeys.MAX_TASK_RETRIES_KEY,
+ ConfigurationKeys.DEFAULT_MAX_TASK_RETRIES)) + 1);
+ // Helix task timeout (fallback to general task timeout for backward compatibility)
jobConfigBuilder.setTimeoutPerTask(this.jobContext.getJobState().getPropAsLong(
- ConfigurationKeys.HELIX_TASK_TIMEOUT_SECONDS,
- ConfigurationKeys.DEFAULT_HELIX_TASK_TIMEOUT_SECONDS) * 1000);
+ GobblinClusterConfigurationKeys.HELIX_TASK_TIMEOUT_SECONDS,
+ this.jobContext.getJobState().getPropAsLong(
+ ConfigurationKeys.TASK_TIMEOUT_SECONDS,
+ ConfigurationKeys.DEFAULT_TASK_TIMEOUT_SECONDS)) * 1000);
jobConfigBuilder.setFailureThreshold(workUnits.size());
jobConfigBuilder.addTaskConfigMap(taskConfigMap).setCommand(GobblinTaskRunner.GOBBLIN_TASK_FACTORY_NAME);
@@ -377,10 +385,12 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
}
private void waitForJobCompletion() throws InterruptedException {
- boolean timeoutEnabled = Boolean.parseBoolean(this.jobProps.getProperty(ConfigurationKeys.HELIX_JOB_TIMEOUT_ENABLED_KEY,
- ConfigurationKeys.DEFAULT_HELIX_JOB_TIMEOUT_ENABLED));
- long timeoutInSeconds = Long.parseLong(this.jobProps.getProperty(ConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS,
- ConfigurationKeys.DEFAULT_HELIX_JOB_TIMEOUT_SECONDS));
+ boolean timeoutEnabled = Boolean.parseBoolean(this.jobProps.getProperty(
+ GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_ENABLED_KEY,
+ GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_TIMEOUT_ENABLED));
+ long timeoutInSeconds = Long.parseLong(this.jobProps.getProperty(
+ GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS,
+ GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_TIMEOUT_SECONDS));
try {
HelixUtils.waitJobCompletion(
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a6ec4a9a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
index 452e421..6fbc7c5 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
@@ -176,12 +176,11 @@ public class HelixUtils {
static void handleJobTimeout(String workFlowName, String jobName, HelixManager helixManager, Object jobLauncher,
JobListener jobListener) throws InterruptedException {
try {
+ log.warn("Timeout occurred for job launcher {} with job {}", jobLauncher.getClass(), jobName);
if (jobLauncher instanceof GobblinHelixJobLauncher) {
((GobblinHelixJobLauncher) jobLauncher).cancelJob(jobListener);
} else if (jobLauncher instanceof GobblinHelixDistributeJobExecutionLauncher) {
((GobblinHelixDistributeJobExecutionLauncher) jobLauncher).cancel();
- } else {
- log.warn("Timeout occured for unknown job launcher {}", jobLauncher.getClass());
}
} catch (JobException e) {
throw new RuntimeException("Unable to cancel job " + jobName + ": ", e);
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a6ec4a9a/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
index c41273c..51f2019 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
@@ -61,6 +61,10 @@ public class PropertiesUtils {
return Boolean.valueOf(properties.getProperty(key, defaultValue));
}
+ public static int getPropAsInt(Properties properties, String key, int defaultValue) {
+ return Integer.parseInt(properties.getProperty(key, Integer.toString(defaultValue)));
+ }
+
public static long getPropAsLong(Properties properties, String key, long defaultValue) {
return Long.parseLong(properties.getProperty(key, Long.toString(defaultValue)));
}