You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by yh...@apache.org on 2023/12/27 20:25:52 UTC

(beam) branch master updated: Fix ItFramework extractJobName (#29876)

This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new dda2ffae602 Fix ItFramework extractJobName (#29876)
dda2ffae602 is described below

commit dda2ffae602d22704e8b2b42d4d629f5c354fac4
Author: Yi Hu <ya...@google.com>
AuthorDate: Wed Dec 27 15:25:45 2023 -0500

    Fix ItFramework extractJobName (#29876)
    
    Fix extractJobName failing to strip the full suffix now that createJobName can append random characters
---
 .../java/org/apache/beam/it/common/utils/PipelineUtils.java    | 10 +++++++++-
 .../org/apache/beam/it/common/utils/PipelineUtilsTest.java     |  6 ++++++
 .../apache/beam/it/gcp/dataflow/DefaultPipelineLauncher.java   |  4 ++--
 3 files changed, 17 insertions(+), 3 deletions(-)

diff --git a/it/common/src/main/java/org/apache/beam/it/common/utils/PipelineUtils.java b/it/common/src/main/java/org/apache/beam/it/common/utils/PipelineUtils.java
index d249d43d378..e53de5bdb67 100644
--- a/it/common/src/main/java/org/apache/beam/it/common/utils/PipelineUtils.java
+++ b/it/common/src/main/java/org/apache/beam/it/common/utils/PipelineUtils.java
@@ -22,6 +22,8 @@ import java.time.Instant;
 import java.time.ZoneId;
 import java.time.format.DateTimeFormatter;
 import java.util.function.Supplier;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.PipelineRunner;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -115,7 +117,13 @@ public class PipelineUtils {
 
   /** Get raw job name (without prefix) from a jobName generated by createJobName. */
   public static String extractJobName(String nameWithPrefix) {
-    return nameWithPrefix.substring(0, nameWithPrefix.lastIndexOf("-"));
+    Pattern pattern = Pattern.compile("-\\d{17}"); // yyyyMMddHHmmssSSS
+    Matcher matcher = pattern.matcher(nameWithPrefix);
+    if (matcher.find()) {
+      return nameWithPrefix.substring(0, matcher.start());
+    } else {
+      throw new IllegalArgumentException("Unexpected job name: " + nameWithPrefix);
+    }
   }
 
   /*
diff --git a/it/common/src/test/java/org/apache/beam/it/common/utils/PipelineUtilsTest.java b/it/common/src/test/java/org/apache/beam/it/common/utils/PipelineUtilsTest.java
index 316283cdf7d..d2b8334964f 100644
--- a/it/common/src/test/java/org/apache/beam/it/common/utils/PipelineUtilsTest.java
+++ b/it/common/src/test/java/org/apache/beam/it/common/utils/PipelineUtilsTest.java
@@ -51,4 +51,10 @@ public class PipelineUtilsTest {
     String name = "create-job-name";
     assertEquals(name, extractJobName(createJobName(name)));
   }
+
+  @Test
+  public void testCreateExtractJobNameWithRandomChars() {
+    String name = "create-job-name";
+    assertEquals(name, extractJobName(createJobName(name, 8)));
+  }
 }
diff --git a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dataflow/DefaultPipelineLauncher.java b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dataflow/DefaultPipelineLauncher.java
index 620d24d4e11..e0f9f6c2f63 100644
--- a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dataflow/DefaultPipelineLauncher.java
+++ b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dataflow/DefaultPipelineLauncher.java
@@ -410,9 +410,9 @@ public class DefaultPipelineLauncher extends AbstractPipelineLauncher {
     // config pipelineName
     String pipelineName = PipelineUtils.extractJobName(options.jobName());
     String overrideName = null;
-    if (pipelineName.endsWith("write")) {
+    if (pipelineName.startsWith("write")) {
       overrideName = System.getProperty(WRITE_PIPELINE_NAME_OVERWRITE);
-    } else if (pipelineName.endsWith("read")) {
+    } else if (pipelineName.startsWith("read")) {
       overrideName = System.getProperty(READ_PIPELINE_NAME_OVERWRITE);
     }
     if (!Strings.isNullOrEmpty(overrideName)) {