You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/12/14 18:08:18 UTC
incubator-gobblin git commit: [GOBBLIN-655] Allow helix job to have a
job type.
Repository: incubator-gobblin
Updated Branches:
refs/heads/master b11cfa8b7 -> 69c65f842
[GOBBLIN-655] Allow helix job to have a job type.
Closes #2524 from kyuamazon/jobtype
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/69c65f84
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/69c65f84
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/69c65f84
Branch: refs/heads/master
Commit: 69c65f842504f2ffadeb19515c9c4810b8bf558c
Parents: b11cfa8
Author: Kuai Yu <ku...@linkedin.com>
Authored: Fri Dec 14 10:08:13 2018 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Fri Dec 14 10:08:13 2018 -0800
----------------------------------------------------------------------
.../cluster/GobblinClusterConfigurationKeys.java | 6 +++++-
.../GobblinHelixDistributeJobExecutionLauncher.java | 7 +++++++
.../gobblin/cluster/GobblinHelixJobLauncher.java | 14 ++++++++++++--
3 files changed, 24 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/69c65f84/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index b2bd682..6791d52 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -75,11 +75,15 @@ public class GobblinClusterConfigurationKeys {
public static final String JOB_EXECUTE_IN_SCHEDULING_THREAD = GOBBLIN_CLUSTER_PREFIX + "job.executeInSchedulingThread";
public static final boolean JOB_EXECUTE_IN_SCHEDULING_THREAD_DEFAULT = true;
- // Helix related tagging
+ // Helix tagging
public static final String HELIX_JOB_TAG_KEY = GOBBLIN_CLUSTER_PREFIX + "helixJobTag";
public static final String HELIX_PLANNING_JOB_TAG_KEY = GOBBLIN_CLUSTER_PREFIX + "helixPlanningJobTag";
public static final String HELIX_INSTANCE_TAGS_KEY = GOBBLIN_CLUSTER_PREFIX + "helixInstanceTags";
+ // Helix job quota
+ public static final String HELIX_JOB_TYPE_KEY = GOBBLIN_CLUSTER_PREFIX + "helixJobType";
+ public static final String HELIX_PLANNING_JOB_TYPE_KEY = GOBBLIN_CLUSTER_PREFIX + "helixPlanningJobType";
+
// Planning job properties
public static final String PLANNING_JOB_NAME_PREFIX = "PlanningJob";
public static final String PLANNING_CONF_PREFIX = GOBBLIN_CLUSTER_PREFIX + "planning.";
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/69c65f84/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
index bc8443d..9424ca8 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
@@ -223,6 +223,13 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher
jobConfigBuilder.setInstanceGroupTag(jobPlanningTag);
}
+ // Planning job should have its own type support
+ if (jobProps.containsKey(GobblinClusterConfigurationKeys.HELIX_PLANNING_JOB_TYPE_KEY)) {
+ String jobType = jobProps.getProperty(GobblinClusterConfigurationKeys.HELIX_PLANNING_JOB_TYPE_KEY);
+ log.info("PlanningJob {} has types associated : {}", planningId, jobType);
+ jobConfigBuilder.setJobType(jobType);
+ }
+
jobConfigBuilder.setNumConcurrentTasksPerInstance(PropertiesUtils.getPropAsInt(jobProps,
GobblinClusterConfigurationKeys.HELIX_CLUSTER_TASK_CONCURRENCY,
GobblinClusterConfigurationKeys.HELIX_CLUSTER_TASK_CONCURRENCY_DEFAULT));
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/69c65f84/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index 3389f84..732c7d3 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -321,6 +321,12 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
jobConfigBuilder.setInstanceGroupTag(jobTag);
}
+ if (this.jobConfig.hasPath(GobblinClusterConfigurationKeys.HELIX_JOB_TYPE_KEY)) { // job type is optional
+ String jobType = this.jobConfig.getString(GobblinClusterConfigurationKeys.HELIX_JOB_TYPE_KEY); // read the key once; its value is the type, not another config path
+ log.info("Job {} has types associated : {}", this.jobContext.getJobId(), jobType);
+ jobConfigBuilder.setJobType(jobType);
+ }
+
if (Task.getExecutionModel(ConfigUtils.configToState(jobConfig)).equals(ExecutionModel.STREAMING)) {
jobConfigBuilder.setRebalanceRunningTask(true);
}
@@ -335,8 +341,12 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
* Submit a job to run.
*/
private void submitJobToHelix(JobConfig.Builder jobConfigBuilder) throws Exception {
- HelixUtils.submitJobToWorkFlow(jobConfigBuilder, this.helixWorkFlowName, this.jobContext.getJobId(),
- this.helixTaskDriver, this.helixManager, this.workFlowExpiryTimeSeconds);
+ HelixUtils.submitJobToWorkFlow(jobConfigBuilder,
+ this.helixWorkFlowName,
+ this.jobContext.getJobId(),
+ this.helixTaskDriver,
+ this.helixManager,
+ this.workFlowExpiryTimeSeconds);
}
public void launchJob(@Nullable JobListener jobListener)