You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/10/18 20:09:14 UTC
[2/7] beam git commit: Add ability to stage explicit file list
Add ability to stage explicit file list
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9b866fef
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9b866fef
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9b866fef
Branch: refs/heads/master
Commit: 9b866fef99293d9738f0dcd862fb409265e50abb
Parents: 7409ca0
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Oct 10 21:55:49 2017 -0700
Committer: Kenneth Knowles <ke...@apache.org>
Committed: Wed Oct 18 13:02:24 2017 -0700
----------------------------------------------------------------------
.../beam/runners/dataflow/DataflowRunner.java | 2 +-
.../beam/runners/dataflow/util/GcsStager.java | 42 +++++++++++++++-----
.../beam/runners/dataflow/util/Stager.java | 27 ++++++++++---
3 files changed, 54 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/9b866fef/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index e637dd4..5e91850 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -514,7 +514,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
LOG.info("Executing pipeline on the Dataflow Service, which will have billing implications "
+ "related to Google Compute Engine usage and other Google Cloud Services.");
- List<DataflowPackage> packages = options.getStager().stageFiles();
+ List<DataflowPackage> packages = options.getStager().stageDefaultFiles();
// Set a unique client_request_id in the CreateJob request.
http://git-wip-us.apache.org/repos/asf/beam/blob/9b866fef/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
index 929be99..ff205f0 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
@@ -29,9 +29,7 @@ import org.apache.beam.sdk.extensions.gcp.storage.GcsCreateOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.MimeTypes;
-/**
- * Utility class for staging files to GCS.
- */
+/** Utility class for staging files to GCS. */
public class GcsStager implements Stager {
private DataflowPipelineOptions options;
@@ -39,32 +37,54 @@ public class GcsStager implements Stager {
this.options = options;
}
- @SuppressWarnings("unused") // used via reflection
+ @SuppressWarnings("unused") // used via reflection
public static GcsStager fromOptions(PipelineOptions options) {
return new GcsStager(options.as(DataflowPipelineOptions.class));
}
+ /**
+ * Stages {@link DataflowPipelineOptions#getFilesToStage()}, which defaults to every file on the
+ * classpath unless overridden, as well as {@link
+ * DataflowPipelineDebugOptions#getOverrideWindmillBinary()} if specified.
+ *
+ * @see #stageFiles(List)
+ */
@Override
- public List<DataflowPackage> stageFiles() {
+ public List<DataflowPackage> stageDefaultFiles() {
checkNotNull(options.getStagingLocation());
String windmillBinary =
options.as(DataflowPipelineDebugOptions.class).getOverrideWindmillBinary();
+
+ List<String> filesToStage = options.getFilesToStage();
+
if (windmillBinary != null) {
- options.getFilesToStage().add("windmill_main=" + windmillBinary);
+ filesToStage.add("windmill_main=" + windmillBinary);
}
+ return stageFiles(filesToStage);
+ }
+
+ /**
+ * Stages files to {@link DataflowPipelineOptions#getStagingLocation()}, suffixed with their md5
+ * hash to avoid collisions.
+ *
+ * <p>Uses {@link DataflowPipelineOptions#getGcsUploadBufferSizeBytes()}.
+ */
+ @Override
+ public List<DataflowPackage> stageFiles(List<String> filesToStage) {
int uploadSizeBytes = firstNonNull(options.getGcsUploadBufferSizeBytes(), 1024 * 1024);
checkArgument(uploadSizeBytes > 0, "gcsUploadBufferSizeBytes must be > 0");
uploadSizeBytes = Math.min(uploadSizeBytes, 1024 * 1024);
- GcsCreateOptions createOptions = GcsCreateOptions.builder()
- .setGcsUploadBufferSizeBytes(uploadSizeBytes)
- .setMimeType(MimeTypes.BINARY)
- .build();
+ GcsCreateOptions createOptions =
+ GcsCreateOptions.builder()
+ .setGcsUploadBufferSizeBytes(uploadSizeBytes)
+ .setMimeType(MimeTypes.BINARY)
+ .build();
try (PackageUtil packageUtil = PackageUtil.withDefaultThreadPool()) {
return packageUtil.stageClasspathElements(
- options.getFilesToStage(), options.getStagingLocation(), createOptions);
+ filesToStage, options.getStagingLocation(), createOptions);
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/9b866fef/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/Stager.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/Stager.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/Stager.java
index 3e3c17f..f0be941 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/Stager.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/Stager.java
@@ -20,10 +20,27 @@ package org.apache.beam.runners.dataflow.util;
import com.google.api.services.dataflow.model.DataflowPackage;
import java.util.List;
-/**
- * Interface for staging files needed for running a Dataflow pipeline.
- */
+/** Interface for staging files needed for running a Dataflow pipeline. */
public interface Stager {
- /* Stage files and return a list of packages. */
- List<DataflowPackage> stageFiles();
+ /**
+ * Stage default files and return a list of {@link DataflowPackage} objects describing the actual
+ * location at which each file was staged.
+ *
+ * <p>This is required to be identical to calling {@link #stageFiles(List)} with the default set
+ * of files.
+ *
+ * <p>The default is controlled by the implementation of {@link Stager}. The only known
+ * implementation of {@link Stager} is {@link GcsStager}. See that class for more detail.
+ */
+ List<DataflowPackage> stageDefaultFiles();
+
+ /**
+ * Stage files and return a list of {@link DataflowPackage} objects describing the actual
+ * location at which each file was staged.
+ *
+ * <p>The mechanism for staging is owned by the implementation. The only requirement is that the
+ * location specified in the returned {@link DataflowPackage} should, in fact, contain the
+ * contents of the staged file.
+ */
+ List<DataflowPackage> stageFiles(List<String> filesToStage);
}