You are viewing a plain-text version of this content. The canonical link to it (a hyperlink in the original) was not preserved in this version.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/01/29 16:22:16 UTC
[30/50] beam git commit: PackageUtil: preserve classpath ordering when uploading
PackageUtil: preserve classpath ordering when uploading
Also add a test
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b0b91c84
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b0b91c84
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b0b91c84
Branch: refs/heads/python-sdk
Commit: b0b91c842e09aa7fdb5c1dc216574daa43b437ea
Parents: 23e2b91
Author: Dan Halperin <dh...@google.com>
Authored: Wed Jan 25 22:15:59 2017 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Jan 25 22:16:22 2017 -0800
----------------------------------------------------------------------
.../beam/runners/dataflow/util/PackageUtil.java | 11 +++++---
.../runners/dataflow/util/PackageUtilTest.java | 27 ++++++++++++++++++++
2 files changed, 35 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/b0b91c84/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
index fa8c94d..685d48c 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
@@ -297,16 +297,21 @@ class PackageUtil {
// Inline a copy here because the inner code returns an immutable list and we want to mutate it.
List<PackageAttributes> packageAttributes =
new LinkedList<>(computePackageAttributes(classpathElements, stagingPath, executorService));
- // Order package attributes in descending size order so that we upload the largest files first.
- Collections.sort(packageAttributes, new PackageUploadOrder());
+ // Compute the returned list of DataflowPackage objects here so that they are returned in the
+ // same order as on the classpath.
List<DataflowPackage> packages = Lists.newArrayListWithExpectedSize(packageAttributes.size());
+ for (final PackageAttributes attributes : packageAttributes) {
+ packages.add(attributes.getDataflowPackage());
+ }
+
+ // Order package attributes in descending size order so that we upload the largest files first.
+ Collections.sort(packageAttributes, new PackageUploadOrder());
final AtomicInteger numUploaded = new AtomicInteger(0);
final AtomicInteger numCached = new AtomicInteger(0);
List<ListenableFuture<?>> futures = new LinkedList<>();
for (final PackageAttributes attributes : packageAttributes) {
- packages.add(attributes.getDataflowPackage());
futures.add(executorService.submit(new Runnable() {
@Override
public void run() {
http://git-wip-us.apache.org/repos/asf/beam/blob/b0b91c84/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
index 3828415..800c5a9 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.dataflow.util;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.startsWith;
import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
@@ -59,6 +60,7 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.Pipe;
+import java.nio.channels.Pipe.SinkChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
@@ -86,6 +88,8 @@ import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
/** Tests for PackageUtil. */
@RunWith(JUnit4.class)
@@ -265,6 +269,29 @@ public class PackageUtilTest {
}
@Test
+ public void testStagingPreservesClasspath() throws Exception {
+ File smallFile = makeFileWithContents("small.txt", "small");
+ File largeFile = makeFileWithContents("large.txt", "large contents");
+ when(mockGcsUtil.fileSize(any(GcsPath.class))) // NOTE(review): presumably forces every file to be uploaded
+ .thenThrow(new FileNotFoundException("some/path"));
+ when(mockGcsUtil.create(any(GcsPath.class), anyString()))
+ .thenAnswer(new Answer<SinkChannel>() {
+ @Override
+ public SinkChannel answer(InvocationOnMock invocation) throws Throwable {
+ return Pipe.open().sink(); // fresh pipe per create() call; its source end is discarded
+ }
+ });
+
+ List<DataflowPackage> targets = PackageUtil.stageClasspathElements(
+ ImmutableList.of(smallFile.getAbsolutePath(), largeFile.getAbsolutePath()),
+ STAGING_PATH, mockGcsUtil);
+ // Returned packages must follow classpath (input) order: small, then large. This holds even
+ // though uploads are sorted by descending size, so the large file is uploaded first.
+ assertThat(targets.get(0).getName(), startsWith("small"));
+ assertThat(targets.get(1).getName(), startsWith("large"));
+ }
+
+ @Test
public void testPackageUploadWithDirectorySucceeds() throws Exception {
Pipe pipe = Pipe.open();
File tmpDirectory = tmpFolder.newFolder("folder");