You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/01/26 15:16:05 UTC

[2/3] beam git commit: PackageUtil: preserve classpath ordering when uploading

PackageUtil: preserve classpath ordering when uploading

Also add a test


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b0b91c84
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b0b91c84
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b0b91c84

Branch: refs/heads/master
Commit: b0b91c842e09aa7fdb5c1dc216574daa43b437ea
Parents: 23e2b91
Author: Dan Halperin <dh...@google.com>
Authored: Wed Jan 25 22:15:59 2017 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Jan 25 22:16:22 2017 -0800

----------------------------------------------------------------------
 .../beam/runners/dataflow/util/PackageUtil.java | 11 +++++---
 .../runners/dataflow/util/PackageUtilTest.java  | 27 ++++++++++++++++++++
 2 files changed, 35 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b0b91c84/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
index fa8c94d..685d48c 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
@@ -297,16 +297,21 @@ class PackageUtil {
     // Inline a copy here because the inner code returns an immutable list and we want to mutate it.
     List<PackageAttributes> packageAttributes =
         new LinkedList<>(computePackageAttributes(classpathElements, stagingPath, executorService));
-    // Order package attributes in descending size order so that we upload the largest files first.
-    Collections.sort(packageAttributes, new PackageUploadOrder());
 
+    // Compute the returned list of DataflowPackage objects here so that they are returned in the
+    // same order as on the classpath.
     List<DataflowPackage> packages = Lists.newArrayListWithExpectedSize(packageAttributes.size());
+    for (final PackageAttributes attributes : packageAttributes) {
+      packages.add(attributes.getDataflowPackage());
+    }
+
+    // Order package attributes in descending size order so that we upload the largest files first.
+    Collections.sort(packageAttributes, new PackageUploadOrder());
     final AtomicInteger numUploaded = new AtomicInteger(0);
     final AtomicInteger numCached = new AtomicInteger(0);
 
     List<ListenableFuture<?>> futures = new LinkedList<>();
     for (final PackageAttributes attributes : packageAttributes) {
-      packages.add(attributes.getDataflowPackage());
       futures.add(executorService.submit(new Runnable() {
         @Override
         public void run() {

http://git-wip-us.apache.org/repos/asf/beam/blob/b0b91c84/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
index 3828415..800c5a9 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.dataflow.util;
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.startsWith;
 import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
@@ -59,6 +60,7 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.nio.channels.Channels;
 import java.nio.channels.Pipe;
+import java.nio.channels.Pipe.SinkChannel;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -86,6 +88,8 @@ import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 /** Tests for PackageUtil. */
 @RunWith(JUnit4.class)
@@ -265,6 +269,29 @@ public class PackageUtilTest {
   }
 
   @Test
+  public void testStagingPreservesClasspath() throws Exception {
+    File smallFile = makeFileWithContents("small.txt", "small");
+    File largeFile = makeFileWithContents("large.txt", "large contents");
+    when(mockGcsUtil.fileSize(any(GcsPath.class)))
+        .thenThrow(new FileNotFoundException("some/path"));
+    when(mockGcsUtil.create(any(GcsPath.class), anyString()))
+        .thenAnswer(new Answer<SinkChannel>() {
+          @Override
+          public SinkChannel answer(InvocationOnMock invocation) throws Throwable {
+            return Pipe.open().sink();
+          }
+        });
+
+    List<DataflowPackage> targets = PackageUtil.stageClasspathElements(
+        ImmutableList.of(smallFile.getAbsolutePath(), largeFile.getAbsolutePath()),
+        STAGING_PATH, mockGcsUtil);
+    // Verify that the packages are returned small, then large, matching input order even though
+    // the large file would be uploaded first.
+    assertThat(targets.get(0).getName(), startsWith("small"));
+    assertThat(targets.get(1).getName(), startsWith("large"));
+  }
+
+  @Test
   public void testPackageUploadWithDirectorySucceeds() throws Exception {
     Pipe pipe = Pipe.open();
     File tmpDirectory = tmpFolder.newFolder("folder");