You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/01/29 16:22:13 UTC

[27/50] beam git commit: Revert "This closes #1184"

Revert "This closes #1184"

This reverts commit c525783704e0cc47845df8cdec1715e1f1c74008, reversing
changes made to 979c9376f820577bad43c18cc1a7ee86fab9d942.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fee029f7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fee029f7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fee029f7

Branch: refs/heads/python-sdk
Commit: fee029f7f9963c9de821ff5792d7f45fabe6cb5d
Parents: 6413299
Author: Dan Halperin <dh...@google.com>
Authored: Wed Jan 25 15:54:26 2017 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Jan 25 15:54:26 2017 -0800

----------------------------------------------------------------------
 runners/google-cloud-dataflow-java/pom.xml      |   5 -
 .../beam/runners/dataflow/util/GcsStager.java   |  18 +-
 .../beam/runners/dataflow/util/PackageUtil.java | 349 +++++++------------
 .../runners/dataflow/util/PackageUtilTest.java  |  42 +--
 .../org/apache/beam/sdk/options/GcsOptions.java |   4 +-
 .../java/org/apache/beam/sdk/util/GcsUtil.java  |  12 -
 6 files changed, 149 insertions(+), 281 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/fee029f7/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index 9858b3d..eea5502 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -203,11 +203,6 @@
     </dependency>
 
     <dependency>
-      <groupId>com.google.apis</groupId>
-      <artifactId>google-api-services-storage</artifactId>
-    </dependency>
-
-    <dependency>
       <groupId>com.google.auth</groupId>
       <artifactId>google-auth-library-credentials</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/fee029f7/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
index 53822e3..6ca4c3f 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
@@ -17,19 +17,13 @@
  */
 package org.apache.beam.runners.dataflow.util;
 
-import static com.google.common.base.MoreObjects.firstNonNull;
-import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.api.services.dataflow.model.DataflowPackage;
-import com.google.api.services.storage.Storage;
 import java.util.List;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.util.GcsUtil;
-import org.apache.beam.sdk.util.GcsUtil.GcsUtilFactory;
-import org.apache.beam.sdk.util.Transport;
 
 /**
  * Utility class for staging files to GCS.
@@ -41,7 +35,6 @@ public class GcsStager implements Stager {
     this.options = options;
   }
 
-  @SuppressWarnings("unused")  // used via reflection
   public static GcsStager fromOptions(PipelineOptions options) {
     return new GcsStager(options.as(DataflowPipelineOptions.class));
   }
@@ -55,16 +48,7 @@ public class GcsStager implements Stager {
     if (windmillBinary != null) {
       filesToStage.add("windmill_main=" + windmillBinary);
     }
-    int uploadSizeBytes = firstNonNull(options.getGcsUploadBufferSizeBytes(), 1024 * 1024);
-    checkArgument(uploadSizeBytes > 0, "gcsUploadBufferSizeBytes must be > 0");
-    uploadSizeBytes = Math.min(uploadSizeBytes, 1024 * 1024);
-    Storage.Builder storageBuilder = Transport.newStorageClient(options);
-    GcsUtil util = GcsUtilFactory.create(
-        storageBuilder.build(),
-        storageBuilder.getHttpRequestInitializer(),
-        options.getExecutorService(),
-        uploadSizeBytes);
     return PackageUtil.stageClasspathElements(
-        options.getFilesToStage(), options.getStagingLocation(), util);
+        options.getFilesToStage(), options.getStagingLocation());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/fee029f7/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
index fa8c94d..6d910ba 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
@@ -17,62 +17,53 @@
  */
 package org.apache.beam.runners.dataflow.util;
 
-import static com.google.common.base.Preconditions.checkArgument;
-
 import com.fasterxml.jackson.core.Base64Variants;
 import com.google.api.client.util.BackOff;
 import com.google.api.client.util.Sleeper;
 import com.google.api.services.dataflow.model.DataflowPackage;
 import com.google.cloud.hadoop.util.ApiErrorExtractor;
-import com.google.common.collect.Lists;
 import com.google.common.hash.Funnels;
 import com.google.common.hash.Hasher;
 import com.google.common.hash.Hashing;
 import com.google.common.io.CountingOutputStream;
 import com.google.common.io.Files;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
+import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Objects;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicInteger;
-import javax.annotation.Nullable;
 import org.apache.beam.sdk.util.FluentBackoff;
-import org.apache.beam.sdk.util.GcsIOChannelFactory;
-import org.apache.beam.sdk.util.GcsUtil;
-import org.apache.beam.sdk.util.IOChannelFactory;
 import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.util.MimeTypes;
 import org.apache.beam.sdk.util.ZipFiles;
-import org.apache.beam.sdk.util.gcsfs.GcsPath;
 import org.joda.time.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /** Helper routines for packages. */
-class PackageUtil {
+public class PackageUtil {
   private static final Logger LOG = LoggerFactory.getLogger(PackageUtil.class);
   /**
    * A reasonable upper bound on the number of jars required to launch a Dataflow job.
    */
-  private static final int SANE_CLASSPATH_SIZE = 1000;
+  public static final int SANE_CLASSPATH_SIZE = 1000;
+  /**
+   * The initial interval to use between package staging attempts.
+   */
+  private static final Duration INITIAL_BACKOFF_INTERVAL = Duration.standardSeconds(5);
+  /**
+   * The maximum number of retries when staging a file.
+   */
+  private static final int MAX_RETRIES = 4;
 
   private static final FluentBackoff BACKOFF_FACTORY =
-      FluentBackoff.DEFAULT.withMaxRetries(4).withInitialBackoff(Duration.standardSeconds(5));
+      FluentBackoff.DEFAULT
+          .withMaxRetries(MAX_RETRIES).withInitialBackoff(INITIAL_BACKOFF_INTERVAL);
 
   /**
    * Translates exceptions from API calls.
@@ -80,18 +71,35 @@ class PackageUtil {
   private static final ApiErrorExtractor ERROR_EXTRACTOR = new ApiErrorExtractor();
 
   /**
+   * Creates a DataflowPackage containing information about how a classpath element should be
+   * staged, including the staging destination as well as its size and hash.
+   *
+   * @param classpathElement The local path for the classpath element.
+   * @param stagingPath The base location for staged classpath elements.
+   * @param overridePackageName If non-null, use the given value as the package name
+   *                            instead of generating one automatically.
+   * @return The package.
+   */
+  @Deprecated
+  public static DataflowPackage createPackage(File classpathElement,
+      String stagingPath, String overridePackageName) {
+    return createPackageAttributes(classpathElement, stagingPath, overridePackageName)
+        .getDataflowPackage();
+  }
+
+  /**
    * Compute and cache the attributes of a classpath element that we will need to stage it.
    *
-   * @param source the file or directory to be staged.
+   * @param classpathElement the file or directory to be staged.
    * @param stagingPath The base location for staged classpath elements.
    * @param overridePackageName If non-null, use the given value as the package name
    *                            instead of generating one automatically.
    * @return a {@link PackageAttributes} that containing metadata about the object to be staged.
    */
-  static PackageAttributes createPackageAttributes(File source,
-      String stagingPath, @Nullable String overridePackageName) {
+  static PackageAttributes createPackageAttributes(File classpathElement,
+      String stagingPath, String overridePackageName) {
     try {
-      boolean directory = source.isDirectory();
+      boolean directory = classpathElement.isDirectory();
 
       // Compute size and hash in one pass over file or directory.
       Hasher hasher = Hashing.md5().newHasher();
@@ -100,158 +108,25 @@ class PackageUtil {
 
       if (!directory) {
         // Files are staged as-is.
-        Files.asByteSource(source).copyTo(countingOutputStream);
+        Files.asByteSource(classpathElement).copyTo(countingOutputStream);
       } else {
         // Directories are recursively zipped.
-        ZipFiles.zipDirectory(source, countingOutputStream);
+        ZipFiles.zipDirectory(classpathElement, countingOutputStream);
       }
 
       long size = countingOutputStream.getCount();
       String hash = Base64Variants.MODIFIED_FOR_URL.encode(hasher.hash().asBytes());
 
       // Create the DataflowPackage with staging name and location.
-      String uniqueName = getUniqueContentName(source, hash);
+      String uniqueName = getUniqueContentName(classpathElement, hash);
       String resourcePath = IOChannelUtils.resolve(stagingPath, uniqueName);
       DataflowPackage target = new DataflowPackage();
       target.setName(overridePackageName != null ? overridePackageName : uniqueName);
       target.setLocation(resourcePath);
 
-      return new PackageAttributes(size, hash, directory, target, source.getPath());
+      return new PackageAttributes(size, hash, directory, target);
     } catch (IOException e) {
-      throw new RuntimeException("Package setup failure for " + source, e);
-    }
-  }
-
-  /** Utility comparator used in uploading packages efficiently. */
-  private static class PackageUploadOrder implements Comparator<PackageAttributes> {
-    @Override
-    public int compare(PackageAttributes o1, PackageAttributes o2) {
-      // Smaller size compares high so that bigger packages are uploaded first.
-      long sizeDiff = o2.getSize() - o1.getSize();
-      if (sizeDiff != 0) {
-        // returns sign of long
-        return Long.signum(sizeDiff);
-      }
-
-      // Otherwise, choose arbitrarily based on hash.
-      return o1.getHash().compareTo(o2.getHash());
-    }
-  }
-
-  /**
-   * Utility function that computes sizes and hashes of packages so that we can validate whether
-   * they have already been correctly staged.
-   */
-  private static List<PackageAttributes> computePackageAttributes(
-      Collection<String> classpathElements, final String stagingPath,
-      ListeningExecutorService executorService) {
-    List<ListenableFuture<PackageAttributes>> futures = new LinkedList<>();
-    for (String classpathElement : classpathElements) {
-      @Nullable String userPackageName = null;
-      if (classpathElement.contains("=")) {
-        String[] components = classpathElement.split("=", 2);
-        userPackageName = components[0];
-        classpathElement = components[1];
-      }
-      @Nullable final String packageName = userPackageName;
-
-      final File file = new File(classpathElement);
-      if (!file.exists()) {
-        LOG.warn("Skipping non-existent classpath element {} that was specified.",
-            classpathElement);
-        continue;
-      }
-
-      ListenableFuture<PackageAttributes> future =
-          executorService.submit(new Callable<PackageAttributes>() {
-            @Override
-            public PackageAttributes call() throws Exception {
-              return createPackageAttributes(file, stagingPath, packageName);
-            }
-          });
-      futures.add(future);
-    }
-
-    try {
-      return Futures.allAsList(futures).get();
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new RuntimeException("Interrupted while staging packages", e);
-    } catch (ExecutionException e) {
-      throw new RuntimeException("Error while staging packages", e.getCause());
-    }
-  }
-
-  private static WritableByteChannel makeWriter(String target, GcsUtil gcsUtil)
-      throws IOException {
-    IOChannelFactory factory = IOChannelUtils.getFactory(target);
-    if (factory instanceof GcsIOChannelFactory) {
-      return gcsUtil.create(GcsPath.fromUri(target), MimeTypes.BINARY);
-    } else {
-      return factory.create(target, MimeTypes.BINARY);
-    }
-  }
-
-  /**
-   * Utility to verify whether a package has already been staged and, if not, copy it to the
-   * staging location.
-   */
-  private static void stageOnePackage(
-      PackageAttributes attributes, AtomicInteger numUploaded, AtomicInteger numCached,
-      Sleeper retrySleeper, GcsUtil gcsUtil) {
-    String source = attributes.getSourcePath();
-    String target = attributes.getDataflowPackage().getLocation();
-
-    // TODO: Should we attempt to detect the Mime type rather than
-    // always using MimeTypes.BINARY?
-    try {
-      try {
-        long remoteLength = IOChannelUtils.getSizeBytes(target);
-        if (remoteLength == attributes.getSize()) {
-          LOG.debug("Skipping classpath element already staged: {} at {}",
-              attributes.getSourcePath(), target);
-          numCached.incrementAndGet();
-          return;
-        }
-      } catch (FileNotFoundException expected) {
-        // If the file doesn't exist, it means we need to upload it.
-      }
-
-      // Upload file, retrying on failure.
-      BackOff backoff = BACKOFF_FACTORY.backoff();
-      while (true) {
-        try {
-          LOG.debug("Uploading classpath element {} to {}", source, target);
-          try (WritableByteChannel writer = makeWriter(target, gcsUtil)) {
-            copyContent(source, writer);
-          }
-          numUploaded.incrementAndGet();
-          break;
-        } catch (IOException e) {
-          if (ERROR_EXTRACTOR.accessDenied(e)) {
-            String errorMessage = String.format(
-                "Uploaded failed due to permissions error, will NOT retry staging "
-                    + "of classpath %s. Please verify credentials are valid and that you have "
-                    + "write access to %s. Stale credentials can be resolved by executing "
-                    + "'gcloud auth application-default login'.", source, target);
-            LOG.error(errorMessage);
-            throw new IOException(errorMessage, e);
-          }
-          long sleep = backoff.nextBackOffMillis();
-          if (sleep == BackOff.STOP) {
-            // Rethrow last error, to be included as a cause in the catch below.
-            LOG.error("Upload failed, will NOT retry staging of classpath: {}",
-                source, e);
-            throw e;
-          } else {
-            LOG.warn("Upload attempt failed, sleeping before retrying staging of classpath: {}",
-                source, e);
-            retrySleeper.sleep(sleep);
-          }
-        }
-      }
-    } catch (Exception e) {
-      throw new RuntimeException("Could not stage classpath element: " + source, e);
+      throw new RuntimeException("Package setup failure for " + classpathElement, e);
     }
   }
 
@@ -262,70 +137,113 @@ class PackageUtil {
    * @param stagingPath The base location to stage the elements to.
    * @return A list of cloud workflow packages, each representing a classpath element.
    */
-  static List<DataflowPackage> stageClasspathElements(
-      Collection<String> classpathElements, String stagingPath, GcsUtil gcsUtil) {
-    ListeningExecutorService executorService =
-        MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(32));
-    try {
-      return stageClasspathElements(
-          classpathElements, stagingPath, Sleeper.DEFAULT, executorService, gcsUtil);
-    } finally {
-      executorService.shutdown();
-    }
+  public static List<DataflowPackage> stageClasspathElements(
+      Collection<String> classpathElements, String stagingPath) {
+    return stageClasspathElements(classpathElements, stagingPath, Sleeper.DEFAULT);
   }
 
   // Visible for testing.
   static List<DataflowPackage> stageClasspathElements(
-      Collection<String> classpathElements, final String stagingPath,
-      final Sleeper retrySleeper, ListeningExecutorService executorService, final GcsUtil gcsUtil) {
+      Collection<String> classpathElements, String stagingPath,
+      Sleeper retrySleeper) {
     LOG.info("Uploading {} files from PipelineOptions.filesToStage to staging location to "
         + "prepare for execution.", classpathElements.size());
 
     if (classpathElements.size() > SANE_CLASSPATH_SIZE) {
       LOG.warn("Your classpath contains {} elements, which Google Cloud Dataflow automatically "
-            + "copies to all workers. Having this many entries on your classpath may be indicative "
-            + "of an issue in your pipeline. You may want to consider trimming the classpath to "
-            + "necessary dependencies only, using --filesToStage pipeline option to override "
-            + "what files are being staged, or bundling several dependencies into one.",
+          + "copies to all workers. Having this many entries on your classpath may be indicative "
+          + "of an issue in your pipeline. You may want to consider trimming the classpath to "
+          + "necessary dependencies only, using --filesToStage pipeline option to override "
+          + "what files are being staged, or bundling several dependencies into one.",
           classpathElements.size());
     }
 
-    checkArgument(
-        stagingPath != null,
-        "Can't stage classpath elements because no staging location has been provided");
+    ArrayList<DataflowPackage> packages = new ArrayList<>();
 
-    // Inline a copy here because the inner code returns an immutable list and we want to mutate it.
-    List<PackageAttributes> packageAttributes =
-        new LinkedList<>(computePackageAttributes(classpathElements, stagingPath, executorService));
-    // Order package attributes in descending size order so that we upload the largest files first.
-    Collections.sort(packageAttributes, new PackageUploadOrder());
+    if (stagingPath == null) {
+      throw new IllegalArgumentException(
+          "Can't stage classpath elements on because no staging location has been provided");
+    }
 
-    List<DataflowPackage> packages = Lists.newArrayListWithExpectedSize(packageAttributes.size());
-    final AtomicInteger numUploaded = new AtomicInteger(0);
-    final AtomicInteger numCached = new AtomicInteger(0);
+    int numUploaded = 0;
+    int numCached = 0;
+    for (String classpathElement : classpathElements) {
+      String packageName = null;
+      if (classpathElement.contains("=")) {
+        String[] components = classpathElement.split("=", 2);
+        packageName = components[0];
+        classpathElement = components[1];
+      }
 
-    List<ListenableFuture<?>> futures = new LinkedList<>();
-    for (final PackageAttributes attributes : packageAttributes) {
-      packages.add(attributes.getDataflowPackage());
-      futures.add(executorService.submit(new Runnable() {
-        @Override
-        public void run() {
-          stageOnePackage(attributes, numUploaded, numCached, retrySleeper, gcsUtil);
+      File file = new File(classpathElement);
+      if (!file.exists()) {
+        LOG.warn("Skipping non-existent classpath element {} that was specified.",
+            classpathElement);
+        continue;
+      }
+
+      PackageAttributes attributes = createPackageAttributes(file, stagingPath, packageName);
+
+      DataflowPackage workflowPackage = attributes.getDataflowPackage();
+      packages.add(workflowPackage);
+      String target = workflowPackage.getLocation();
+
+      // TODO: Should we attempt to detect the Mime type rather than
+      // always using MimeTypes.BINARY?
+      try {
+        try {
+          long remoteLength = IOChannelUtils.getSizeBytes(target);
+          if (remoteLength == attributes.getSize()) {
+            LOG.debug("Skipping classpath element already staged: {} at {}",
+                classpathElement, target);
+            numCached++;
+            continue;
+          }
+        } catch (FileNotFoundException expected) {
+          // If the file doesn't exist, it means we need to upload it.
         }
-      }));
-    }
-    try {
-      Futures.allAsList(futures).get();
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new RuntimeException("Interrupted while staging packages", e);
-    } catch (ExecutionException e) {
-      throw new RuntimeException("Error while staging packages", e.getCause());
+
+        // Upload file, retrying on failure.
+        BackOff backoff = BACKOFF_FACTORY.backoff();
+        while (true) {
+          try {
+            LOG.debug("Uploading classpath element {} to {}", classpathElement, target);
+            try (WritableByteChannel writer = IOChannelUtils.create(target, MimeTypes.BINARY)) {
+              copyContent(classpathElement, writer);
+            }
+            numUploaded++;
+            break;
+          } catch (IOException e) {
+            if (ERROR_EXTRACTOR.accessDenied(e)) {
+              String errorMessage = String.format(
+                  "Uploaded failed due to permissions error, will NOT retry staging "
+                  + "of classpath %s. Please verify credentials are valid and that you have "
+                  + "write access to %s. Stale credentials can be resolved by executing "
+                  + "'gcloud auth login'.", classpathElement, target);
+              LOG.error(errorMessage);
+              throw new IOException(errorMessage, e);
+            }
+            long sleep = backoff.nextBackOffMillis();
+            if (sleep == BackOff.STOP) {
+              // Rethrow last error, to be included as a cause in the catch below.
+              LOG.error("Upload failed, will NOT retry staging of classpath: {}",
+                  classpathElement, e);
+              throw e;
+            } else {
+              LOG.warn("Upload attempt failed, sleeping before retrying staging of classpath: {}",
+                  classpathElement, e);
+              retrySleeper.sleep(sleep);
+            }
+          }
+        }
+      } catch (Exception e) {
+        throw new RuntimeException("Could not stage classpath element: " + classpathElement, e);
+      }
     }
 
-    LOG.info(
-        "Staging files complete: {} files cached, {} files newly uploaded",
-        numUploaded.get(), numCached.get());
+    LOG.info("Uploading PipelineOptions.filesToStage complete: {} files newly uploaded, "
+        + "{} files cached",
+        numUploaded, numCached);
 
     return packages;
   }
@@ -375,15 +293,13 @@ class PackageUtil {
     private final boolean directory;
     private final long size;
     private final String hash;
-    private final String sourcePath;
     private DataflowPackage dataflowPackage;
 
     public PackageAttributes(long size, String hash, boolean directory,
-        DataflowPackage dataflowPackage, String sourcePath) {
+        DataflowPackage dataflowPackage) {
       this.size = size;
       this.hash = Objects.requireNonNull(hash, "hash");
       this.directory = directory;
-      this.sourcePath = Objects.requireNonNull(sourcePath, "sourcePath");
       this.dataflowPackage = Objects.requireNonNull(dataflowPackage, "dataflowPackage");
     }
 
@@ -414,12 +330,5 @@ class PackageUtil {
     public String getHash() {
       return hash;
     }
-
-    /**
-     * @return the file to be uploaded
-     */
-    public String getSourcePath() {
-      return sourcePath;
-    }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/fee029f7/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
index 3828415..05a87dd 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
@@ -18,12 +18,12 @@
 package org.apache.beam.runners.dataflow.util;
 
 import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
@@ -53,7 +53,6 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.io.Files;
 import com.google.common.io.LineReader;
-import com.google.common.util.concurrent.MoreExecutors;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -236,7 +235,7 @@ public class PackageUtilTest {
       classpathElements.add(eltName + '=' + tmpFile.getAbsolutePath());
     }
 
-    PackageUtil.stageClasspathElements(classpathElements, STAGING_PATH, mockGcsUtil);
+    PackageUtil.stageClasspathElements(classpathElements, STAGING_PATH);
 
     logged.verifyWarn("Your classpath contains 1005 elements, which Google Cloud Dataflow");
   }
@@ -251,7 +250,7 @@ public class PackageUtilTest {
     when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
 
     List<DataflowPackage> targets = PackageUtil.stageClasspathElements(
-        ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH, mockGcsUtil);
+        ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH);
     DataflowPackage target = Iterables.getOnlyElement(targets);
 
     verify(mockGcsUtil).fileSize(any(GcsPath.class));
@@ -278,7 +277,7 @@ public class PackageUtilTest {
     when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
 
     PackageUtil.stageClasspathElements(
-        ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH, mockGcsUtil);
+        ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH);
 
     verify(mockGcsUtil).fileSize(any(GcsPath.class));
     verify(mockGcsUtil).create(any(GcsPath.class), anyString());
@@ -305,7 +304,7 @@ public class PackageUtilTest {
     when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
 
     List<DataflowPackage> targets = PackageUtil.stageClasspathElements(
-        ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH, mockGcsUtil);
+        ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH);
     DataflowPackage target = Iterables.getOnlyElement(targets);
 
     verify(mockGcsUtil).fileSize(any(GcsPath.class));
@@ -328,8 +327,7 @@ public class PackageUtilTest {
     try {
       PackageUtil.stageClasspathElements(
           ImmutableList.of(tmpFile.getAbsolutePath()),
-          STAGING_PATH, fastNanoClockAndSleeper, MoreExecutors.newDirectExecutorService(),
-          mockGcsUtil);
+          STAGING_PATH, fastNanoClockAndSleeper);
     } finally {
       verify(mockGcsUtil).fileSize(any(GcsPath.class));
       verify(mockGcsUtil, times(5)).create(any(GcsPath.class), anyString());
@@ -350,20 +348,16 @@ public class PackageUtilTest {
     try {
       PackageUtil.stageClasspathElements(
           ImmutableList.of(tmpFile.getAbsolutePath()),
-          STAGING_PATH, fastNanoClockAndSleeper, MoreExecutors.newDirectExecutorService(),
-          mockGcsUtil);
+          STAGING_PATH, fastNanoClockAndSleeper);
       fail("Expected RuntimeException");
     } catch (RuntimeException e) {
-      assertThat("Expected RuntimeException wrapping IOException.",
-          e.getCause(), instanceOf(RuntimeException.class));
-      assertThat("Expected IOException containing detailed message.",
-          e.getCause().getCause(), instanceOf(IOException.class));
-      assertThat(e.getCause().getCause().getMessage(),
+      assertTrue("Expected IOException containing detailed message.",
+          e.getCause() instanceof IOException);
+      assertThat(e.getCause().getMessage(),
           Matchers.allOf(
               Matchers.containsString("Uploaded failed due to permissions error"),
               Matchers.containsString(
-                  "Stale credentials can be resolved by executing 'gcloud auth application-default "
-                      + "login'")));
+                  "Stale credentials can be resolved by executing 'gcloud auth login'")));
     } finally {
       verify(mockGcsUtil).fileSize(any(GcsPath.class));
       verify(mockGcsUtil).create(any(GcsPath.class), anyString());
@@ -383,8 +377,9 @@ public class PackageUtilTest {
 
     try {
       PackageUtil.stageClasspathElements(
-          ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH, fastNanoClockAndSleeper,
-          MoreExecutors.newDirectExecutorService(), mockGcsUtil);
+                                              ImmutableList.of(tmpFile.getAbsolutePath()),
+                                              STAGING_PATH,
+                                              fastNanoClockAndSleeper);
     } finally {
       verify(mockGcsUtil).fileSize(any(GcsPath.class));
       verify(mockGcsUtil, times(2)).create(any(GcsPath.class), anyString());
@@ -398,7 +393,7 @@ public class PackageUtilTest {
     when(mockGcsUtil.fileSize(any(GcsPath.class))).thenReturn(tmpFile.length());
 
     PackageUtil.stageClasspathElements(
-        ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH, mockGcsUtil);
+        ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH);
 
     verify(mockGcsUtil).fileSize(any(GcsPath.class));
     verifyNoMoreInteractions(mockGcsUtil);
@@ -416,7 +411,7 @@ public class PackageUtilTest {
     when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
 
     PackageUtil.stageClasspathElements(
-        ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH, mockGcsUtil);
+        ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH);
 
     verify(mockGcsUtil).fileSize(any(GcsPath.class));
     verify(mockGcsUtil).create(any(GcsPath.class), anyString());
@@ -434,8 +429,7 @@ public class PackageUtilTest {
     when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
 
     List<DataflowPackage> targets = PackageUtil.stageClasspathElements(
-        ImmutableList.of(overriddenName + "=" + tmpFile.getAbsolutePath()), STAGING_PATH,
-        mockGcsUtil);
+        ImmutableList.of(overriddenName + "=" + tmpFile.getAbsolutePath()), STAGING_PATH);
     DataflowPackage target = Iterables.getOnlyElement(targets);
 
     verify(mockGcsUtil).fileSize(any(GcsPath.class));
@@ -452,7 +446,7 @@ public class PackageUtilTest {
     String nonExistentFile =
         IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "non-existent-file");
     assertEquals(Collections.EMPTY_LIST, PackageUtil.stageClasspathElements(
-        ImmutableList.of(nonExistentFile), STAGING_PATH, mockGcsUtil));
+        ImmutableList.of(nonExistentFile), STAGING_PATH));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/fee029f7/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java
index 72e106d..0553efc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java
@@ -25,7 +25,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import javax.annotation.Nullable;
 import org.apache.beam.sdk.util.AppEngineEnvironment;
 import org.apache.beam.sdk.util.GcsPathValidator;
 import org.apache.beam.sdk.util.GcsUtil;
@@ -82,9 +81,8 @@ public interface GcsOptions extends
       + "information on the restrictions and performance implications of this value.\n\n"
       + "https://github.com/GoogleCloudPlatform/bigdata-interop/blob/master/util/src/main/java/"
       + "com/google/cloud/hadoop/util/AbstractGoogleAsyncWriteChannel.java")
-  @Nullable
   Integer getGcsUploadBufferSizeBytes();
-  void setGcsUploadBufferSizeBytes(@Nullable Integer bytes);
+  void setGcsUploadBufferSizeBytes(Integer bytes);
 
   /**
    * The class of the validator that should be created and used to validate paths.

http://git-wip-us.apache.org/repos/asf/beam/blob/fee029f7/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
index 5e83584..a10ea28 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
@@ -101,18 +101,6 @@ public class GcsUtil {
           gcsOptions.getExecutorService(),
           gcsOptions.getGcsUploadBufferSizeBytes());
     }
-
-    /**
-     * Returns an instance of {@link GcsUtil} based on the given parameters.
-     */
-    public static GcsUtil create(
-        Storage storageClient,
-        HttpRequestInitializer httpRequestInitializer,
-        ExecutorService executorService,
-        @Nullable Integer uploadBufferSizeBytes) {
-      return new GcsUtil(
-          storageClient, httpRequestInitializer, executorService, uploadBufferSizeBytes);
-    }
   }
 
   private static final Logger LOG = LoggerFactory.getLogger(GcsUtil.class);