You are viewing a plain-text version of this content. The link to its canonical version (originally a "here" hyperlink) is not preserved in plain text.
Posted to commits@beam.apache.org by bh...@apache.org on 2020/05/27 17:07:58 UTC

[beam] branch release-2.22.0 updated: Merge pull request #11813: [BEAM-10077] using filename + hash instead of UUID for staging name (#11830)

This is an automated email from the ASF dual-hosted git repository.

bhulette pushed a commit to branch release-2.22.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.22.0 by this push:
     new b9461a5  Merge pull request #11813: [BEAM-10077] using filename + hash instead of UUID for staging name (#11830)
b9461a5 is described below

commit b9461a5eafe9224ccce3d09f9355916ef48d6ed2
Author: Brian Hulette <bh...@google.com>
AuthorDate: Wed May 27 10:07:36 2020 -0700

    Merge pull request #11813: [BEAM-10077] using filename + hash instead of UUID for staging name (#11830)
    
    Co-authored-by: Chamikara Jayalath <ch...@apache.org>
---
 .../runners/core/construction/Environments.java    | 23 ++++++++++++--------
 .../beam/runners/dataflow/DataflowRunner.java      | 12 ++++-------
 .../beam/runners/dataflow/util/PackageUtil.java    | 25 +++++++---------------
 .../beam/runners/dataflow/util/GCSUploadMain.java  |  7 +++++-
 .../runners/dataflow/util/PackageUtilTest.java     | 24 ++++++++++++---------
 5 files changed, 46 insertions(+), 45 deletions(-)

diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java
index c324b92..72ebd13 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.core.construction;
 
+import com.fasterxml.jackson.core.Base64Variants;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import java.io.File;
 import java.io.FileOutputStream;
@@ -215,25 +216,19 @@ public class Environments {
     Set<String> deduplicatedStagingFiles = new LinkedHashSet<>(stagingFiles);
     for (String path : deduplicatedStagingFiles) {
       File file;
-      String stagedName;
+      String stagedName = null;
       if (path.contains("=")) {
         String[] components = path.split("=", 2);
         file = new File(components[1]);
         stagedName = components[0];
       } else {
         file = new File(path);
-        stagedName = createStagingFileName(file);
       }
       // Spurious items get added to the classpath. Filter by just those that exist.
       if (file.exists()) {
         ArtifactInformation.Builder artifactBuilder = ArtifactInformation.newBuilder();
         artifactBuilder.setTypeUrn(BeamUrns.getUrn(StandardArtifacts.Types.FILE));
         artifactBuilder.setRoleUrn(BeamUrns.getUrn(StandardArtifacts.Roles.STAGING_TO));
-        artifactBuilder.setRolePayload(
-            RunnerApi.ArtifactStagingToRolePayload.newBuilder()
-                .setStagedName(stagedName)
-                .build()
-                .toByteString());
         HashCode hashCode;
         if (file.isDirectory()) {
           File zippedFile;
@@ -264,6 +259,14 @@ public class Environments {
                   .build()
                   .toByteString());
         }
+        if (stagedName == null) {
+          stagedName = createStagingFileName(file, hashCode);
+        }
+        artifactBuilder.setRolePayload(
+            RunnerApi.ArtifactStagingToRolePayload.newBuilder()
+                .setStagedName(stagedName)
+                .build()
+                .toByteString());
         artifactsBuilder.add(artifactBuilder.build());
       }
     }
@@ -314,10 +317,12 @@ public class Environments {
     return capabilities.build();
   }
 
-  public static String createStagingFileName(File path) {
+  public static String createStagingFileName(File path, HashCode hash) {
+    String encodedHash = Base64Variants.MODIFIED_FOR_URL.encode(hash.asBytes());
+    String fileName = Files.getNameWithoutExtension(path.getAbsolutePath());
     String ext = path.isDirectory() ? "jar" : Files.getFileExtension(path.getAbsolutePath());
     String suffix = Strings.isNullOrEmpty(ext) ? "" : "." + ext;
-    return UUID.randomUUID().toString() + suffix;
+    return String.format("%s-%s%s", fileName, encodedHash, suffix);
   }
 
   private static File zipDirectory(File directory) throws IOException {
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index a4ea1c9..e030bba 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -55,7 +55,6 @@ import java.util.Random;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
-import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -210,8 +209,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
 
   @VisibleForTesting static final int GCS_UPLOAD_BUFFER_SIZE_BYTES_DEFAULT = 1024 * 1024;
 
-  @VisibleForTesting static final String PIPELINE_FILE_FORMAT = "pipeline-%s.pb";
-  @VisibleForTesting static final String DATAFLOW_GRAPH_FILE_FORMAT = "dataflow_graph-%s.json";
+  @VisibleForTesting static final String PIPELINE_FILE_NAME = "pipeline.pb";
+  @VisibleForTesting static final String DATAFLOW_GRAPH_FILE_NAME = "dataflow_graph.json";
 
   private static final ObjectMapper MAPPER = new ObjectMapper();
 
@@ -892,10 +891,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     LOG.info("Staging pipeline description to {}", options.getStagingLocation());
     byte[] serializedProtoPipeline = jobSpecification.getPipelineProto().toByteArray();
     DataflowPackage stagedPipeline =
-        options
-            .getStager()
-            .stageToFile(
-                serializedProtoPipeline, String.format(PIPELINE_FILE_FORMAT, UUID.randomUUID()));
+        options.getStager().stageToFile(serializedProtoPipeline, PIPELINE_FILE_NAME);
     dataflowOptions.setPipelineUrl(stagedPipeline.getLocation());
 
     if (!isNullOrEmpty(dataflowOptions.getDataflowWorkerJar())) {
@@ -995,7 +991,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
               .getStager()
               .stageToFile(
                   DataflowPipelineTranslator.jobToString(newJob).getBytes(UTF_8),
-                  String.format(DATAFLOW_GRAPH_FILE_FORMAT, UUID.randomUUID()));
+                  DATAFLOW_GRAPH_FILE_NAME);
       newJob.getSteps().clear();
       newJob.setStepsLocation(stagedGraph.getLocation());
     }
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
index 0b5e478..7213993 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
@@ -34,7 +34,6 @@ import java.nio.channels.WritableByteChannel;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-import java.util.UUID;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -52,7 +51,7 @@ import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
 import org.apache.beam.sdk.util.FluentBackoff;
 import org.apache.beam.sdk.util.MimeTypes;
 import org.apache.beam.sdk.util.MoreFutures;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.Hasher;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.HashCode;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.Hashing;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteSource;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Files;
@@ -360,21 +359,15 @@ public class PackageUtil implements Closeable {
 
   @AutoValue
   public abstract static class StagedFile {
-    public static PackageUtil.StagedFile of(
-        String source, String sha256, @Nullable String destination) {
+    public static PackageUtil.StagedFile of(String source, String sha256, String destination) {
       return new AutoValue_PackageUtil_StagedFile(source, sha256, destination);
     }
 
-    public static PackageUtil.StagedFile of(String source, String sha256) {
-      return new AutoValue_PackageUtil_StagedFile(source, sha256, null);
-    }
-
     /** The file to stage. */
     public abstract String getSource();
     /** The SHA-256 hash of the source file. */
     public abstract String getSha256();
     /** Staged target for this file. */
-    @Nullable
     public abstract String getDestination();
   }
 
@@ -405,25 +398,22 @@ public class PackageUtil implements Closeable {
       }
       checkState(!file.isDirectory(), "Source file must not be a directory.");
       DataflowPackage destination = new DataflowPackage();
-      String target = dest == null ? Environments.createStagingFileName(file) : dest;
       String resourcePath =
           FileSystems.matchNewResource(stagingPath, true)
-              .resolve(target, StandardResolveOptions.RESOLVE_FILE)
+              .resolve(dest, StandardResolveOptions.RESOLVE_FILE)
               .toString();
       destination.setLocation(resourcePath);
-      destination.setName(target);
+      destination.setName(dest);
       return new AutoValue_PackageUtil_PackageAttributes(
           file, null, destination, file.length(), hash);
     }
 
     public static PackageAttributes forBytesToStage(
         byte[] bytes, String targetName, String stagingPath) {
-
-      Hasher hasher = Hashing.sha256().newHasher();
-      String hash = hasher.putBytes(bytes).hash().toString();
+      HashCode hashCode = Hashing.sha256().newHasher().putBytes(bytes).hash();
       long size = bytes.length;
 
-      String target = targetName == null ? UUID.randomUUID().toString() : targetName;
+      String target = Environments.createStagingFileName(new File(targetName), hashCode);
 
       String resourcePath =
           FileSystems.matchNewResource(stagingPath, true)
@@ -433,7 +423,8 @@ public class PackageUtil implements Closeable {
       targetPackage.setName(target);
       targetPackage.setLocation(resourcePath);
 
-      return new AutoValue_PackageUtil_PackageAttributes(null, bytes, targetPackage, size, hash);
+      return new AutoValue_PackageUtil_PackageAttributes(
+          null, bytes, targetPackage, size, hashCode.toString());
     }
 
     public PackageAttributes withPackageName(String overridePackageName) {
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/GCSUploadMain.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/GCSUploadMain.java
index fb07652..468ec95 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/GCSUploadMain.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/GCSUploadMain.java
@@ -21,9 +21,11 @@ import java.io.File;
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.stream.Collectors;
+import org.apache.beam.runners.core.construction.Environments;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.HashCode;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.Hashing;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Files;
 
@@ -39,9 +41,12 @@ public class GCSUploadMain {
             .map(
                 (String source) -> {
                   try {
+                    File file = new File(source);
+                    HashCode hashCode = Files.asByteSource(file).hash(Hashing.sha256());
                     return PackageUtil.StagedFile.of(
                         source,
-                        Files.asByteSource(new File(source)).hash(Hashing.sha256()).toString());
+                        hashCode.toString(),
+                        Environments.createStagingFileName(file, hashCode));
                   } catch (IOException e) {
                     throw new UncheckedIOException(e);
                   }
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
index 2847279..535fbce 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
@@ -66,6 +66,7 @@ import java.util.List;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipInputStream;
 import javax.annotation.Nullable;
+import org.apache.beam.runners.core.construction.Environments;
 import org.apache.beam.runners.dataflow.util.PackageUtil.PackageAttributes;
 import org.apache.beam.runners.dataflow.util.PackageUtil.StagedFile;
 import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
@@ -85,6 +86,7 @@ import org.apache.beam.sdk.util.ZipFiles;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.HashCode;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.Hashing;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Files;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.LineReader;
@@ -140,12 +142,12 @@ public class PackageUtilTest {
 
   private static PackageAttributes makePackageAttributes(
       File file, @Nullable String overridePackageName) throws IOException {
-    File sourceFile = file.isDirectory() ? zipDirectory(file) : file;
+    StagedFile stagedFile = makeStagedFile(file.getPath());
     PackageAttributes attributes =
         PackageUtil.PackageAttributes.forFileToStage(
-            sourceFile.getPath(),
-            Files.asByteSource(sourceFile).hash(Hashing.sha256()).toString(),
-            null,
+            stagedFile.getSource(),
+            stagedFile.getSha256(),
+            stagedFile.getDestination(),
             STAGING_PATH);
     if (overridePackageName != null) {
       attributes = attributes.withPackageName(overridePackageName);
@@ -160,15 +162,17 @@ public class PackageUtilTest {
   private static StagedFile makeStagedFile(String source, String destName) throws IOException {
     File file = new File(source);
     File sourceFile;
-    String sha256;
+    HashCode hashCode;
     if (file.exists()) {
       sourceFile = file.isDirectory() ? zipDirectory(file) : file;
-      sha256 = Files.asByteSource(sourceFile).hash(Hashing.sha256()).toString();
+      hashCode = Files.asByteSource(sourceFile).hash(Hashing.sha256());
     } else {
       sourceFile = file;
-      sha256 = "";
+      hashCode = Hashing.sha256().hashBytes(new byte[] {});
     }
-    return StagedFile.of(sourceFile.getPath(), sha256, destName);
+    String destination =
+        destName == null ? Environments.createStagingFileName(file, hashCode) : destName;
+    return StagedFile.of(sourceFile.getPath(), hashCode.toString(), destination);
   }
 
   private static File zipDirectory(File directory) throws IOException {
@@ -218,8 +222,8 @@ public class PackageUtilTest {
     makeFileWithContents("folder2/folderA/sameName", "This is a test!");
     DataflowPackage target2 = makePackageAttributes(tmpDirectory2, null).getDestination();
 
-    assertNotEquals(target1.getName(), target2.getName());
-    assertNotEquals(target1.getLocation(), target2.getLocation());
+    assertEquals(target1.getName(), target2.getName());
+    assertEquals(target1.getLocation(), target2.getLocation());
   }
 
   @Test