You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/12/14 18:08:18 UTC

incubator-gobblin git commit: [GOBBLIN-655] Allow helix job to have a job type.

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master b11cfa8b7 -> 69c65f842


[GOBBLIN-655] Allow helix job to have a job type.

Closes #2524 from kyuamazon/jobtype


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/69c65f84
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/69c65f84
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/69c65f84

Branch: refs/heads/master
Commit: 69c65f842504f2ffadeb19515c9c4810b8bf558c
Parents: b11cfa8
Author: Kuai Yu <ku...@linkedin.com>
Authored: Fri Dec 14 10:08:13 2018 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Fri Dec 14 10:08:13 2018 -0800

----------------------------------------------------------------------
 .../cluster/GobblinClusterConfigurationKeys.java      |  6 +++++-
 .../GobblinHelixDistributeJobExecutionLauncher.java   |  7 +++++++
 .../gobblin/cluster/GobblinHelixJobLauncher.java      | 14 ++++++++++++--
 3 files changed, 24 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/69c65f84/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index b2bd682..6791d52 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -75,11 +75,15 @@ public class GobblinClusterConfigurationKeys {
   public static final String JOB_EXECUTE_IN_SCHEDULING_THREAD = GOBBLIN_CLUSTER_PREFIX + "job.executeInSchedulingThread";
   public static final boolean JOB_EXECUTE_IN_SCHEDULING_THREAD_DEFAULT = true;
 
-  // Helix related tagging
+  // Helix tagging
   public static final String HELIX_JOB_TAG_KEY = GOBBLIN_CLUSTER_PREFIX + "helixJobTag";
   public static final String HELIX_PLANNING_JOB_TAG_KEY = GOBBLIN_CLUSTER_PREFIX + "helixPlanningJobTag";
   public static final String HELIX_INSTANCE_TAGS_KEY = GOBBLIN_CLUSTER_PREFIX + "helixInstanceTags";
 
+  // Helix job quota
+  public static final String HELIX_JOB_TYPE_KEY = GOBBLIN_CLUSTER_PREFIX + "helixJobType";
+  public static final String HELIX_PLANNING_JOB_TYPE_KEY = GOBBLIN_CLUSTER_PREFIX + "helixPlanningJobType";
+
   // Planning job properties
   public static final String PLANNING_JOB_NAME_PREFIX = "PlanningJob";
   public static final String PLANNING_CONF_PREFIX = GOBBLIN_CLUSTER_PREFIX + "planning.";

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/69c65f84/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
index bc8443d..9424ca8 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
@@ -223,6 +223,13 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher
       jobConfigBuilder.setInstanceGroupTag(jobPlanningTag);
     }
 
+    // Planning job should have its own type support
+    if (jobProps.containsKey(GobblinClusterConfigurationKeys.HELIX_PLANNING_JOB_TYPE_KEY)) {
+      String jobType = jobProps.getProperty(GobblinClusterConfigurationKeys.HELIX_PLANNING_JOB_TYPE_KEY);
+      log.info("PlanningJob {} has types associated : {}", planningId, jobType);
+      jobConfigBuilder.setJobType(jobType);
+    }
+
     jobConfigBuilder.setNumConcurrentTasksPerInstance(PropertiesUtils.getPropAsInt(jobProps,
         GobblinClusterConfigurationKeys.HELIX_CLUSTER_TASK_CONCURRENCY,
         GobblinClusterConfigurationKeys.HELIX_CLUSTER_TASK_CONCURRENCY_DEFAULT));

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/69c65f84/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index 3389f84..732c7d3 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -321,6 +321,12 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
       jobConfigBuilder.setInstanceGroupTag(jobTag);
     }
 
+    if (this.jobConfig.hasPath(GobblinClusterConfigurationKeys.HELIX_JOB_TYPE_KEY)) {
+      String jobType = this.jobConfig.getString(this.jobConfig.getString(GobblinClusterConfigurationKeys.HELIX_JOB_TYPE_KEY));
+      log.info("Job {} has types associated : {}", this.jobContext.getJobId(), jobType);
+      jobConfigBuilder.setJobType(jobType);
+    }
+
     if (Task.getExecutionModel(ConfigUtils.configToState(jobConfig)).equals(ExecutionModel.STREAMING)) {
       jobConfigBuilder.setRebalanceRunningTask(true);
     }
@@ -335,8 +341,12 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
    * Submit a job to run.
    */
   private void submitJobToHelix(JobConfig.Builder jobConfigBuilder) throws Exception {
-    HelixUtils.submitJobToWorkFlow(jobConfigBuilder, this.helixWorkFlowName, this.jobContext.getJobId(),
-        this.helixTaskDriver, this.helixManager, this.workFlowExpiryTimeSeconds);
+    HelixUtils.submitJobToWorkFlow(jobConfigBuilder,
+        this.helixWorkFlowName,
+        this.jobContext.getJobId(),
+        this.helixTaskDriver,
+        this.helixManager,
+        this.workFlowExpiryTimeSeconds);
   }
 
   public void launchJob(@Nullable JobListener jobListener)