You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/01/26 15:16:04 UTC
[1/3] beam git commit: Recommit "DataflowRunner: parallelize staging
of files"
Repository: beam
Updated Branches:
refs/heads/master 1c6e66741 -> b4726d088
Recommit "DataflowRunner: parallelize staging of files"
Revert "This closes #1847"
This reverts commit 1c6e667414788fe99f583fac39d458a4984ae162, reversing
changes made to 6413299a20be57de849684479134479fa1acee2d.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/23e2b913
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/23e2b913
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/23e2b913
Branch: refs/heads/master
Commit: 23e2b913946acb2690fbac2d751a5672d80121aa
Parents: 1c6e667
Author: Dan Halperin <dh...@google.com>
Authored: Wed Jan 25 21:04:20 2017 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Jan 25 21:04:27 2017 -0800
----------------------------------------------------------------------
runners/google-cloud-dataflow-java/pom.xml | 5 +
.../beam/runners/dataflow/util/GcsStager.java | 18 +-
.../beam/runners/dataflow/util/PackageUtil.java | 349 ++++++++++++-------
.../runners/dataflow/util/PackageUtilTest.java | 42 ++-
.../org/apache/beam/sdk/options/GcsOptions.java | 4 +-
.../java/org/apache/beam/sdk/util/GcsUtil.java | 12 +
6 files changed, 281 insertions(+), 149 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/23e2b913/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index eea5502..9858b3d 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -203,6 +203,11 @@
</dependency>
<dependency>
+ <groupId>com.google.apis</groupId>
+ <artifactId>google-api-services-storage</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>com.google.auth</groupId>
<artifactId>google-auth-library-credentials</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/beam/blob/23e2b913/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
index 6ca4c3f..53822e3 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
@@ -17,13 +17,19 @@
*/
package org.apache.beam.runners.dataflow.util;
+import static com.google.common.base.MoreObjects.firstNonNull;
+import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.api.services.dataflow.model.DataflowPackage;
+import com.google.api.services.storage.Storage;
import java.util.List;
import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.GcsUtil;
+import org.apache.beam.sdk.util.GcsUtil.GcsUtilFactory;
+import org.apache.beam.sdk.util.Transport;
/**
* Utility class for staging files to GCS.
@@ -35,6 +41,7 @@ public class GcsStager implements Stager {
this.options = options;
}
+ @SuppressWarnings("unused") // used via reflection
public static GcsStager fromOptions(PipelineOptions options) {
return new GcsStager(options.as(DataflowPipelineOptions.class));
}
@@ -48,7 +55,16 @@ public class GcsStager implements Stager {
if (windmillBinary != null) {
filesToStage.add("windmill_main=" + windmillBinary);
}
+ int uploadSizeBytes = firstNonNull(options.getGcsUploadBufferSizeBytes(), 1024 * 1024);
+ checkArgument(uploadSizeBytes > 0, "gcsUploadBufferSizeBytes must be > 0");
+ uploadSizeBytes = Math.min(uploadSizeBytes, 1024 * 1024);
+ Storage.Builder storageBuilder = Transport.newStorageClient(options);
+ GcsUtil util = GcsUtilFactory.create(
+ storageBuilder.build(),
+ storageBuilder.getHttpRequestInitializer(),
+ options.getExecutorService(),
+ uploadSizeBytes);
return PackageUtil.stageClasspathElements(
- options.getFilesToStage(), options.getStagingLocation());
+ options.getFilesToStage(), options.getStagingLocation(), util);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/23e2b913/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
index 6d910ba..fa8c94d 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
@@ -17,53 +17,62 @@
*/
package org.apache.beam.runners.dataflow.util;
+import static com.google.common.base.Preconditions.checkArgument;
+
import com.fasterxml.jackson.core.Base64Variants;
import com.google.api.client.util.BackOff;
import com.google.api.client.util.Sleeper;
import com.google.api.services.dataflow.model.DataflowPackage;
import com.google.cloud.hadoop.util.ApiErrorExtractor;
+import com.google.common.collect.Lists;
import com.google.common.hash.Funnels;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
import com.google.common.io.CountingOutputStream;
import com.google.common.io.Files;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
-import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.annotation.Nullable;
import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.GcsIOChannelFactory;
+import org.apache.beam.sdk.util.GcsUtil;
+import org.apache.beam.sdk.util.IOChannelFactory;
import org.apache.beam.sdk.util.IOChannelUtils;
import org.apache.beam.sdk.util.MimeTypes;
import org.apache.beam.sdk.util.ZipFiles;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** Helper routines for packages. */
-public class PackageUtil {
+class PackageUtil {
private static final Logger LOG = LoggerFactory.getLogger(PackageUtil.class);
/**
* A reasonable upper bound on the number of jars required to launch a Dataflow job.
*/
- public static final int SANE_CLASSPATH_SIZE = 1000;
- /**
- * The initial interval to use between package staging attempts.
- */
- private static final Duration INITIAL_BACKOFF_INTERVAL = Duration.standardSeconds(5);
- /**
- * The maximum number of retries when staging a file.
- */
- private static final int MAX_RETRIES = 4;
+ private static final int SANE_CLASSPATH_SIZE = 1000;
private static final FluentBackoff BACKOFF_FACTORY =
- FluentBackoff.DEFAULT
- .withMaxRetries(MAX_RETRIES).withInitialBackoff(INITIAL_BACKOFF_INTERVAL);
+ FluentBackoff.DEFAULT.withMaxRetries(4).withInitialBackoff(Duration.standardSeconds(5));
/**
* Translates exceptions from API calls.
@@ -71,35 +80,18 @@ public class PackageUtil {
private static final ApiErrorExtractor ERROR_EXTRACTOR = new ApiErrorExtractor();
/**
- * Creates a DataflowPackage containing information about how a classpath element should be
- * staged, including the staging destination as well as its size and hash.
- *
- * @param classpathElement The local path for the classpath element.
- * @param stagingPath The base location for staged classpath elements.
- * @param overridePackageName If non-null, use the given value as the package name
- * instead of generating one automatically.
- * @return The package.
- */
- @Deprecated
- public static DataflowPackage createPackage(File classpathElement,
- String stagingPath, String overridePackageName) {
- return createPackageAttributes(classpathElement, stagingPath, overridePackageName)
- .getDataflowPackage();
- }
-
- /**
* Compute and cache the attributes of a classpath element that we will need to stage it.
*
- * @param classpathElement the file or directory to be staged.
+ * @param source the file or directory to be staged.
* @param stagingPath The base location for staged classpath elements.
* @param overridePackageName If non-null, use the given value as the package name
* instead of generating one automatically.
* @return a {@link PackageAttributes} containing metadata about the object to be staged.
*/
- static PackageAttributes createPackageAttributes(File classpathElement,
- String stagingPath, String overridePackageName) {
+ static PackageAttributes createPackageAttributes(File source,
+ String stagingPath, @Nullable String overridePackageName) {
try {
- boolean directory = classpathElement.isDirectory();
+ boolean directory = source.isDirectory();
// Compute size and hash in one pass over file or directory.
Hasher hasher = Hashing.md5().newHasher();
@@ -108,142 +100,232 @@ public class PackageUtil {
if (!directory) {
// Files are staged as-is.
- Files.asByteSource(classpathElement).copyTo(countingOutputStream);
+ Files.asByteSource(source).copyTo(countingOutputStream);
} else {
// Directories are recursively zipped.
- ZipFiles.zipDirectory(classpathElement, countingOutputStream);
+ ZipFiles.zipDirectory(source, countingOutputStream);
}
long size = countingOutputStream.getCount();
String hash = Base64Variants.MODIFIED_FOR_URL.encode(hasher.hash().asBytes());
// Create the DataflowPackage with staging name and location.
- String uniqueName = getUniqueContentName(classpathElement, hash);
+ String uniqueName = getUniqueContentName(source, hash);
String resourcePath = IOChannelUtils.resolve(stagingPath, uniqueName);
DataflowPackage target = new DataflowPackage();
target.setName(overridePackageName != null ? overridePackageName : uniqueName);
target.setLocation(resourcePath);
- return new PackageAttributes(size, hash, directory, target);
+ return new PackageAttributes(size, hash, directory, target, source.getPath());
} catch (IOException e) {
- throw new RuntimeException("Package setup failure for " + classpathElement, e);
+ throw new RuntimeException("Package setup failure for " + source, e);
}
}
- /**
- * Transfers the classpath elements to the staging location.
- *
- * @param classpathElements The elements to stage.
- * @param stagingPath The base location to stage the elements to.
- * @return A list of cloud workflow packages, each representing a classpath element.
- */
- public static List<DataflowPackage> stageClasspathElements(
- Collection<String> classpathElements, String stagingPath) {
- return stageClasspathElements(classpathElements, stagingPath, Sleeper.DEFAULT);
- }
-
- // Visible for testing.
- static List<DataflowPackage> stageClasspathElements(
- Collection<String> classpathElements, String stagingPath,
- Sleeper retrySleeper) {
- LOG.info("Uploading {} files from PipelineOptions.filesToStage to staging location to "
- + "prepare for execution.", classpathElements.size());
-
- if (classpathElements.size() > SANE_CLASSPATH_SIZE) {
- LOG.warn("Your classpath contains {} elements, which Google Cloud Dataflow automatically "
- + "copies to all workers. Having this many entries on your classpath may be indicative "
- + "of an issue in your pipeline. You may want to consider trimming the classpath to "
- + "necessary dependencies only, using --filesToStage pipeline option to override "
- + "what files are being staged, or bundling several dependencies into one.",
- classpathElements.size());
- }
-
- ArrayList<DataflowPackage> packages = new ArrayList<>();
+ /** Utility comparator used in uploading packages efficiently. */
+ private static class PackageUploadOrder implements Comparator<PackageAttributes> {
+ @Override
+ public int compare(PackageAttributes o1, PackageAttributes o2) {
+ // Smaller size compares high so that bigger packages are uploaded first.
+ long sizeDiff = o2.getSize() - o1.getSize();
+ if (sizeDiff != 0) {
+ // returns sign of long
+ return Long.signum(sizeDiff);
+ }
- if (stagingPath == null) {
- throw new IllegalArgumentException(
- "Can't stage classpath elements on because no staging location has been provided");
+ // Otherwise, choose arbitrarily based on hash.
+ return o1.getHash().compareTo(o2.getHash());
}
+ }
- int numUploaded = 0;
- int numCached = 0;
+ /**
+ * Utility function that computes sizes and hashes of packages so that we can validate whether
+ * they have already been correctly staged.
+ */
+ private static List<PackageAttributes> computePackageAttributes(
+ Collection<String> classpathElements, final String stagingPath,
+ ListeningExecutorService executorService) {
+ List<ListenableFuture<PackageAttributes>> futures = new LinkedList<>();
for (String classpathElement : classpathElements) {
- String packageName = null;
+ @Nullable String userPackageName = null;
if (classpathElement.contains("=")) {
String[] components = classpathElement.split("=", 2);
- packageName = components[0];
+ userPackageName = components[0];
classpathElement = components[1];
}
+ @Nullable final String packageName = userPackageName;
- File file = new File(classpathElement);
+ final File file = new File(classpathElement);
if (!file.exists()) {
LOG.warn("Skipping non-existent classpath element {} that was specified.",
classpathElement);
continue;
}
- PackageAttributes attributes = createPackageAttributes(file, stagingPath, packageName);
+ ListenableFuture<PackageAttributes> future =
+ executorService.submit(new Callable<PackageAttributes>() {
+ @Override
+ public PackageAttributes call() throws Exception {
+ return createPackageAttributes(file, stagingPath, packageName);
+ }
+ });
+ futures.add(future);
+ }
+
+ try {
+ return Futures.allAsList(futures).get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Interrupted while staging packages", e);
+ } catch (ExecutionException e) {
+ throw new RuntimeException("Error while staging packages", e.getCause());
+ }
+ }
+
+ private static WritableByteChannel makeWriter(String target, GcsUtil gcsUtil)
+ throws IOException {
+ IOChannelFactory factory = IOChannelUtils.getFactory(target);
+ if (factory instanceof GcsIOChannelFactory) {
+ return gcsUtil.create(GcsPath.fromUri(target), MimeTypes.BINARY);
+ } else {
+ return factory.create(target, MimeTypes.BINARY);
+ }
+ }
- DataflowPackage workflowPackage = attributes.getDataflowPackage();
- packages.add(workflowPackage);
- String target = workflowPackage.getLocation();
+ /**
+ * Utility to verify whether a package has already been staged and, if not, copy it to the
+ * staging location.
+ */
+ private static void stageOnePackage(
+ PackageAttributes attributes, AtomicInteger numUploaded, AtomicInteger numCached,
+ Sleeper retrySleeper, GcsUtil gcsUtil) {
+ String source = attributes.getSourcePath();
+ String target = attributes.getDataflowPackage().getLocation();
- // TODO: Should we attempt to detect the Mime type rather than
- // always using MimeTypes.BINARY?
+ // TODO: Should we attempt to detect the Mime type rather than
+ // always using MimeTypes.BINARY?
+ try {
try {
- try {
- long remoteLength = IOChannelUtils.getSizeBytes(target);
- if (remoteLength == attributes.getSize()) {
- LOG.debug("Skipping classpath element already staged: {} at {}",
- classpathElement, target);
- numCached++;
- continue;
- }
- } catch (FileNotFoundException expected) {
- // If the file doesn't exist, it means we need to upload it.
+ long remoteLength = IOChannelUtils.getSizeBytes(target);
+ if (remoteLength == attributes.getSize()) {
+ LOG.debug("Skipping classpath element already staged: {} at {}",
+ attributes.getSourcePath(), target);
+ numCached.incrementAndGet();
+ return;
}
+ } catch (FileNotFoundException expected) {
+ // If the file doesn't exist, it means we need to upload it.
+ }
- // Upload file, retrying on failure.
- BackOff backoff = BACKOFF_FACTORY.backoff();
- while (true) {
- try {
- LOG.debug("Uploading classpath element {} to {}", classpathElement, target);
- try (WritableByteChannel writer = IOChannelUtils.create(target, MimeTypes.BINARY)) {
- copyContent(classpathElement, writer);
- }
- numUploaded++;
- break;
- } catch (IOException e) {
- if (ERROR_EXTRACTOR.accessDenied(e)) {
- String errorMessage = String.format(
- "Uploaded failed due to permissions error, will NOT retry staging "
- + "of classpath %s. Please verify credentials are valid and that you have "
- + "write access to %s. Stale credentials can be resolved by executing "
- + "'gcloud auth login'.", classpathElement, target);
- LOG.error(errorMessage);
- throw new IOException(errorMessage, e);
- }
- long sleep = backoff.nextBackOffMillis();
- if (sleep == BackOff.STOP) {
- // Rethrow last error, to be included as a cause in the catch below.
- LOG.error("Upload failed, will NOT retry staging of classpath: {}",
- classpathElement, e);
- throw e;
- } else {
- LOG.warn("Upload attempt failed, sleeping before retrying staging of classpath: {}",
- classpathElement, e);
- retrySleeper.sleep(sleep);
- }
+ // Upload file, retrying on failure.
+ BackOff backoff = BACKOFF_FACTORY.backoff();
+ while (true) {
+ try {
+ LOG.debug("Uploading classpath element {} to {}", source, target);
+ try (WritableByteChannel writer = makeWriter(target, gcsUtil)) {
+ copyContent(source, writer);
+ }
+ numUploaded.incrementAndGet();
+ break;
+ } catch (IOException e) {
+ if (ERROR_EXTRACTOR.accessDenied(e)) {
+ String errorMessage = String.format(
+ "Uploaded failed due to permissions error, will NOT retry staging "
+ + "of classpath %s. Please verify credentials are valid and that you have "
+ + "write access to %s. Stale credentials can be resolved by executing "
+ + "'gcloud auth application-default login'.", source, target);
+ LOG.error(errorMessage);
+ throw new IOException(errorMessage, e);
+ }
+ long sleep = backoff.nextBackOffMillis();
+ if (sleep == BackOff.STOP) {
+ // Rethrow last error, to be included as a cause in the catch below.
+ LOG.error("Upload failed, will NOT retry staging of classpath: {}",
+ source, e);
+ throw e;
+ } else {
+ LOG.warn("Upload attempt failed, sleeping before retrying staging of classpath: {}",
+ source, e);
+ retrySleeper.sleep(sleep);
}
}
- } catch (Exception e) {
- throw new RuntimeException("Could not stage classpath element: " + classpathElement, e);
}
+ } catch (Exception e) {
+ throw new RuntimeException("Could not stage classpath element: " + source, e);
}
+ }
- LOG.info("Uploading PipelineOptions.filesToStage complete: {} files newly uploaded, "
- + "{} files cached",
- numUploaded, numCached);
+ /**
+ * Transfers the classpath elements to the staging location.
+ *
+ * @param classpathElements The elements to stage.
+ * @param stagingPath The base location to stage the elements to.
+ * @return A list of cloud workflow packages, each representing a classpath element.
+ */
+ static List<DataflowPackage> stageClasspathElements(
+ Collection<String> classpathElements, String stagingPath, GcsUtil gcsUtil) {
+ ListeningExecutorService executorService =
+ MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(32));
+ try {
+ return stageClasspathElements(
+ classpathElements, stagingPath, Sleeper.DEFAULT, executorService, gcsUtil);
+ } finally {
+ executorService.shutdown();
+ }
+ }
+
+ // Visible for testing.
+ static List<DataflowPackage> stageClasspathElements(
+ Collection<String> classpathElements, final String stagingPath,
+ final Sleeper retrySleeper, ListeningExecutorService executorService, final GcsUtil gcsUtil) {
+ LOG.info("Uploading {} files from PipelineOptions.filesToStage to staging location to "
+ + "prepare for execution.", classpathElements.size());
+
+ if (classpathElements.size() > SANE_CLASSPATH_SIZE) {
+ LOG.warn("Your classpath contains {} elements, which Google Cloud Dataflow automatically "
+ + "copies to all workers. Having this many entries on your classpath may be indicative "
+ + "of an issue in your pipeline. You may want to consider trimming the classpath to "
+ + "necessary dependencies only, using --filesToStage pipeline option to override "
+ + "what files are being staged, or bundling several dependencies into one.",
+ classpathElements.size());
+ }
+
+ checkArgument(
+ stagingPath != null,
+ "Can't stage classpath elements because no staging location has been provided");
+
+ // Inline a copy here because the inner code returns an immutable list and we want to mutate it.
+ List<PackageAttributes> packageAttributes =
+ new LinkedList<>(computePackageAttributes(classpathElements, stagingPath, executorService));
+ // Order package attributes in descending size order so that we upload the largest files first.
+ Collections.sort(packageAttributes, new PackageUploadOrder());
+
+ List<DataflowPackage> packages = Lists.newArrayListWithExpectedSize(packageAttributes.size());
+ final AtomicInteger numUploaded = new AtomicInteger(0);
+ final AtomicInteger numCached = new AtomicInteger(0);
+
+ List<ListenableFuture<?>> futures = new LinkedList<>();
+ for (final PackageAttributes attributes : packageAttributes) {
+ packages.add(attributes.getDataflowPackage());
+ futures.add(executorService.submit(new Runnable() {
+ @Override
+ public void run() {
+ stageOnePackage(attributes, numUploaded, numCached, retrySleeper, gcsUtil);
+ }
+ }));
+ }
+ try {
+ Futures.allAsList(futures).get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Interrupted while staging packages", e);
+ } catch (ExecutionException e) {
+ throw new RuntimeException("Error while staging packages", e.getCause());
+ }
+
+ LOG.info(
+ "Staging files complete: {} files cached, {} files newly uploaded",
+ numUploaded.get(), numCached.get());
return packages;
}
@@ -293,13 +375,15 @@ public class PackageUtil {
private final boolean directory;
private final long size;
private final String hash;
+ private final String sourcePath;
private DataflowPackage dataflowPackage;
public PackageAttributes(long size, String hash, boolean directory,
- DataflowPackage dataflowPackage) {
+ DataflowPackage dataflowPackage, String sourcePath) {
this.size = size;
this.hash = Objects.requireNonNull(hash, "hash");
this.directory = directory;
+ this.sourcePath = Objects.requireNonNull(sourcePath, "sourcePath");
this.dataflowPackage = Objects.requireNonNull(dataflowPackage, "dataflowPackage");
}
@@ -330,5 +414,12 @@ public class PackageUtil {
public String getHash() {
return hash;
}
+
+ /**
+ * @return the file to be uploaded
+ */
+ public String getSourcePath() {
+ return sourcePath;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/23e2b913/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
index 05a87dd..3828415 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
@@ -18,12 +18,12 @@
package org.apache.beam.runners.dataflow.util;
import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
@@ -53,6 +53,7 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.io.Files;
import com.google.common.io.LineReader;
+import com.google.common.util.concurrent.MoreExecutors;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -235,7 +236,7 @@ public class PackageUtilTest {
classpathElements.add(eltName + '=' + tmpFile.getAbsolutePath());
}
- PackageUtil.stageClasspathElements(classpathElements, STAGING_PATH);
+ PackageUtil.stageClasspathElements(classpathElements, STAGING_PATH, mockGcsUtil);
logged.verifyWarn("Your classpath contains 1005 elements, which Google Cloud Dataflow");
}
@@ -250,7 +251,7 @@ public class PackageUtilTest {
when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
List<DataflowPackage> targets = PackageUtil.stageClasspathElements(
- ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH);
+ ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH, mockGcsUtil);
DataflowPackage target = Iterables.getOnlyElement(targets);
verify(mockGcsUtil).fileSize(any(GcsPath.class));
@@ -277,7 +278,7 @@ public class PackageUtilTest {
when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
PackageUtil.stageClasspathElements(
- ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH);
+ ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH, mockGcsUtil);
verify(mockGcsUtil).fileSize(any(GcsPath.class));
verify(mockGcsUtil).create(any(GcsPath.class), anyString());
@@ -304,7 +305,7 @@ public class PackageUtilTest {
when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
List<DataflowPackage> targets = PackageUtil.stageClasspathElements(
- ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH);
+ ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH, mockGcsUtil);
DataflowPackage target = Iterables.getOnlyElement(targets);
verify(mockGcsUtil).fileSize(any(GcsPath.class));
@@ -327,7 +328,8 @@ public class PackageUtilTest {
try {
PackageUtil.stageClasspathElements(
ImmutableList.of(tmpFile.getAbsolutePath()),
- STAGING_PATH, fastNanoClockAndSleeper);
+ STAGING_PATH, fastNanoClockAndSleeper, MoreExecutors.newDirectExecutorService(),
+ mockGcsUtil);
} finally {
verify(mockGcsUtil).fileSize(any(GcsPath.class));
verify(mockGcsUtil, times(5)).create(any(GcsPath.class), anyString());
@@ -348,16 +350,20 @@ public class PackageUtilTest {
try {
PackageUtil.stageClasspathElements(
ImmutableList.of(tmpFile.getAbsolutePath()),
- STAGING_PATH, fastNanoClockAndSleeper);
+ STAGING_PATH, fastNanoClockAndSleeper, MoreExecutors.newDirectExecutorService(),
+ mockGcsUtil);
fail("Expected RuntimeException");
} catch (RuntimeException e) {
- assertTrue("Expected IOException containing detailed message.",
- e.getCause() instanceof IOException);
- assertThat(e.getCause().getMessage(),
+ assertThat("Expected RuntimeException wrapping IOException.",
+ e.getCause(), instanceOf(RuntimeException.class));
+ assertThat("Expected IOException containing detailed message.",
+ e.getCause().getCause(), instanceOf(IOException.class));
+ assertThat(e.getCause().getCause().getMessage(),
Matchers.allOf(
Matchers.containsString("Uploaded failed due to permissions error"),
Matchers.containsString(
- "Stale credentials can be resolved by executing 'gcloud auth login'")));
+ "Stale credentials can be resolved by executing 'gcloud auth application-default "
+ + "login'")));
} finally {
verify(mockGcsUtil).fileSize(any(GcsPath.class));
verify(mockGcsUtil).create(any(GcsPath.class), anyString());
@@ -377,9 +383,8 @@ public class PackageUtilTest {
try {
PackageUtil.stageClasspathElements(
- ImmutableList.of(tmpFile.getAbsolutePath()),
- STAGING_PATH,
- fastNanoClockAndSleeper);
+ ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH, fastNanoClockAndSleeper,
+ MoreExecutors.newDirectExecutorService(), mockGcsUtil);
} finally {
verify(mockGcsUtil).fileSize(any(GcsPath.class));
verify(mockGcsUtil, times(2)).create(any(GcsPath.class), anyString());
@@ -393,7 +398,7 @@ public class PackageUtilTest {
when(mockGcsUtil.fileSize(any(GcsPath.class))).thenReturn(tmpFile.length());
PackageUtil.stageClasspathElements(
- ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH);
+ ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH, mockGcsUtil);
verify(mockGcsUtil).fileSize(any(GcsPath.class));
verifyNoMoreInteractions(mockGcsUtil);
@@ -411,7 +416,7 @@ public class PackageUtilTest {
when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
PackageUtil.stageClasspathElements(
- ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH);
+ ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH, mockGcsUtil);
verify(mockGcsUtil).fileSize(any(GcsPath.class));
verify(mockGcsUtil).create(any(GcsPath.class), anyString());
@@ -429,7 +434,8 @@ public class PackageUtilTest {
when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
List<DataflowPackage> targets = PackageUtil.stageClasspathElements(
- ImmutableList.of(overriddenName + "=" + tmpFile.getAbsolutePath()), STAGING_PATH);
+ ImmutableList.of(overriddenName + "=" + tmpFile.getAbsolutePath()), STAGING_PATH,
+ mockGcsUtil);
DataflowPackage target = Iterables.getOnlyElement(targets);
verify(mockGcsUtil).fileSize(any(GcsPath.class));
@@ -446,7 +452,7 @@ public class PackageUtilTest {
String nonExistentFile =
IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "non-existent-file");
assertEquals(Collections.EMPTY_LIST, PackageUtil.stageClasspathElements(
- ImmutableList.of(nonExistentFile), STAGING_PATH));
+ ImmutableList.of(nonExistentFile), STAGING_PATH, mockGcsUtil));
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/23e2b913/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java
index 0553efc..72e106d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
import org.apache.beam.sdk.util.AppEngineEnvironment;
import org.apache.beam.sdk.util.GcsPathValidator;
import org.apache.beam.sdk.util.GcsUtil;
@@ -81,8 +82,9 @@ public interface GcsOptions extends
+ "information on the restrictions and performance implications of this value.\n\n"
+ "https://github.com/GoogleCloudPlatform/bigdata-interop/blob/master/util/src/main/java/"
+ "com/google/cloud/hadoop/util/AbstractGoogleAsyncWriteChannel.java")
+ @Nullable
Integer getGcsUploadBufferSizeBytes();
- void setGcsUploadBufferSizeBytes(Integer bytes);
+ void setGcsUploadBufferSizeBytes(@Nullable Integer bytes);
/**
* The class of the validator that should be created and used to validate paths.
http://git-wip-us.apache.org/repos/asf/beam/blob/23e2b913/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
index a10ea28..5e83584 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
@@ -101,6 +101,18 @@ public class GcsUtil {
gcsOptions.getExecutorService(),
gcsOptions.getGcsUploadBufferSizeBytes());
}
+
+ /**
+ * Returns an instance of {@link GcsUtil} based on the given parameters.
+ */
+ public static GcsUtil create(
+ Storage storageClient,
+ HttpRequestInitializer httpRequestInitializer,
+ ExecutorService executorService,
+ @Nullable Integer uploadBufferSizeBytes) {
+ return new GcsUtil(
+ storageClient, httpRequestInitializer, executorService, uploadBufferSizeBytes);
+ }
}
private static final Logger LOG = LoggerFactory.getLogger(GcsUtil.class);
[2/3] beam git commit: PackageUtil: preserve classpath ordering when uploading
Posted by dh...@apache.org.
PackageUtil: preserve classpath ordering when uploading
Also add a test
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b0b91c84
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b0b91c84
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b0b91c84
Branch: refs/heads/master
Commit: b0b91c842e09aa7fdb5c1dc216574daa43b437ea
Parents: 23e2b91
Author: Dan Halperin <dh...@google.com>
Authored: Wed Jan 25 22:15:59 2017 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Jan 25 22:16:22 2017 -0800
----------------------------------------------------------------------
.../beam/runners/dataflow/util/PackageUtil.java | 11 +++++---
.../runners/dataflow/util/PackageUtilTest.java | 27 ++++++++++++++++++++
2 files changed, 35 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/b0b91c84/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
index fa8c94d..685d48c 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
@@ -297,16 +297,21 @@ class PackageUtil {
// Inline a copy here because the inner code returns an immutable list and we want to mutate it.
List<PackageAttributes> packageAttributes =
new LinkedList<>(computePackageAttributes(classpathElements, stagingPath, executorService));
- // Order package attributes in descending size order so that we upload the largest files first.
- Collections.sort(packageAttributes, new PackageUploadOrder());
+ // Compute the returned list of DataflowPackage objects here so that they are returned in the
+ // same order as on the classpath.
List<DataflowPackage> packages = Lists.newArrayListWithExpectedSize(packageAttributes.size());
+ for (final PackageAttributes attributes : packageAttributes) {
+ packages.add(attributes.getDataflowPackage());
+ }
+
+ // Order package attributes in descending size order so that we upload the largest files first.
+ Collections.sort(packageAttributes, new PackageUploadOrder());
final AtomicInteger numUploaded = new AtomicInteger(0);
final AtomicInteger numCached = new AtomicInteger(0);
List<ListenableFuture<?>> futures = new LinkedList<>();
for (final PackageAttributes attributes : packageAttributes) {
- packages.add(attributes.getDataflowPackage());
futures.add(executorService.submit(new Runnable() {
@Override
public void run() {
http://git-wip-us.apache.org/repos/asf/beam/blob/b0b91c84/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
index 3828415..800c5a9 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.dataflow.util;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.startsWith;
import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
@@ -59,6 +60,7 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.Pipe;
+import java.nio.channels.Pipe.SinkChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
@@ -86,6 +88,8 @@ import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
/** Tests for PackageUtil. */
@RunWith(JUnit4.class)
@@ -265,6 +269,29 @@ public class PackageUtilTest {
}
@Test
+ public void testStagingPreservesClasspath() throws Exception {
+ File smallFile = makeFileWithContents("small.txt", "small");
+ File largeFile = makeFileWithContents("large.txt", "large contents");
+ when(mockGcsUtil.fileSize(any(GcsPath.class)))
+ .thenThrow(new FileNotFoundException("some/path"));
+ when(mockGcsUtil.create(any(GcsPath.class), anyString()))
+ .thenAnswer(new Answer<SinkChannel>() {
+ @Override
+ public SinkChannel answer(InvocationOnMock invocation) throws Throwable {
+ return Pipe.open().sink();
+ }
+ });
+
+ List<DataflowPackage> targets = PackageUtil.stageClasspathElements(
+ ImmutableList.of(smallFile.getAbsolutePath(), largeFile.getAbsolutePath()),
+ STAGING_PATH, mockGcsUtil);
+ // Verify that the packages are returned small, then large, matching input order even though
+ // the large file would be uploaded first.
+ assertThat(targets.get(0).getName(), startsWith("small"));
+ assertThat(targets.get(1).getName(), startsWith("large"));
+ }
+
+ @Test
public void testPackageUploadWithDirectorySucceeds() throws Exception {
Pipe pipe = Pipe.open();
File tmpDirectory = tmpFolder.newFolder("folder");
[3/3] beam git commit: This closes #1849
Posted by dh...@apache.org.
This closes #1849
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b4726d08
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b4726d08
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b4726d08
Branch: refs/heads/master
Commit: b4726d088faa2ea74ba3a7e29a7559f737ccf4f2
Parents: 1c6e667 b0b91c8
Author: Dan Halperin <dh...@google.com>
Authored: Thu Jan 26 07:15:54 2017 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Jan 26 07:15:54 2017 -0800
----------------------------------------------------------------------
runners/google-cloud-dataflow-java/pom.xml | 5 +
.../beam/runners/dataflow/util/GcsStager.java | 18 +-
.../beam/runners/dataflow/util/PackageUtil.java | 352 ++++++++++++-------
.../runners/dataflow/util/PackageUtilTest.java | 69 +++-
.../org/apache/beam/sdk/options/GcsOptions.java | 4 +-
.../java/org/apache/beam/sdk/util/GcsUtil.java | 12 +
6 files changed, 312 insertions(+), 148 deletions(-)
----------------------------------------------------------------------