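You are viewing a plain-text version of this content. The canonical version is available at the link labeled "here" on the original archive page (the hyperlink is not preserved in plain text).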
Posted to commits@beam.apache.org by pa...@apache.org on 2020/07/10 02:34:10 UTC

[beam] branch master updated: Deduplicate uploads by destinations before uploading

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new ddb851e  Deduplicate uploads by destinations before uploading
     new 8fc613a  Merge pull request #12144 from [BEAM-10395] Deduplicate uploads by destinations before uploading
ddb851e is described below

commit ddb851e893fdfe467379691cfc5d9fe25f908596
Author: Steve Niemitz <st...@gmail.com>
AuthorDate: Tue Jun 30 21:28:33 2020 -0400

    Deduplicate uploads by destinations before uploading
---
 .../apache/beam/runners/dataflow/util/PackageUtil.java  | 17 +++++++++++++++--
 1 file changed, 15 insertions(+), 2 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
index 036d2db..d28ad78 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
@@ -34,6 +34,8 @@ import java.nio.channels.WritableByteChannel;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -51,6 +53,7 @@ import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
 import org.apache.beam.sdk.util.FluentBackoff;
 import org.apache.beam.sdk.util.MimeTypes;
 import org.apache.beam.sdk.util.MoreFutures;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.HashCode;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.Hashing;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteSource;
@@ -296,6 +299,7 @@ public class PackageUtil implements Closeable {
     final AtomicInteger numUploaded = new AtomicInteger(0);
     final AtomicInteger numCached = new AtomicInteger(0);
     List<CompletionStage<DataflowPackage>> destinationPackages = new ArrayList<>();
+    final Set<String> distinctDestinations = Sets.newConcurrentHashSet();
 
     for (StagedFile classpathElement : classpathElements) {
       String dest = classpathElement.getDestination();
@@ -311,8 +315,17 @@ public class PackageUtil implements Closeable {
       CompletionStage<StagingResult> stagingResult =
           computePackageAttributes(source, hash, dest, stagingPath)
               .thenComposeAsync(
-                  packageAttributes ->
-                      stagePackage(packageAttributes, retrySleeper, createOptions));
+                  packageAttributes -> {
+                    String destLocation = packageAttributes.getDestination().getLocation();
+                    if (distinctDestinations.add(destLocation)) {
+                      return stagePackage(packageAttributes, retrySleeper, createOptions);
+                    } else {
+                      LOG.debug("Upload of {} skipped because it was already queued", destLocation);
+
+                      return CompletableFuture.completedFuture(
+                          StagingResult.cached(packageAttributes));
+                    }
+                  });
 
       CompletionStage<DataflowPackage> stagedPackage =
           stagingResult.thenApply(