You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2022/06/01 18:31:40 UTC

[gobblin] branch master updated: [GOBBLIN-1653] Shorten job name length if it exceeds 255 characters (#3514)

This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new edcdc8d4f [GOBBLIN-1653] Shorten job name length if it exceeds 255 characters (#3514)
edcdc8d4f is described below

commit edcdc8d4f764369d47bf5fd29e69b1a58c5a0f90
Author: William Lo <lo...@gmail.com>
AuthorDate: Wed Jun 1 11:31:32 2022 -0700

    [GOBBLIN-1653] Shorten job name length if it exceeds 255 characters (#3514)
    
    * Shorten job name length if it exceeds 255 characters (max size for a directory component)
    
    * Address review to account for flow name in hash
---
 .../service/modules/spec/JobExecutionPlan.java      | 10 ++++++++--
 .../spec/JobExecutionPlanDagFactoryTest.java        | 21 +++++++++++++++++++++
 2 files changed, 29 insertions(+), 2 deletions(-)

diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
index 79374af33..e04e7cb90 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
@@ -60,6 +60,7 @@ import static org.apache.gobblin.runtime.AbstractJobLauncher.GOBBLIN_JOB_TEMPLAT
 @EqualsAndHashCode(exclude = {"executionStatus", "currentAttempts", "jobFuture", "flowStartTime"})
 public class JobExecutionPlan {
   public static final String JOB_MAX_ATTEMPTS = "job.maxAttempts";
+  private static final int MAX_JOB_NAME_LENGTH = 255;
 
   private final JobSpec jobSpec;
   private final SpecExecutor specExecutor;
@@ -108,8 +109,13 @@ public class JobExecutionPlan {
 
       // Modify the job name to include the flow group, flow name, edge id, and a random string to avoid collisions since
       // job names are assumed to be unique within a dag.
-      jobName = Joiner.on(JOB_NAME_COMPONENT_SEPARATION_CHAR).join(flowGroup, flowName, jobName, edgeId, flowInputPath.hashCode());
-
+      int hash = flowInputPath.hashCode();
+      jobName = Joiner.on(JOB_NAME_COMPONENT_SEPARATION_CHAR).join(flowGroup, flowName, jobName, edgeId, hash);
+      // jobNames are commonly used as a directory name, which is limited to 255 characters
+      if (jobName.length() >= MAX_JOB_NAME_LENGTH) {
+        // shorten the job name to flowGroup (at most 128 characters) + hashed flowName + flowInputPath hash
+        jobName = Joiner.on(JOB_NAME_COMPONENT_SEPARATION_CHAR).join(flowGroup, flowName.hashCode(), hash);
+      }
       JobSpec.Builder jobSpecBuilder = JobSpec.builder(jobSpecURIGenerator(flowGroup, jobName, flowSpec)).withConfig(jobConfig)
           .withDescription(flowSpec.getDescription()).withVersion(flowSpec.getVersion());
 
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java
index cba53dc67..0958995a2 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java
@@ -204,4 +204,25 @@ public class JobExecutionPlanDagFactoryTest {
     Assert.assertFalse(dag2.getStartNodes().get(0).getValue().getJobSpec().getConfig().getBoolean(ConfigurationKeys.GOBBLIN_OUTPUT_JOB_LEVEL_METRICS));
   }
 
+  @Test
+  public void testCreateDagLongName() throws Exception {
+    // flowName and flowGroup are both 128 characters long, the maximum for flowName and flowGroup
+    Config flowConfig = ConfigBuilder.create().addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "uwXJwZPAPygvmSAfhtrzXL7ovIEKOBZdulBiNIGzaT7vILrK9QB5EDJj0fc4pkgNHuIKZ3d18TZzyH6a9HpaZACwpWpIpf8SYcSfKtXeoF8IJY064BqEUXR32k3ox31G")
+        .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "4mdfSGSv6GoFW7ICWubN2ORK4s5PMTQ60yIWkcbJOVneTSPn12cXT5ueEgij907tjzLlbcjdVjWFITFf9Y5sB9i0EvKGmTbUF98hJGoQlAhmottaipDEFTdbyzt5Loxg")
+        .addPrimitive(ConfigurationKeys.JOB_SCHEDULE_KEY, "0/2 * * * * ?").build();
+
+    Config jobConfig = ConfigBuilder.create()
+        .addPrimitive(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, "source:destination:edgeName1")
+        .addPrimitive(ConfigurationKeys.JOB_SCHEDULE_KEY, "0/2 * * * * ?").build();
+
+    FlowSpec flowSpec = FlowSpec.builder("testFlowSpec").withConfig(flowConfig).build();
+    JobExecutionPlan jobExecutionPlan = new JobExecutionPlan.Factory().createPlan(flowSpec, jobConfig.withValue(ConfigurationKeys.JOB_TEMPLATE_PATH,
+        ConfigValueFactory.fromAnyRef("testUri")), new InMemorySpecExecutor(ConfigFactory.empty()), 0L, ConfigFactory.empty());
+
+    Dag<JobExecutionPlan> dag1 = new JobExecutionPlanDagFactory().createDag(Arrays.asList(jobExecutionPlan));
+
+    Assert.assertEquals(dag1.getStartNodes().get(0).getValue().getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY).length(), 142);
+
+  }
+
 }
\ No newline at end of file