You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/01/26 01:47:16 UTC
[1/2] beam git commit: Revert "This closes #1184"
Repository: beam
Updated Branches:
refs/heads/master 6413299a2 -> 1c6e66741
Revert "This closes #1184"
This reverts commit c525783704e0cc47845df8cdec1715e1f1c74008, reversing
changes made to 979c9376f820577bad43c18cc1a7ee86fab9d942.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fee029f7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fee029f7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fee029f7
Branch: refs/heads/master
Commit: fee029f7f9963c9de821ff5792d7f45fabe6cb5d
Parents: 6413299
Author: Dan Halperin <dh...@google.com>
Authored: Wed Jan 25 15:54:26 2017 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Jan 25 15:54:26 2017 -0800
----------------------------------------------------------------------
runners/google-cloud-dataflow-java/pom.xml | 5 -
.../beam/runners/dataflow/util/GcsStager.java | 18 +-
.../beam/runners/dataflow/util/PackageUtil.java | 349 +++++++------------
.../runners/dataflow/util/PackageUtilTest.java | 42 +--
.../org/apache/beam/sdk/options/GcsOptions.java | 4 +-
.../java/org/apache/beam/sdk/util/GcsUtil.java | 12 -
6 files changed, 149 insertions(+), 281 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/fee029f7/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index 9858b3d..eea5502 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -203,11 +203,6 @@
</dependency>
<dependency>
- <groupId>com.google.apis</groupId>
- <artifactId>google-api-services-storage</artifactId>
- </dependency>
-
- <dependency>
<groupId>com.google.auth</groupId>
<artifactId>google-auth-library-credentials</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/beam/blob/fee029f7/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
index 53822e3..6ca4c3f 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
@@ -17,19 +17,13 @@
*/
package org.apache.beam.runners.dataflow.util;
-import static com.google.common.base.MoreObjects.firstNonNull;
-import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.api.services.dataflow.model.DataflowPackage;
-import com.google.api.services.storage.Storage;
import java.util.List;
import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.util.GcsUtil;
-import org.apache.beam.sdk.util.GcsUtil.GcsUtilFactory;
-import org.apache.beam.sdk.util.Transport;
/**
* Utility class for staging files to GCS.
@@ -41,7 +35,6 @@ public class GcsStager implements Stager {
this.options = options;
}
- @SuppressWarnings("unused") // used via reflection
public static GcsStager fromOptions(PipelineOptions options) {
return new GcsStager(options.as(DataflowPipelineOptions.class));
}
@@ -55,16 +48,7 @@ public class GcsStager implements Stager {
if (windmillBinary != null) {
filesToStage.add("windmill_main=" + windmillBinary);
}
- int uploadSizeBytes = firstNonNull(options.getGcsUploadBufferSizeBytes(), 1024 * 1024);
- checkArgument(uploadSizeBytes > 0, "gcsUploadBufferSizeBytes must be > 0");
- uploadSizeBytes = Math.min(uploadSizeBytes, 1024 * 1024);
- Storage.Builder storageBuilder = Transport.newStorageClient(options);
- GcsUtil util = GcsUtilFactory.create(
- storageBuilder.build(),
- storageBuilder.getHttpRequestInitializer(),
- options.getExecutorService(),
- uploadSizeBytes);
return PackageUtil.stageClasspathElements(
- options.getFilesToStage(), options.getStagingLocation(), util);
+ options.getFilesToStage(), options.getStagingLocation());
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/fee029f7/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
index fa8c94d..6d910ba 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
@@ -17,62 +17,53 @@
*/
package org.apache.beam.runners.dataflow.util;
-import static com.google.common.base.Preconditions.checkArgument;
-
import com.fasterxml.jackson.core.Base64Variants;
import com.google.api.client.util.BackOff;
import com.google.api.client.util.Sleeper;
import com.google.api.services.dataflow.model.DataflowPackage;
import com.google.cloud.hadoop.util.ApiErrorExtractor;
-import com.google.common.collect.Lists;
import com.google.common.hash.Funnels;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
import com.google.common.io.CountingOutputStream;
import com.google.common.io.Files;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
+import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicInteger;
-import javax.annotation.Nullable;
import org.apache.beam.sdk.util.FluentBackoff;
-import org.apache.beam.sdk.util.GcsIOChannelFactory;
-import org.apache.beam.sdk.util.GcsUtil;
-import org.apache.beam.sdk.util.IOChannelFactory;
import org.apache.beam.sdk.util.IOChannelUtils;
import org.apache.beam.sdk.util.MimeTypes;
import org.apache.beam.sdk.util.ZipFiles;
-import org.apache.beam.sdk.util.gcsfs.GcsPath;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** Helper routines for packages. */
-class PackageUtil {
+public class PackageUtil {
private static final Logger LOG = LoggerFactory.getLogger(PackageUtil.class);
/**
* A reasonable upper bound on the number of jars required to launch a Dataflow job.
*/
- private static final int SANE_CLASSPATH_SIZE = 1000;
+ public static final int SANE_CLASSPATH_SIZE = 1000;
+ /**
+ * The initial interval to use between package staging attempts.
+ */
+ private static final Duration INITIAL_BACKOFF_INTERVAL = Duration.standardSeconds(5);
+ /**
+ * The maximum number of retries when staging a file.
+ */
+ private static final int MAX_RETRIES = 4;
private static final FluentBackoff BACKOFF_FACTORY =
- FluentBackoff.DEFAULT.withMaxRetries(4).withInitialBackoff(Duration.standardSeconds(5));
+ FluentBackoff.DEFAULT
+ .withMaxRetries(MAX_RETRIES).withInitialBackoff(INITIAL_BACKOFF_INTERVAL);
/**
* Translates exceptions from API calls.
@@ -80,18 +71,35 @@ class PackageUtil {
private static final ApiErrorExtractor ERROR_EXTRACTOR = new ApiErrorExtractor();
/**
+ * Creates a DataflowPackage containing information about how a classpath element should be
+ * staged, including the staging destination as well as its size and hash.
+ *
+ * @param classpathElement The local path for the classpath element.
+ * @param stagingPath The base location for staged classpath elements.
+ * @param overridePackageName If non-null, use the given value as the package name
+ * instead of generating one automatically.
+ * @return The package.
+ */
+ @Deprecated
+ public static DataflowPackage createPackage(File classpathElement,
+ String stagingPath, String overridePackageName) {
+ return createPackageAttributes(classpathElement, stagingPath, overridePackageName)
+ .getDataflowPackage();
+ }
+
+ /**
* Compute and cache the attributes of a classpath element that we will need to stage it.
*
- * @param source the file or directory to be staged.
+ * @param classpathElement the file or directory to be staged.
* @param stagingPath The base location for staged classpath elements.
* @param overridePackageName If non-null, use the given value as the package name
* instead of generating one automatically.
* @return a {@link PackageAttributes} containing metadata about the object to be staged.
*/
- static PackageAttributes createPackageAttributes(File source,
- String stagingPath, @Nullable String overridePackageName) {
+ static PackageAttributes createPackageAttributes(File classpathElement,
+ String stagingPath, String overridePackageName) {
try {
- boolean directory = source.isDirectory();
+ boolean directory = classpathElement.isDirectory();
// Compute size and hash in one pass over file or directory.
Hasher hasher = Hashing.md5().newHasher();
@@ -100,158 +108,25 @@ class PackageUtil {
if (!directory) {
// Files are staged as-is.
- Files.asByteSource(source).copyTo(countingOutputStream);
+ Files.asByteSource(classpathElement).copyTo(countingOutputStream);
} else {
// Directories are recursively zipped.
- ZipFiles.zipDirectory(source, countingOutputStream);
+ ZipFiles.zipDirectory(classpathElement, countingOutputStream);
}
long size = countingOutputStream.getCount();
String hash = Base64Variants.MODIFIED_FOR_URL.encode(hasher.hash().asBytes());
// Create the DataflowPackage with staging name and location.
- String uniqueName = getUniqueContentName(source, hash);
+ String uniqueName = getUniqueContentName(classpathElement, hash);
String resourcePath = IOChannelUtils.resolve(stagingPath, uniqueName);
DataflowPackage target = new DataflowPackage();
target.setName(overridePackageName != null ? overridePackageName : uniqueName);
target.setLocation(resourcePath);
- return new PackageAttributes(size, hash, directory, target, source.getPath());
+ return new PackageAttributes(size, hash, directory, target);
} catch (IOException e) {
- throw new RuntimeException("Package setup failure for " + source, e);
- }
- }
-
- /** Utility comparator used in uploading packages efficiently. */
- private static class PackageUploadOrder implements Comparator<PackageAttributes> {
- @Override
- public int compare(PackageAttributes o1, PackageAttributes o2) {
- // Smaller size compares high so that bigger packages are uploaded first.
- long sizeDiff = o2.getSize() - o1.getSize();
- if (sizeDiff != 0) {
- // returns sign of long
- return Long.signum(sizeDiff);
- }
-
- // Otherwise, choose arbitrarily based on hash.
- return o1.getHash().compareTo(o2.getHash());
- }
- }
-
- /**
- * Utility function that computes sizes and hashes of packages so that we can validate whether
- * they have already been correctly staged.
- */
- private static List<PackageAttributes> computePackageAttributes(
- Collection<String> classpathElements, final String stagingPath,
- ListeningExecutorService executorService) {
- List<ListenableFuture<PackageAttributes>> futures = new LinkedList<>();
- for (String classpathElement : classpathElements) {
- @Nullable String userPackageName = null;
- if (classpathElement.contains("=")) {
- String[] components = classpathElement.split("=", 2);
- userPackageName = components[0];
- classpathElement = components[1];
- }
- @Nullable final String packageName = userPackageName;
-
- final File file = new File(classpathElement);
- if (!file.exists()) {
- LOG.warn("Skipping non-existent classpath element {} that was specified.",
- classpathElement);
- continue;
- }
-
- ListenableFuture<PackageAttributes> future =
- executorService.submit(new Callable<PackageAttributes>() {
- @Override
- public PackageAttributes call() throws Exception {
- return createPackageAttributes(file, stagingPath, packageName);
- }
- });
- futures.add(future);
- }
-
- try {
- return Futures.allAsList(futures).get();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException("Interrupted while staging packages", e);
- } catch (ExecutionException e) {
- throw new RuntimeException("Error while staging packages", e.getCause());
- }
- }
-
- private static WritableByteChannel makeWriter(String target, GcsUtil gcsUtil)
- throws IOException {
- IOChannelFactory factory = IOChannelUtils.getFactory(target);
- if (factory instanceof GcsIOChannelFactory) {
- return gcsUtil.create(GcsPath.fromUri(target), MimeTypes.BINARY);
- } else {
- return factory.create(target, MimeTypes.BINARY);
- }
- }
-
- /**
- * Utility to verify whether a package has already been staged and, if not, copy it to the
- * staging location.
- */
- private static void stageOnePackage(
- PackageAttributes attributes, AtomicInteger numUploaded, AtomicInteger numCached,
- Sleeper retrySleeper, GcsUtil gcsUtil) {
- String source = attributes.getSourcePath();
- String target = attributes.getDataflowPackage().getLocation();
-
- // TODO: Should we attempt to detect the Mime type rather than
- // always using MimeTypes.BINARY?
- try {
- try {
- long remoteLength = IOChannelUtils.getSizeBytes(target);
- if (remoteLength == attributes.getSize()) {
- LOG.debug("Skipping classpath element already staged: {} at {}",
- attributes.getSourcePath(), target);
- numCached.incrementAndGet();
- return;
- }
- } catch (FileNotFoundException expected) {
- // If the file doesn't exist, it means we need to upload it.
- }
-
- // Upload file, retrying on failure.
- BackOff backoff = BACKOFF_FACTORY.backoff();
- while (true) {
- try {
- LOG.debug("Uploading classpath element {} to {}", source, target);
- try (WritableByteChannel writer = makeWriter(target, gcsUtil)) {
- copyContent(source, writer);
- }
- numUploaded.incrementAndGet();
- break;
- } catch (IOException e) {
- if (ERROR_EXTRACTOR.accessDenied(e)) {
- String errorMessage = String.format(
- "Uploaded failed due to permissions error, will NOT retry staging "
- + "of classpath %s. Please verify credentials are valid and that you have "
- + "write access to %s. Stale credentials can be resolved by executing "
- + "'gcloud auth application-default login'.", source, target);
- LOG.error(errorMessage);
- throw new IOException(errorMessage, e);
- }
- long sleep = backoff.nextBackOffMillis();
- if (sleep == BackOff.STOP) {
- // Rethrow last error, to be included as a cause in the catch below.
- LOG.error("Upload failed, will NOT retry staging of classpath: {}",
- source, e);
- throw e;
- } else {
- LOG.warn("Upload attempt failed, sleeping before retrying staging of classpath: {}",
- source, e);
- retrySleeper.sleep(sleep);
- }
- }
- }
- } catch (Exception e) {
- throw new RuntimeException("Could not stage classpath element: " + source, e);
+ throw new RuntimeException("Package setup failure for " + classpathElement, e);
}
}
@@ -262,70 +137,113 @@ class PackageUtil {
* @param stagingPath The base location to stage the elements to.
* @return A list of cloud workflow packages, each representing a classpath element.
*/
- static List<DataflowPackage> stageClasspathElements(
- Collection<String> classpathElements, String stagingPath, GcsUtil gcsUtil) {
- ListeningExecutorService executorService =
- MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(32));
- try {
- return stageClasspathElements(
- classpathElements, stagingPath, Sleeper.DEFAULT, executorService, gcsUtil);
- } finally {
- executorService.shutdown();
- }
+ public static List<DataflowPackage> stageClasspathElements(
+ Collection<String> classpathElements, String stagingPath) {
+ return stageClasspathElements(classpathElements, stagingPath, Sleeper.DEFAULT);
}
// Visible for testing.
static List<DataflowPackage> stageClasspathElements(
- Collection<String> classpathElements, final String stagingPath,
- final Sleeper retrySleeper, ListeningExecutorService executorService, final GcsUtil gcsUtil) {
+ Collection<String> classpathElements, String stagingPath,
+ Sleeper retrySleeper) {
LOG.info("Uploading {} files from PipelineOptions.filesToStage to staging location to "
+ "prepare for execution.", classpathElements.size());
if (classpathElements.size() > SANE_CLASSPATH_SIZE) {
LOG.warn("Your classpath contains {} elements, which Google Cloud Dataflow automatically "
- + "copies to all workers. Having this many entries on your classpath may be indicative "
- + "of an issue in your pipeline. You may want to consider trimming the classpath to "
- + "necessary dependencies only, using --filesToStage pipeline option to override "
- + "what files are being staged, or bundling several dependencies into one.",
+ + "copies to all workers. Having this many entries on your classpath may be indicative "
+ + "of an issue in your pipeline. You may want to consider trimming the classpath to "
+ + "necessary dependencies only, using --filesToStage pipeline option to override "
+ + "what files are being staged, or bundling several dependencies into one.",
classpathElements.size());
}
- checkArgument(
- stagingPath != null,
- "Can't stage classpath elements because no staging location has been provided");
+ ArrayList<DataflowPackage> packages = new ArrayList<>();
- // Inline a copy here because the inner code returns an immutable list and we want to mutate it.
- List<PackageAttributes> packageAttributes =
- new LinkedList<>(computePackageAttributes(classpathElements, stagingPath, executorService));
- // Order package attributes in descending size order so that we upload the largest files first.
- Collections.sort(packageAttributes, new PackageUploadOrder());
+ if (stagingPath == null) {
+ throw new IllegalArgumentException(
+ "Can't stage classpath elements on because no staging location has been provided");
+ }
- List<DataflowPackage> packages = Lists.newArrayListWithExpectedSize(packageAttributes.size());
- final AtomicInteger numUploaded = new AtomicInteger(0);
- final AtomicInteger numCached = new AtomicInteger(0);
+ int numUploaded = 0;
+ int numCached = 0;
+ for (String classpathElement : classpathElements) {
+ String packageName = null;
+ if (classpathElement.contains("=")) {
+ String[] components = classpathElement.split("=", 2);
+ packageName = components[0];
+ classpathElement = components[1];
+ }
- List<ListenableFuture<?>> futures = new LinkedList<>();
- for (final PackageAttributes attributes : packageAttributes) {
- packages.add(attributes.getDataflowPackage());
- futures.add(executorService.submit(new Runnable() {
- @Override
- public void run() {
- stageOnePackage(attributes, numUploaded, numCached, retrySleeper, gcsUtil);
+ File file = new File(classpathElement);
+ if (!file.exists()) {
+ LOG.warn("Skipping non-existent classpath element {} that was specified.",
+ classpathElement);
+ continue;
+ }
+
+ PackageAttributes attributes = createPackageAttributes(file, stagingPath, packageName);
+
+ DataflowPackage workflowPackage = attributes.getDataflowPackage();
+ packages.add(workflowPackage);
+ String target = workflowPackage.getLocation();
+
+ // TODO: Should we attempt to detect the Mime type rather than
+ // always using MimeTypes.BINARY?
+ try {
+ try {
+ long remoteLength = IOChannelUtils.getSizeBytes(target);
+ if (remoteLength == attributes.getSize()) {
+ LOG.debug("Skipping classpath element already staged: {} at {}",
+ classpathElement, target);
+ numCached++;
+ continue;
+ }
+ } catch (FileNotFoundException expected) {
+ // If the file doesn't exist, it means we need to upload it.
}
- }));
- }
- try {
- Futures.allAsList(futures).get();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException("Interrupted while staging packages", e);
- } catch (ExecutionException e) {
- throw new RuntimeException("Error while staging packages", e.getCause());
+
+ // Upload file, retrying on failure.
+ BackOff backoff = BACKOFF_FACTORY.backoff();
+ while (true) {
+ try {
+ LOG.debug("Uploading classpath element {} to {}", classpathElement, target);
+ try (WritableByteChannel writer = IOChannelUtils.create(target, MimeTypes.BINARY)) {
+ copyContent(classpathElement, writer);
+ }
+ numUploaded++;
+ break;
+ } catch (IOException e) {
+ if (ERROR_EXTRACTOR.accessDenied(e)) {
+ String errorMessage = String.format(
+ "Uploaded failed due to permissions error, will NOT retry staging "
+ + "of classpath %s. Please verify credentials are valid and that you have "
+ + "write access to %s. Stale credentials can be resolved by executing "
+ + "'gcloud auth login'.", classpathElement, target);
+ LOG.error(errorMessage);
+ throw new IOException(errorMessage, e);
+ }
+ long sleep = backoff.nextBackOffMillis();
+ if (sleep == BackOff.STOP) {
+ // Rethrow last error, to be included as a cause in the catch below.
+ LOG.error("Upload failed, will NOT retry staging of classpath: {}",
+ classpathElement, e);
+ throw e;
+ } else {
+ LOG.warn("Upload attempt failed, sleeping before retrying staging of classpath: {}",
+ classpathElement, e);
+ retrySleeper.sleep(sleep);
+ }
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Could not stage classpath element: " + classpathElement, e);
+ }
}
- LOG.info(
- "Staging files complete: {} files cached, {} files newly uploaded",
- numUploaded.get(), numCached.get());
+ LOG.info("Uploading PipelineOptions.filesToStage complete: {} files newly uploaded, "
+ + "{} files cached",
+ numUploaded, numCached);
return packages;
}
@@ -375,15 +293,13 @@ class PackageUtil {
private final boolean directory;
private final long size;
private final String hash;
- private final String sourcePath;
private DataflowPackage dataflowPackage;
public PackageAttributes(long size, String hash, boolean directory,
- DataflowPackage dataflowPackage, String sourcePath) {
+ DataflowPackage dataflowPackage) {
this.size = size;
this.hash = Objects.requireNonNull(hash, "hash");
this.directory = directory;
- this.sourcePath = Objects.requireNonNull(sourcePath, "sourcePath");
this.dataflowPackage = Objects.requireNonNull(dataflowPackage, "dataflowPackage");
}
@@ -414,12 +330,5 @@ class PackageUtil {
public String getHash() {
return hash;
}
-
- /**
- * @return the file to be uploaded
- */
- public String getSourcePath() {
- return sourcePath;
- }
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/fee029f7/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
index 3828415..05a87dd 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
@@ -18,12 +18,12 @@
package org.apache.beam.runners.dataflow.util;
import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
@@ -53,7 +53,6 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.io.Files;
import com.google.common.io.LineReader;
-import com.google.common.util.concurrent.MoreExecutors;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -236,7 +235,7 @@ public class PackageUtilTest {
classpathElements.add(eltName + '=' + tmpFile.getAbsolutePath());
}
- PackageUtil.stageClasspathElements(classpathElements, STAGING_PATH, mockGcsUtil);
+ PackageUtil.stageClasspathElements(classpathElements, STAGING_PATH);
logged.verifyWarn("Your classpath contains 1005 elements, which Google Cloud Dataflow");
}
@@ -251,7 +250,7 @@ public class PackageUtilTest {
when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
List<DataflowPackage> targets = PackageUtil.stageClasspathElements(
- ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH, mockGcsUtil);
+ ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH);
DataflowPackage target = Iterables.getOnlyElement(targets);
verify(mockGcsUtil).fileSize(any(GcsPath.class));
@@ -278,7 +277,7 @@ public class PackageUtilTest {
when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
PackageUtil.stageClasspathElements(
- ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH, mockGcsUtil);
+ ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH);
verify(mockGcsUtil).fileSize(any(GcsPath.class));
verify(mockGcsUtil).create(any(GcsPath.class), anyString());
@@ -305,7 +304,7 @@ public class PackageUtilTest {
when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
List<DataflowPackage> targets = PackageUtil.stageClasspathElements(
- ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH, mockGcsUtil);
+ ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH);
DataflowPackage target = Iterables.getOnlyElement(targets);
verify(mockGcsUtil).fileSize(any(GcsPath.class));
@@ -328,8 +327,7 @@ public class PackageUtilTest {
try {
PackageUtil.stageClasspathElements(
ImmutableList.of(tmpFile.getAbsolutePath()),
- STAGING_PATH, fastNanoClockAndSleeper, MoreExecutors.newDirectExecutorService(),
- mockGcsUtil);
+ STAGING_PATH, fastNanoClockAndSleeper);
} finally {
verify(mockGcsUtil).fileSize(any(GcsPath.class));
verify(mockGcsUtil, times(5)).create(any(GcsPath.class), anyString());
@@ -350,20 +348,16 @@ public class PackageUtilTest {
try {
PackageUtil.stageClasspathElements(
ImmutableList.of(tmpFile.getAbsolutePath()),
- STAGING_PATH, fastNanoClockAndSleeper, MoreExecutors.newDirectExecutorService(),
- mockGcsUtil);
+ STAGING_PATH, fastNanoClockAndSleeper);
fail("Expected RuntimeException");
} catch (RuntimeException e) {
- assertThat("Expected RuntimeException wrapping IOException.",
- e.getCause(), instanceOf(RuntimeException.class));
- assertThat("Expected IOException containing detailed message.",
- e.getCause().getCause(), instanceOf(IOException.class));
- assertThat(e.getCause().getCause().getMessage(),
+ assertTrue("Expected IOException containing detailed message.",
+ e.getCause() instanceof IOException);
+ assertThat(e.getCause().getMessage(),
Matchers.allOf(
Matchers.containsString("Uploaded failed due to permissions error"),
Matchers.containsString(
- "Stale credentials can be resolved by executing 'gcloud auth application-default "
- + "login'")));
+ "Stale credentials can be resolved by executing 'gcloud auth login'")));
} finally {
verify(mockGcsUtil).fileSize(any(GcsPath.class));
verify(mockGcsUtil).create(any(GcsPath.class), anyString());
@@ -383,8 +377,9 @@ public class PackageUtilTest {
try {
PackageUtil.stageClasspathElements(
- ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH, fastNanoClockAndSleeper,
- MoreExecutors.newDirectExecutorService(), mockGcsUtil);
+ ImmutableList.of(tmpFile.getAbsolutePath()),
+ STAGING_PATH,
+ fastNanoClockAndSleeper);
} finally {
verify(mockGcsUtil).fileSize(any(GcsPath.class));
verify(mockGcsUtil, times(2)).create(any(GcsPath.class), anyString());
@@ -398,7 +393,7 @@ public class PackageUtilTest {
when(mockGcsUtil.fileSize(any(GcsPath.class))).thenReturn(tmpFile.length());
PackageUtil.stageClasspathElements(
- ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH, mockGcsUtil);
+ ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH);
verify(mockGcsUtil).fileSize(any(GcsPath.class));
verifyNoMoreInteractions(mockGcsUtil);
@@ -416,7 +411,7 @@ public class PackageUtilTest {
when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
PackageUtil.stageClasspathElements(
- ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH, mockGcsUtil);
+ ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH);
verify(mockGcsUtil).fileSize(any(GcsPath.class));
verify(mockGcsUtil).create(any(GcsPath.class), anyString());
@@ -434,8 +429,7 @@ public class PackageUtilTest {
when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
List<DataflowPackage> targets = PackageUtil.stageClasspathElements(
- ImmutableList.of(overriddenName + "=" + tmpFile.getAbsolutePath()), STAGING_PATH,
- mockGcsUtil);
+ ImmutableList.of(overriddenName + "=" + tmpFile.getAbsolutePath()), STAGING_PATH);
DataflowPackage target = Iterables.getOnlyElement(targets);
verify(mockGcsUtil).fileSize(any(GcsPath.class));
@@ -452,7 +446,7 @@ public class PackageUtilTest {
String nonExistentFile =
IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "non-existent-file");
assertEquals(Collections.EMPTY_LIST, PackageUtil.stageClasspathElements(
- ImmutableList.of(nonExistentFile), STAGING_PATH, mockGcsUtil));
+ ImmutableList.of(nonExistentFile), STAGING_PATH));
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/fee029f7/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java
index 72e106d..0553efc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java
@@ -25,7 +25,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import javax.annotation.Nullable;
import org.apache.beam.sdk.util.AppEngineEnvironment;
import org.apache.beam.sdk.util.GcsPathValidator;
import org.apache.beam.sdk.util.GcsUtil;
@@ -82,9 +81,8 @@ public interface GcsOptions extends
+ "information on the restrictions and performance implications of this value.\n\n"
+ "https://github.com/GoogleCloudPlatform/bigdata-interop/blob/master/util/src/main/java/"
+ "com/google/cloud/hadoop/util/AbstractGoogleAsyncWriteChannel.java")
- @Nullable
Integer getGcsUploadBufferSizeBytes();
- void setGcsUploadBufferSizeBytes(@Nullable Integer bytes);
+ void setGcsUploadBufferSizeBytes(Integer bytes);
/**
* The class of the validator that should be created and used to validate paths.
http://git-wip-us.apache.org/repos/asf/beam/blob/fee029f7/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
index 5e83584..a10ea28 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
@@ -101,18 +101,6 @@ public class GcsUtil {
gcsOptions.getExecutorService(),
gcsOptions.getGcsUploadBufferSizeBytes());
}
-
- /**
- * Returns an instance of {@link GcsUtil} based on the given parameters.
- */
- public static GcsUtil create(
- Storage storageClient,
- HttpRequestInitializer httpRequestInitializer,
- ExecutorService executorService,
- @Nullable Integer uploadBufferSizeBytes) {
- return new GcsUtil(
- storageClient, httpRequestInitializer, executorService, uploadBufferSizeBytes);
- }
}
private static final Logger LOG = LoggerFactory.getLogger(GcsUtil.class);
[2/2] beam git commit: This closes #1847
Posted by dh...@apache.org.
This closes #1847
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1c6e6674
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1c6e6674
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1c6e6674
Branch: refs/heads/master
Commit: 1c6e667414788fe99f583fac39d458a4984ae162
Parents: 6413299 fee029f
Author: Dan Halperin <dh...@google.com>
Authored: Wed Jan 25 17:47:08 2017 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Jan 25 17:47:08 2017 -0800
----------------------------------------------------------------------
runners/google-cloud-dataflow-java/pom.xml | 5 -
.../beam/runners/dataflow/util/GcsStager.java | 18 +-
.../beam/runners/dataflow/util/PackageUtil.java | 349 +++++++------------
.../runners/dataflow/util/PackageUtilTest.java | 42 +--
.../org/apache/beam/sdk/options/GcsOptions.java | 4 +-
.../java/org/apache/beam/sdk/util/GcsUtil.java | 12 -
6 files changed, 149 insertions(+), 281 deletions(-)
----------------------------------------------------------------------