You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2022/06/01 18:31:40 UTC
[gobblin] branch master updated: [GOBBLIN-1653] Shorten job name length if it exceeds 255 characters (#3514)
This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new edcdc8d4f [GOBBLIN-1653] Shorten job name length if it exceeds 255 characters (#3514)
edcdc8d4f is described below
commit edcdc8d4f764369d47bf5fd29e69b1a58c5a0f90
Author: William Lo <lo...@gmail.com>
AuthorDate: Wed Jun 1 11:31:32 2022 -0700
[GOBBLIN-1653] Shorten job name length if it exceeds 255 characters (#3514)
* Shorten job name length if it exceeds 255 characters (max size for a directory component)
* Address review to account for flow name in hash
---
.../service/modules/spec/JobExecutionPlan.java | 10 ++++++++--
.../spec/JobExecutionPlanDagFactoryTest.java | 21 +++++++++++++++++++++
2 files changed, 29 insertions(+), 2 deletions(-)
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
index 79374af33..e04e7cb90 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
@@ -60,6 +60,7 @@ import static org.apache.gobblin.runtime.AbstractJobLauncher.GOBBLIN_JOB_TEMPLAT
@EqualsAndHashCode(exclude = {"executionStatus", "currentAttempts", "jobFuture", "flowStartTime"})
public class JobExecutionPlan {
public static final String JOB_MAX_ATTEMPTS = "job.maxAttempts";
+ private static final int MAX_JOB_NAME_LENGTH = 255;
private final JobSpec jobSpec;
private final SpecExecutor specExecutor;
@@ -108,8 +109,13 @@ public class JobExecutionPlan {
// Modify the job name to include the flow group, flow name, edge id, and a random string to avoid collisions since
// job names are assumed to be unique within a dag.
- jobName = Joiner.on(JOB_NAME_COMPONENT_SEPARATION_CHAR).join(flowGroup, flowName, jobName, edgeId, flowInputPath.hashCode());
-
+ int hash = flowInputPath.hashCode();
+ jobName = Joiner.on(JOB_NAME_COMPONENT_SEPARATION_CHAR).join(flowGroup, flowName, jobName, edgeId, hash);
+ // jobNames are commonly used as a directory name, which is limited to 255 characters
+ if (jobName.length() >= MAX_JOB_NAME_LENGTH) {
+ // shorten job length to be 128 characters (flowGroup) + (hashed) flowName, hashCode length
+ jobName = Joiner.on(JOB_NAME_COMPONENT_SEPARATION_CHAR).join(flowGroup, flowName.hashCode(), hash);
+ }
JobSpec.Builder jobSpecBuilder = JobSpec.builder(jobSpecURIGenerator(flowGroup, jobName, flowSpec)).withConfig(jobConfig)
.withDescription(flowSpec.getDescription()).withVersion(flowSpec.getVersion());
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java
index cba53dc67..0958995a2 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java
@@ -204,4 +204,25 @@ public class JobExecutionPlanDagFactoryTest {
Assert.assertFalse(dag2.getStartNodes().get(0).getValue().getJobSpec().getConfig().getBoolean(ConfigurationKeys.GOBBLIN_OUTPUT_JOB_LEVEL_METRICS));
}
+ @Test
+ public void testCreateDagLongName() throws Exception {
+ // flowName and flowGroup are both 128 characters long, the maximum for flowName and flowGroup
+ Config flowConfig = ConfigBuilder.create().addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "uwXJwZPAPygvmSAfhtrzXL7ovIEKOBZdulBiNIGzaT7vILrK9QB5EDJj0fc4pkgNHuIKZ3d18TZzyH6a9HpaZACwpWpIpf8SYcSfKtXeoF8IJY064BqEUXR32k3ox31G")
+ .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "4mdfSGSv6GoFW7ICWubN2ORK4s5PMTQ60yIWkcbJOVneTSPn12cXT5ueEgij907tjzLlbcjdVjWFITFf9Y5sB9i0EvKGmTbUF98hJGoQlAhmottaipDEFTdbyzt5Loxg")
+ .addPrimitive(ConfigurationKeys.JOB_SCHEDULE_KEY, "0/2 * * * * ?").build();
+
+ Config jobConfig = ConfigBuilder.create()
+ .addPrimitive(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, "source:destination:edgeName1")
+ .addPrimitive(ConfigurationKeys.JOB_SCHEDULE_KEY, "0/2 * * * * ?").build();
+
+ FlowSpec flowSpec = FlowSpec.builder("testFlowSpec").withConfig(flowConfig).build();
+ JobExecutionPlan jobExecutionPlan = new JobExecutionPlan.Factory().createPlan(flowSpec, jobConfig.withValue(ConfigurationKeys.JOB_TEMPLATE_PATH,
+ ConfigValueFactory.fromAnyRef("testUri")), new InMemorySpecExecutor(ConfigFactory.empty()), 0L, ConfigFactory.empty());
+
+ Dag<JobExecutionPlan> dag1 = new JobExecutionPlanDagFactory().createDag(Arrays.asList(jobExecutionPlan));
+
+ Assert.assertEquals(dag1.getStartNodes().get(0).getValue().getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY).length(), 142);
+
+ }
+
}
\ No newline at end of file