You are viewing a plain text version of this content. The canonical version was linked from the word "here" in the original page; that hyperlink is not preserved in this plain-text copy.
Posted to commits@beam.apache.org by bh...@apache.org on 2020/05/27 17:07:58 UTC
[beam] branch release-2.22.0 updated: Merge pull request #11813:
[BEAM-10077] using filename + hash instead of UUID for staging name
(#11830)
This is an automated email from the ASF dual-hosted git repository.
bhulette pushed a commit to branch release-2.22.0
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.22.0 by this push:
new b9461a5 Merge pull request #11813: [BEAM-10077] using filename + hash instead of UUID for staging name (#11830)
b9461a5 is described below
commit b9461a5eafe9224ccce3d09f9355916ef48d6ed2
Author: Brian Hulette <bh...@google.com>
AuthorDate: Wed May 27 10:07:36 2020 -0700
Merge pull request #11813: [BEAM-10077] using filename + hash instead of UUID for staging name (#11830)
Co-authored-by: Chamikara Jayalath <ch...@apache.org>
---
.../runners/core/construction/Environments.java | 23 ++++++++++++--------
.../beam/runners/dataflow/DataflowRunner.java | 12 ++++-------
.../beam/runners/dataflow/util/PackageUtil.java | 25 +++++++---------------
.../beam/runners/dataflow/util/GCSUploadMain.java | 7 +++++-
.../runners/dataflow/util/PackageUtilTest.java | 24 ++++++++++++---------
5 files changed, 46 insertions(+), 45 deletions(-)
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java
index c324b92..72ebd13 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.core.construction;
+import com.fasterxml.jackson.core.Base64Variants;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.File;
import java.io.FileOutputStream;
@@ -215,25 +216,19 @@ public class Environments {
Set<String> deduplicatedStagingFiles = new LinkedHashSet<>(stagingFiles);
for (String path : deduplicatedStagingFiles) {
File file;
- String stagedName;
+ String stagedName = null;
if (path.contains("=")) {
String[] components = path.split("=", 2);
file = new File(components[1]);
stagedName = components[0];
} else {
file = new File(path);
- stagedName = createStagingFileName(file);
}
// Spurious items get added to the classpath. Filter by just those that exist.
if (file.exists()) {
ArtifactInformation.Builder artifactBuilder = ArtifactInformation.newBuilder();
artifactBuilder.setTypeUrn(BeamUrns.getUrn(StandardArtifacts.Types.FILE));
artifactBuilder.setRoleUrn(BeamUrns.getUrn(StandardArtifacts.Roles.STAGING_TO));
- artifactBuilder.setRolePayload(
- RunnerApi.ArtifactStagingToRolePayload.newBuilder()
- .setStagedName(stagedName)
- .build()
- .toByteString());
HashCode hashCode;
if (file.isDirectory()) {
File zippedFile;
@@ -264,6 +259,14 @@ public class Environments {
.build()
.toByteString());
}
+ if (stagedName == null) {
+ stagedName = createStagingFileName(file, hashCode);
+ }
+ artifactBuilder.setRolePayload(
+ RunnerApi.ArtifactStagingToRolePayload.newBuilder()
+ .setStagedName(stagedName)
+ .build()
+ .toByteString());
artifactsBuilder.add(artifactBuilder.build());
}
}
@@ -314,10 +317,12 @@ public class Environments {
return capabilities.build();
}
- public static String createStagingFileName(File path) {
+ public static String createStagingFileName(File path, HashCode hash) {
+ String encodedHash = Base64Variants.MODIFIED_FOR_URL.encode(hash.asBytes());
+ String fileName = Files.getNameWithoutExtension(path.getAbsolutePath());
String ext = path.isDirectory() ? "jar" : Files.getFileExtension(path.getAbsolutePath());
String suffix = Strings.isNullOrEmpty(ext) ? "" : "." + ext;
- return UUID.randomUUID().toString() + suffix;
+ return String.format("%s-%s%s", fileName, encodedHash, suffix);
}
private static File zipDirectory(File directory) throws IOException {
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index a4ea1c9..e030bba 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -55,7 +55,6 @@ import java.util.Random;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
-import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -210,8 +209,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
@VisibleForTesting static final int GCS_UPLOAD_BUFFER_SIZE_BYTES_DEFAULT = 1024 * 1024;
- @VisibleForTesting static final String PIPELINE_FILE_FORMAT = "pipeline-%s.pb";
- @VisibleForTesting static final String DATAFLOW_GRAPH_FILE_FORMAT = "dataflow_graph-%s.json";
+ @VisibleForTesting static final String PIPELINE_FILE_NAME = "pipeline.pb";
+ @VisibleForTesting static final String DATAFLOW_GRAPH_FILE_NAME = "dataflow_graph.json";
private static final ObjectMapper MAPPER = new ObjectMapper();
@@ -892,10 +891,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
LOG.info("Staging pipeline description to {}", options.getStagingLocation());
byte[] serializedProtoPipeline = jobSpecification.getPipelineProto().toByteArray();
DataflowPackage stagedPipeline =
- options
- .getStager()
- .stageToFile(
- serializedProtoPipeline, String.format(PIPELINE_FILE_FORMAT, UUID.randomUUID()));
+ options.getStager().stageToFile(serializedProtoPipeline, PIPELINE_FILE_NAME);
dataflowOptions.setPipelineUrl(stagedPipeline.getLocation());
if (!isNullOrEmpty(dataflowOptions.getDataflowWorkerJar())) {
@@ -995,7 +991,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
.getStager()
.stageToFile(
DataflowPipelineTranslator.jobToString(newJob).getBytes(UTF_8),
- String.format(DATAFLOW_GRAPH_FILE_FORMAT, UUID.randomUUID()));
+ DATAFLOW_GRAPH_FILE_NAME);
newJob.getSteps().clear();
newJob.setStepsLocation(stagedGraph.getLocation());
}
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
index 0b5e478..7213993 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
@@ -34,7 +34,6 @@ import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
-import java.util.UUID;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -52,7 +51,7 @@ import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.MimeTypes;
import org.apache.beam.sdk.util.MoreFutures;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.Hasher;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.HashCode;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.Hashing;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteSource;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Files;
@@ -360,21 +359,15 @@ public class PackageUtil implements Closeable {
@AutoValue
public abstract static class StagedFile {
- public static PackageUtil.StagedFile of(
- String source, String sha256, @Nullable String destination) {
+ public static PackageUtil.StagedFile of(String source, String sha256, String destination) {
return new AutoValue_PackageUtil_StagedFile(source, sha256, destination);
}
- public static PackageUtil.StagedFile of(String source, String sha256) {
- return new AutoValue_PackageUtil_StagedFile(source, sha256, null);
- }
-
/** The file to stage. */
public abstract String getSource();
/** The SHA-256 hash of the source file. */
public abstract String getSha256();
/** Staged target for this file. */
- @Nullable
public abstract String getDestination();
}
@@ -405,25 +398,22 @@ public class PackageUtil implements Closeable {
}
checkState(!file.isDirectory(), "Source file must not be a directory.");
DataflowPackage destination = new DataflowPackage();
- String target = dest == null ? Environments.createStagingFileName(file) : dest;
String resourcePath =
FileSystems.matchNewResource(stagingPath, true)
- .resolve(target, StandardResolveOptions.RESOLVE_FILE)
+ .resolve(dest, StandardResolveOptions.RESOLVE_FILE)
.toString();
destination.setLocation(resourcePath);
- destination.setName(target);
+ destination.setName(dest);
return new AutoValue_PackageUtil_PackageAttributes(
file, null, destination, file.length(), hash);
}
public static PackageAttributes forBytesToStage(
byte[] bytes, String targetName, String stagingPath) {
-
- Hasher hasher = Hashing.sha256().newHasher();
- String hash = hasher.putBytes(bytes).hash().toString();
+ HashCode hashCode = Hashing.sha256().newHasher().putBytes(bytes).hash();
long size = bytes.length;
- String target = targetName == null ? UUID.randomUUID().toString() : targetName;
+ String target = Environments.createStagingFileName(new File(targetName), hashCode);
String resourcePath =
FileSystems.matchNewResource(stagingPath, true)
@@ -433,7 +423,8 @@ public class PackageUtil implements Closeable {
targetPackage.setName(target);
targetPackage.setLocation(resourcePath);
- return new AutoValue_PackageUtil_PackageAttributes(null, bytes, targetPackage, size, hash);
+ return new AutoValue_PackageUtil_PackageAttributes(
+ null, bytes, targetPackage, size, hashCode.toString());
}
public PackageAttributes withPackageName(String overridePackageName) {
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/GCSUploadMain.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/GCSUploadMain.java
index fb07652..468ec95 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/GCSUploadMain.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/GCSUploadMain.java
@@ -21,9 +21,11 @@ import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.stream.Collectors;
+import org.apache.beam.runners.core.construction.Environments;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.HashCode;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.Hashing;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Files;
@@ -39,9 +41,12 @@ public class GCSUploadMain {
.map(
(String source) -> {
try {
+ File file = new File(source);
+ HashCode hashCode = Files.asByteSource(file).hash(Hashing.sha256());
return PackageUtil.StagedFile.of(
source,
- Files.asByteSource(new File(source)).hash(Hashing.sha256()).toString());
+ hashCode.toString(),
+ Environments.createStagingFileName(file, hashCode));
} catch (IOException e) {
throw new UncheckedIOException(e);
}
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
index 2847279..535fbce 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
@@ -66,6 +66,7 @@ import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import javax.annotation.Nullable;
+import org.apache.beam.runners.core.construction.Environments;
import org.apache.beam.runners.dataflow.util.PackageUtil.PackageAttributes;
import org.apache.beam.runners.dataflow.util.PackageUtil.StagedFile;
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
@@ -85,6 +86,7 @@ import org.apache.beam.sdk.util.ZipFiles;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.HashCode;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.Hashing;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Files;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.LineReader;
@@ -140,12 +142,12 @@ public class PackageUtilTest {
private static PackageAttributes makePackageAttributes(
File file, @Nullable String overridePackageName) throws IOException {
- File sourceFile = file.isDirectory() ? zipDirectory(file) : file;
+ StagedFile stagedFile = makeStagedFile(file.getPath());
PackageAttributes attributes =
PackageUtil.PackageAttributes.forFileToStage(
- sourceFile.getPath(),
- Files.asByteSource(sourceFile).hash(Hashing.sha256()).toString(),
- null,
+ stagedFile.getSource(),
+ stagedFile.getSha256(),
+ stagedFile.getDestination(),
STAGING_PATH);
if (overridePackageName != null) {
attributes = attributes.withPackageName(overridePackageName);
@@ -160,15 +162,17 @@ public class PackageUtilTest {
private static StagedFile makeStagedFile(String source, String destName) throws IOException {
File file = new File(source);
File sourceFile;
- String sha256;
+ HashCode hashCode;
if (file.exists()) {
sourceFile = file.isDirectory() ? zipDirectory(file) : file;
- sha256 = Files.asByteSource(sourceFile).hash(Hashing.sha256()).toString();
+ hashCode = Files.asByteSource(sourceFile).hash(Hashing.sha256());
} else {
sourceFile = file;
- sha256 = "";
+ hashCode = Hashing.sha256().hashBytes(new byte[] {});
}
- return StagedFile.of(sourceFile.getPath(), sha256, destName);
+ String destination =
+ destName == null ? Environments.createStagingFileName(file, hashCode) : destName;
+ return StagedFile.of(sourceFile.getPath(), hashCode.toString(), destination);
}
private static File zipDirectory(File directory) throws IOException {
@@ -218,8 +222,8 @@ public class PackageUtilTest {
makeFileWithContents("folder2/folderA/sameName", "This is a test!");
DataflowPackage target2 = makePackageAttributes(tmpDirectory2, null).getDestination();
- assertNotEquals(target1.getName(), target2.getName());
- assertNotEquals(target1.getLocation(), target2.getLocation());
+ assertEquals(target1.getName(), target2.getName());
+ assertEquals(target1.getLocation(), target2.getLocation());
}
@Test