You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain text copy.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/10/18 20:09:17 UTC
[5/7] beam git commit: Stage the pipeline without using a temp file
Stage the pipeline without using a temp file
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/090c5124
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/090c5124
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/090c5124
Branch: refs/heads/master
Commit: 090c512457e25c965efab2d6c849f1a50e03e052
Parents: aea0c60
Author: Kenneth Knowles <ke...@apache.org>
Authored: Tue Oct 17 16:06:05 2017 -0700
Committer: Kenneth Knowles <ke...@apache.org>
Committed: Wed Oct 18 13:02:25 2017 -0700
----------------------------------------------------------------------
.../beam/runners/dataflow/DataflowRunner.java | 22 +---
.../beam/runners/dataflow/util/GcsStager.java | 29 +++--
.../beam/runners/dataflow/util/PackageUtil.java | 116 ++++++++++++++-----
.../beam/runners/dataflow/util/Stager.java | 5 +
4 files changed, 111 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/090c5124/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 6dbc4af..ecef072 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -41,7 +41,6 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import java.io.File;
-import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.URISyntaxException;
@@ -191,10 +190,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
static final int GCS_UPLOAD_BUFFER_SIZE_BYTES_DEFAULT = 1024 * 1024;
@VisibleForTesting
- static final String PIPELINE_FILE_NAME = "pipeline";
-
- @VisibleForTesting
- static final String SERIALIZED_PROTOBUF_EXTENSION = ".pb";
+ static final String PIPELINE_FILE_NAME = "pipeline.pb";
private static final String STAGED_PIPELINE_METADATA_PROPERTY = "pipeline_url";
@@ -526,22 +522,10 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
List<DataflowPackage> packages = options.getStager().stageDefaultFiles();
- RunnerApi.Pipeline protoPipeline = PipelineTranslation.toProto(pipeline);
- File serializedProtoPipeline;
- try {
- serializedProtoPipeline =
- File.createTempFile(PIPELINE_FILE_NAME, SERIALIZED_PROTOBUF_EXTENSION);
- protoPipeline.writeDelimitedTo(new FileOutputStream(serializedProtoPipeline));
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
-
+ byte[] serializedProtoPipeline = PipelineTranslation.toProto(pipeline).toByteArray();
LOG.info("Staging pipeline description to {}", options.getStagingLocation());
DataflowPackage stagedPipeline =
- options
- .getStager()
- .stageFiles(ImmutableList.of(serializedProtoPipeline.getAbsolutePath()))
- .get(0);
+ options.getStager().stageToFile(serializedProtoPipeline, PIPELINE_FILE_NAME);
// Set a unique client_request_id in the CreateJob request.
// This is used to ensure idempotence of job creation across retried
http://git-wip-us.apache.org/repos/asf/beam/blob/090c5124/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
index ff205f0..7ed78e8 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
@@ -72,19 +72,28 @@ public class GcsStager implements Stager {
*/
@Override
public List<DataflowPackage> stageFiles(List<String> filesToStage) {
+ try (PackageUtil packageUtil = PackageUtil.withDefaultThreadPool()) {
+ return packageUtil.stageClasspathElements(
+ filesToStage, options.getStagingLocation(), buildCreateOptions());
+ }
+ }
+
+ @Override
+ public DataflowPackage stageToFile(byte[] bytes, String baseName) {
+ try (PackageUtil packageUtil = PackageUtil.withDefaultThreadPool()) {
+ return packageUtil.stageToFile(
+ bytes, baseName, options.getStagingLocation(), buildCreateOptions());
+ }
+ }
+
+ private GcsCreateOptions buildCreateOptions() {
int uploadSizeBytes = firstNonNull(options.getGcsUploadBufferSizeBytes(), 1024 * 1024);
checkArgument(uploadSizeBytes > 0, "gcsUploadBufferSizeBytes must be > 0");
uploadSizeBytes = Math.min(uploadSizeBytes, 1024 * 1024);
- GcsCreateOptions createOptions =
- GcsCreateOptions.builder()
- .setGcsUploadBufferSizeBytes(uploadSizeBytes)
- .setMimeType(MimeTypes.BINARY)
- .build();
-
- try (PackageUtil packageUtil = PackageUtil.withDefaultThreadPool()) {
- return packageUtil.stageClasspathElements(
- filesToStage, options.getStagingLocation(), createOptions);
- }
+ return GcsCreateOptions.builder()
+ .setGcsUploadBufferSizeBytes(uploadSizeBytes)
+ .setMimeType(MimeTypes.BINARY)
+ .build();
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/090c5124/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
index 449b36d..565e965 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
@@ -18,6 +18,7 @@
package org.apache.beam.runners.dataflow.util;
import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
import com.fasterxml.jackson.core.Base64Variants;
import com.google.api.client.util.BackOff;
@@ -29,6 +30,7 @@ import com.google.common.base.Function;
import com.google.common.hash.Funnels;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
+import com.google.common.io.ByteSource;
import com.google.common.io.CountingOutputStream;
import com.google.common.io.Files;
import com.google.common.util.concurrent.AsyncFunction;
@@ -51,6 +53,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
+import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.extensions.gcp.storage.GcsCreateOptions;
import org.apache.beam.sdk.io.FileSystems;
@@ -182,11 +185,11 @@ class PackageUtil implements Closeable {
private StagingResult stagePackageSynchronously(
PackageAttributes attributes, Sleeper retrySleeper, CreateOptions createOptions)
throws IOException, InterruptedException {
- File source = attributes.getSource();
+ String sourceDescription = attributes.getSourceDescription();
String target = attributes.getDestination().getLocation();
if (alreadyStaged(attributes)) {
- LOG.debug("Skipping file already staged: {} at {}", source, target);
+ LOG.debug("Skipping file already staged: {} at {}", sourceDescription, target);
return StagingResult.cached(attributes);
}
@@ -194,14 +197,14 @@ class PackageUtil implements Closeable {
return tryStagePackageWithRetry(attributes, retrySleeper, createOptions);
} catch (Exception miscException) {
throw new RuntimeException(
- String.format("Could not stage %s to %s", source, target), miscException);
+ String.format("Could not stage %s to %s", sourceDescription, target), miscException);
}
}
private StagingResult tryStagePackageWithRetry(
PackageAttributes attributes, Sleeper retrySleeper, CreateOptions createOptions)
throws IOException, InterruptedException {
- File source = attributes.getSource();
+ String sourceDescription = attributes.getSourceDescription();
String target = attributes.getDestination().getLocation();
BackOff backoff = BackOffAdapter.toGcpBackOff(BACKOFF_FACTORY.backoff());
@@ -217,19 +220,22 @@ class PackageUtil implements Closeable {
+ "of %s. Please verify credentials are valid and that you have "
+ "write access to %s. Stale credentials can be resolved by executing "
+ "'gcloud auth application-default login'.",
- source, target);
+ sourceDescription, target);
LOG.error(errorMessage);
throw new IOException(errorMessage, ioException);
}
long sleep = backoff.nextBackOffMillis();
if (sleep == BackOff.STOP) {
- LOG.error("Upload failed, will NOT retry staging of package: {}", source, ioException);
+ LOG.error(
+ "Upload failed, will NOT retry staging of package: {}",
+ sourceDescription,
+ ioException);
throw new RuntimeException("Could not stage %s to %s", ioException);
} else {
LOG.warn(
"Upload attempt failed, sleeping before retrying staging of package: {}",
- source,
+ sourceDescription,
ioException);
retrySleeper.sleep(sleep);
}
@@ -237,16 +243,29 @@ class PackageUtil implements Closeable {
}
}
- private StagingResult tryStagePackage(
- PackageAttributes attributes, CreateOptions createOptions)
+ private StagingResult tryStagePackage(PackageAttributes attributes, CreateOptions createOptions)
throws IOException, InterruptedException {
- File source = attributes.getSource();
+ String sourceDescription = attributes.getSourceDescription();
String target = attributes.getDestination().getLocation();
- LOG.info("Uploading {} to {}", source, target);
+ LOG.info("Uploading {} to {}", sourceDescription, target);
try (WritableByteChannel writer =
FileSystems.create(FileSystems.matchNewResource(target, false), createOptions)) {
- copyContent(attributes.getSource(), writer);
+ if (attributes.getBytes() != null) {
+ ByteSource.wrap(attributes.getBytes()).copyTo(Channels.newOutputStream(writer));
+ } else {
+ File sourceFile = attributes.getSource();
+ checkState(
+ sourceFile != null,
+ "Internal inconsistency: we tried to stage something to %s, but neither a source file "
+ + "nor the byte content was specified",
+ target);
+ if (sourceFile.isDirectory()) {
+ ZipFiles.zipDirectory(sourceFile, Channels.newOutputStream(writer));
+ } else {
+ Files.asByteSource(sourceFile).copyTo(Channels.newOutputStream(writer));
+ }
+ }
}
return StagingResult.uploaded(attributes);
}
@@ -272,6 +291,24 @@ class PackageUtil implements Closeable {
classpathElements, stagingPath, DEFAULT_SLEEPER, DEFAULT_CREATE_OPTIONS);
}
+ public DataflowPackage stageToFile(
+ byte[] bytes, String target, String stagingPath, CreateOptions createOptions) {
+ try {
+ return stagePackage(
+ PackageAttributes.forBytesToStage(bytes, target, stagingPath),
+ DEFAULT_SLEEPER,
+ createOptions)
+ .get()
+ .getPackageAttributes()
+ .getDestination();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Interrupted while staging pipeline", e);
+ } catch (ExecutionException e) {
+ throw new RuntimeException("Error while staging pipeline", e.getCause());
+ }
+ }
+
/**
* Transfers the classpath elements to the staging location.
*
@@ -386,23 +423,6 @@ class PackageUtil implements Closeable {
return fileName + "-" + contentHash + "." + fileExtension;
}
- /**
- * Copies the contents of the classpathElement to the output channel.
- *
- * <p>If the classpathElement is a directory, a Zip stream is constructed on the fly,
- * otherwise the file contents are copied as-is.
- *
- * <p>The output channel is not closed.
- */
- private static void copyContent(File classpathElement, WritableByteChannel outputChannel)
- throws IOException {
- if (classpathElement.isDirectory()) {
- ZipFiles.zipDirectory(classpathElement, Channels.newOutputStream(outputChannel));
- } else {
- Files.asByteSource(classpathElement).copyTo(Channels.newOutputStream(outputChannel));
- }
- }
-
@AutoValue
abstract static class StagingResult {
abstract PackageAttributes getPackageAttributes();
@@ -456,7 +476,26 @@ class PackageUtil implements Closeable {
target.setName(uniqueName);
target.setLocation(resourcePath);
- return new AutoValue_PackageUtil_PackageAttributes(source, target, size, hash);
+ return new AutoValue_PackageUtil_PackageAttributes(source, null, target, size, hash);
+ }
+
+ public static PackageAttributes forBytesToStage(
+ byte[] bytes, String targetName, String stagingPath) {
+ Hasher hasher = Hashing.md5().newHasher();
+ String hash = Base64Variants.MODIFIED_FOR_URL.encode(hasher.putBytes(bytes).hash().asBytes());
+ long size = bytes.length;
+
+ String uniqueName = getUniqueContentName(new File(targetName), hash);
+
+ String resourcePath =
+ FileSystems.matchNewResource(stagingPath, true)
+ .resolve(uniqueName, StandardResolveOptions.RESOLVE_FILE)
+ .toString();
+ DataflowPackage target = new DataflowPackage();
+ target.setName(uniqueName);
+ target.setLocation(resourcePath);
+
+ return new AutoValue_PackageUtil_PackageAttributes(null, bytes, target, size, hash);
}
public PackageAttributes withPackageName(String overridePackageName) {
@@ -465,12 +504,17 @@ class PackageUtil implements Closeable {
newDestination.setLocation(getDestination().getLocation());
return new AutoValue_PackageUtil_PackageAttributes(
- getSource(), newDestination, getSize(), getHash());
+ getSource(), getBytes(), newDestination, getSize(), getHash());
}
- /** @return the file to be uploaded */
+ /** @return the file to be uploaded, if any */
+ @Nullable
public abstract File getSource();
+ /** @return the bytes to be uploaded, if any */
+ @Nullable
+ public abstract byte[] getBytes();
+
/** @return the dataflowPackage */
public abstract DataflowPackage getDestination();
@@ -479,5 +523,13 @@ class PackageUtil implements Closeable {
/** @return the hash */
public abstract String getHash();
+
+ public String getSourceDescription() {
+ if (getSource() != null) {
+ return getSource().toString();
+ } else {
+ return String.format("<%s bytes, hash %s>", getSize(), getHash());
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/090c5124/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/Stager.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/Stager.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/Stager.java
index f0be941..0b2013e 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/Stager.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/Stager.java
@@ -43,4 +43,9 @@ public interface Stager {
* contents of the staged file.
*/
List<DataflowPackage> stageFiles(List<String> filesToStage);
+
+ /**
+ * Stage bytes to a target file name wherever this stager stages things.
+ */
+ DataflowPackage stageToFile(byte[] bytes, String baseName);
}