You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2020/07/10 02:34:10 UTC
[beam] branch master updated: Deduplicate uploads by destinations before uploading
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new ddb851e Deduplicate uploads by destinations before uploading
new 8fc613a Merge pull request #12144 from [BEAM-10395] Deduplicate uploads by destinations before uploading
ddb851e is described below
commit ddb851e893fdfe467379691cfc5d9fe25f908596
Author: Steve Niemitz <st...@gmail.com>
AuthorDate: Tue Jun 30 21:28:33 2020 -0400
Deduplicate uploads by destinations before uploading
---
.../apache/beam/runners/dataflow/util/PackageUtil.java | 17 +++++++++++++++--
1 file changed, 15 insertions(+), 2 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
index 036d2db..d28ad78 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
@@ -34,6 +34,8 @@ import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -51,6 +53,7 @@ import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.MimeTypes;
import org.apache.beam.sdk.util.MoreFutures;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.HashCode;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.Hashing;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteSource;
@@ -296,6 +299,7 @@ public class PackageUtil implements Closeable {
final AtomicInteger numUploaded = new AtomicInteger(0);
final AtomicInteger numCached = new AtomicInteger(0);
List<CompletionStage<DataflowPackage>> destinationPackages = new ArrayList<>();
+ final Set<String> distinctDestinations = Sets.newConcurrentHashSet();
for (StagedFile classpathElement : classpathElements) {
String dest = classpathElement.getDestination();
@@ -311,8 +315,17 @@ public class PackageUtil implements Closeable {
CompletionStage<StagingResult> stagingResult =
computePackageAttributes(source, hash, dest, stagingPath)
.thenComposeAsync(
- packageAttributes ->
- stagePackage(packageAttributes, retrySleeper, createOptions));
+ packageAttributes -> {
+ String destLocation = packageAttributes.getDestination().getLocation();
+ if (distinctDestinations.add(destLocation)) {
+ return stagePackage(packageAttributes, retrySleeper, createOptions);
+ } else {
+ LOG.debug("Upload of {} skipped because it was already queued", destLocation);
+
+ return CompletableFuture.completedFuture(
+ StagingResult.cached(packageAttributes));
+ }
+ });
CompletionStage<DataflowPackage> stagedPackage =
stagingResult.thenApply(