You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/10/18 20:09:14 UTC

[2/7] beam git commit: Add ability to stage explicit file list

Add ability to stage explicit file list


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9b866fef
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9b866fef
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9b866fef

Branch: refs/heads/master
Commit: 9b866fef99293d9738f0dcd862fb409265e50abb
Parents: 7409ca0
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Oct 10 21:55:49 2017 -0700
Committer: Kenneth Knowles <ke...@apache.org>
Committed: Wed Oct 18 13:02:24 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/dataflow/DataflowRunner.java   |  2 +-
 .../beam/runners/dataflow/util/GcsStager.java   | 42 +++++++++++++++-----
 .../beam/runners/dataflow/util/Stager.java      | 27 ++++++++++---
 3 files changed, 54 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/9b866fef/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index e637dd4..5e91850 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -514,7 +514,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     LOG.info("Executing pipeline on the Dataflow Service, which will have billing implications "
         + "related to Google Compute Engine usage and other Google Cloud Services.");
 
-    List<DataflowPackage> packages = options.getStager().stageFiles();
+    List<DataflowPackage> packages = options.getStager().stageDefaultFiles();
 
 
     // Set a unique client_request_id in the CreateJob request.

http://git-wip-us.apache.org/repos/asf/beam/blob/9b866fef/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
index 929be99..ff205f0 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
@@ -29,9 +29,7 @@ import org.apache.beam.sdk.extensions.gcp.storage.GcsCreateOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.MimeTypes;
 
-/**
- * Utility class for staging files to GCS.
- */
+/** Utility class for staging files to GCS. */
 public class GcsStager implements Stager {
   private DataflowPipelineOptions options;
 
@@ -39,32 +37,54 @@ public class GcsStager implements Stager {
     this.options = options;
   }
 
-  @SuppressWarnings("unused")  // used via reflection
+  @SuppressWarnings("unused") // used via reflection
   public static GcsStager fromOptions(PipelineOptions options) {
     return new GcsStager(options.as(DataflowPipelineOptions.class));
   }
 
+  /**
+   * Stages {@link DataflowPipelineOptions#getFilesToStage()}, which defaults to every file on the
+   * classpath unless overridden, as well as {@link
+   * DataflowPipelineDebugOptions#getOverrideWindmillBinary()} if specified.
+   *
+   * @see #stageFiles(List)
+   */
   @Override
-  public List<DataflowPackage> stageFiles() {
+  public List<DataflowPackage> stageDefaultFiles() {
     checkNotNull(options.getStagingLocation());
     String windmillBinary =
         options.as(DataflowPipelineDebugOptions.class).getOverrideWindmillBinary();
+
+    List<String> filesToStage = options.getFilesToStage();
+
     if (windmillBinary != null) {
-      options.getFilesToStage().add("windmill_main=" + windmillBinary);
+      filesToStage.add("windmill_main=" + windmillBinary);
     }
 
+    return stageFiles(filesToStage);
+  }
+
+  /**
+   * Stages files to {@link DataflowPipelineOptions#getStagingLocation()}, suffixed with their md5
+   * hash to avoid collisions.
+   *
+   * <p>Uses {@link DataflowPipelineOptions#getGcsUploadBufferSizeBytes()}.
+   */
+  @Override
+  public List<DataflowPackage> stageFiles(List<String> filesToStage) {
     int uploadSizeBytes = firstNonNull(options.getGcsUploadBufferSizeBytes(), 1024 * 1024);
     checkArgument(uploadSizeBytes > 0, "gcsUploadBufferSizeBytes must be > 0");
     uploadSizeBytes = Math.min(uploadSizeBytes, 1024 * 1024);
 
-    GcsCreateOptions createOptions = GcsCreateOptions.builder()
-        .setGcsUploadBufferSizeBytes(uploadSizeBytes)
-        .setMimeType(MimeTypes.BINARY)
-        .build();
+    GcsCreateOptions createOptions =
+        GcsCreateOptions.builder()
+            .setGcsUploadBufferSizeBytes(uploadSizeBytes)
+            .setMimeType(MimeTypes.BINARY)
+            .build();
 
     try (PackageUtil packageUtil = PackageUtil.withDefaultThreadPool()) {
       return packageUtil.stageClasspathElements(
-          options.getFilesToStage(), options.getStagingLocation(), createOptions);
+          filesToStage, options.getStagingLocation(), createOptions);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/9b866fef/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/Stager.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/Stager.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/Stager.java
index 3e3c17f..f0be941 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/Stager.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/Stager.java
@@ -20,10 +20,27 @@ package org.apache.beam.runners.dataflow.util;
 import com.google.api.services.dataflow.model.DataflowPackage;
 import java.util.List;
 
-/**
- * Interface for staging files needed for running a Dataflow pipeline.
- */
+/** Interface for staging files needed for running a Dataflow pipeline. */
 public interface Stager {
-  /* Stage files and return a list of packages. */
-  List<DataflowPackage> stageFiles();
+  /**
+   * Stage default files and return a list of {@link DataflowPackage} objects describing the actual
+   * location at which each file was staged.
+   *
+   * <p>This is required to be identical to calling {@link #stageFiles(List)} with the default set
+   * of files.
+   *
+   * <p>The default is controlled by the implementation of {@link Stager}. The only known
+   * implementation of {@link Stager} is {@link GcsStager}. See that class for more detail.
+   */
+  List<DataflowPackage> stageDefaultFiles();
+
+  /**
+   * Stage files and return a list of {@link DataflowPackage} objects describing the actual
+   * location at which each file was staged.
+   *
+   * <p>The mechanism for staging is owned by the implementation. The only requirement is that the
+   * location specified in the returned {@link DataflowPackage} should, in fact, contain the
+   * contents of the staged file.
+   */
+  List<DataflowPackage> stageFiles(List<String> filesToStage);
 }