You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/10/06 00:46:26 UTC
[2/4] beam git commit: Use AutoValue for Dataflow PackageAttributes
Use AutoValue for Dataflow PackageAttributes
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a328127b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a328127b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a328127b
Branch: refs/heads/master
Commit: a328127b9b0a0f59816bcbe84646446b4f75aafc
Parents: a211bd9
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Sep 28 20:03:51 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Oct 5 17:35:04 2017 -0700
----------------------------------------------------------------------
.../beam/runners/dataflow/util/PackageUtil.java | 164 ++++++++-----------
.../runners/dataflow/util/PackageUtilTest.java | 29 ++--
2 files changed, 84 insertions(+), 109 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/a328127b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
index 9d1e084..7496d1c 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.core.Base64Variants;
import com.google.api.client.util.BackOff;
import com.google.api.client.util.Sleeper;
import com.google.api.services.dataflow.model.DataflowPackage;
+import com.google.auto.value.AutoValue;
import com.google.cloud.hadoop.util.ApiErrorExtractor;
import com.google.common.collect.Lists;
import com.google.common.hash.Funnels;
@@ -46,7 +47,6 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
-import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
@@ -105,49 +105,6 @@ class PackageUtil implements Closeable {
}
- /**
- * Compute and cache the attributes of a classpath element that we will need to stage it.
- *
- * @param source the file or directory to be staged.
- * @param stagingPath The base location for staged classpath elements.
- * @param overridePackageName If non-null, use the given value as the package name
- * instead of generating one automatically.
- * @return a {@link PackageAttributes} that containing metadata about the object to be staged.
- */
- static PackageAttributes createPackageAttributes(File source,
- String stagingPath, @Nullable String overridePackageName) {
- boolean directory = source.isDirectory();
-
- // Compute size and hash in one pass over file or directory.
- Hasher hasher = Hashing.md5().newHasher();
- OutputStream hashStream = Funnels.asOutputStream(hasher);
- try (CountingOutputStream countingOutputStream = new CountingOutputStream(hashStream)) {
- if (!directory) {
- // Files are staged as-is.
- Files.asByteSource(source).copyTo(countingOutputStream);
- } else {
- // Directories are recursively zipped.
- ZipFiles.zipDirectory(source, countingOutputStream);
- }
- countingOutputStream.flush();
-
- long size = countingOutputStream.getCount();
- String hash = Base64Variants.MODIFIED_FOR_URL.encode(hasher.hash().asBytes());
-
- // Create the DataflowPackage with staging name and location.
- String uniqueName = getUniqueContentName(source, hash);
- String resourcePath = FileSystems.matchNewResource(stagingPath, true)
- .resolve(uniqueName, StandardResolveOptions.RESOLVE_FILE).toString();
- DataflowPackage target = new DataflowPackage();
- target.setName(overridePackageName != null ? overridePackageName : uniqueName);
- target.setLocation(resourcePath);
-
- return new PackageAttributes(size, hash, directory, target, source.getPath());
- } catch (IOException e) {
- throw new RuntimeException("Package setup failure for " + source, e);
- }
- }
-
/** Utility comparator used in uploading packages efficiently. */
private static class PackageUploadOrder implements Comparator<PackageAttributes> {
@Override
@@ -193,7 +150,11 @@ class PackageUtil implements Closeable {
executorService.submit(new Callable<PackageAttributes>() {
@Override
public PackageAttributes call() throws Exception {
- return createPackageAttributes(file, stagingPath, packageName);
+ PackageAttributes attributes = PackageAttributes.forFileToStage(file, stagingPath);
+ if (packageName != null) {
+ attributes = attributes.withPackageName(packageName);
+ }
+ return attributes;
}
});
futures.add(future);
@@ -221,8 +182,8 @@ class PackageUtil implements Closeable {
private void stageOnePackage(
PackageAttributes attributes, AtomicInteger numUploaded, AtomicInteger numCached,
Sleeper retrySleeper, CreateOptions createOptions) {
- String source = attributes.getSourcePath();
- String target = attributes.getDataflowPackage().getLocation();
+ File source = attributes.getSource();
+ String target = attributes.getDestination().getLocation();
// TODO: Should we attempt to detect the Mime type rather than
// always using MimeTypes.BINARY?
@@ -231,7 +192,7 @@ class PackageUtil implements Closeable {
long remoteLength = FileSystems.matchSingleFileSpec(target).sizeBytes();
if (remoteLength == attributes.getSize()) {
LOG.debug("Skipping classpath element already staged: {} at {}",
- attributes.getSourcePath(), target);
+ attributes.getSource(), target);
numCached.incrementAndGet();
return;
}
@@ -245,7 +206,7 @@ class PackageUtil implements Closeable {
try {
LOG.debug("Uploading classpath element {} to {}", source, target);
try (WritableByteChannel writer = makeWriter(target, createOptions)) {
- copyContent(source, writer);
+ copyContent(attributes.getSource(), writer);
}
numUploaded.incrementAndGet();
break;
@@ -319,7 +280,7 @@ class PackageUtil implements Closeable {
// same order as on the classpath.
List<DataflowPackage> packages = Lists.newArrayListWithExpectedSize(packageAttributes.size());
for (final PackageAttributes attributes : packageAttributes) {
- packages.add(attributes.getDataflowPackage());
+ packages.add(attributes.getDestination());
}
// Order package attributes in descending size order so that we upload the largest files first.
@@ -381,67 +342,74 @@ class PackageUtil implements Closeable {
*
* <p>The output channel is not closed.
*/
- private static void copyContent(String classpathElement, WritableByteChannel outputChannel)
+ private static void copyContent(File classpathElement, WritableByteChannel outputChannel)
throws IOException {
- final File classpathElementFile = new File(classpathElement);
- if (classpathElementFile.isDirectory()) {
- ZipFiles.zipDirectory(classpathElementFile, Channels.newOutputStream(outputChannel));
+ if (classpathElement.isDirectory()) {
+ ZipFiles.zipDirectory(classpathElement, Channels.newOutputStream(outputChannel));
} else {
- Files.asByteSource(classpathElementFile).copyTo(Channels.newOutputStream(outputChannel));
+ Files.asByteSource(classpathElement).copyTo(Channels.newOutputStream(outputChannel));
}
}
/**
* Holds the metadata necessary to stage a file or confirm that a staged file has not changed.
*/
- static class PackageAttributes {
- private final boolean directory;
- private final long size;
- private final String hash;
- private final String sourcePath;
- private DataflowPackage dataflowPackage;
-
- public PackageAttributes(long size, String hash, boolean directory,
- DataflowPackage dataflowPackage, String sourcePath) {
- this.size = size;
- this.hash = Objects.requireNonNull(hash, "hash");
- this.directory = directory;
- this.sourcePath = Objects.requireNonNull(sourcePath, "sourcePath");
- this.dataflowPackage = Objects.requireNonNull(dataflowPackage, "dataflowPackage");
- }
+ @AutoValue
+ abstract static class PackageAttributes {
+
+ public static PackageAttributes forFileToStage(File source, String stagingPath)
+ throws IOException {
+
+ // Compute size and hash in one pass over file or directory.
+ long size;
+ String hash;
+ Hasher hasher = Hashing.md5().newHasher();
+ OutputStream hashStream = Funnels.asOutputStream(hasher);
+ try (CountingOutputStream countingOutputStream = new CountingOutputStream(hashStream)) {
+ if (!source.isDirectory()) {
+ // Files are staged as-is.
+ Files.asByteSource(source).copyTo(countingOutputStream);
+ } else {
+ // Directories are recursively zipped.
+ ZipFiles.zipDirectory(source, countingOutputStream);
+ }
+ countingOutputStream.flush();
- /**
- * @return the dataflowPackage
- */
- public DataflowPackage getDataflowPackage() {
- return dataflowPackage;
- }
+ size = countingOutputStream.getCount();
+ hash = Base64Variants.MODIFIED_FOR_URL.encode(hasher.hash().asBytes());
+ }
- /**
- * @return the directory
- */
- public boolean isDirectory() {
- return directory;
- }
+ String uniqueName = getUniqueContentName(source, hash);
- /**
- * @return the size
- */
- public long getSize() {
- return size;
- }
+ String resourcePath =
+ FileSystems.matchNewResource(stagingPath, true)
+ .resolve(uniqueName, StandardResolveOptions.RESOLVE_FILE)
+ .toString();
+ DataflowPackage target = new DataflowPackage();
+ target.setName(uniqueName);
+ target.setLocation(resourcePath);
- /**
- * @return the hash
- */
- public String getHash() {
- return hash;
+ return new AutoValue_PackageUtil_PackageAttributes(source, target, size, hash);
}
- /**
- * @return the file to be uploaded
- */
- public String getSourcePath() {
- return sourcePath;
+ public PackageAttributes withPackageName(String overridePackageName) {
+ DataflowPackage newDestination = new DataflowPackage();
+ newDestination.setName(overridePackageName);
+ newDestination.setLocation(getDestination().getLocation());
+
+ return new AutoValue_PackageUtil_PackageAttributes(
+ getSource(), newDestination, getSize(), getHash());
}
+
+ /** @return the file or directory to be staged (directories are zipped when uploaded) */
+ public abstract File getSource();
+
+ /** @return the {@link DataflowPackage} holding the staged name and location */
+ public abstract DataflowPackage getDestination();
+
+ /** @return the size */
+ public abstract long getSize();
+
+ /** @return the hash */
+ public abstract String getHash();
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a328127b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
index de6416d..0b94f7c 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
@@ -71,6 +71,7 @@ import java.util.Collections;
import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
+import javax.annotation.Nullable;
import org.apache.beam.runners.dataflow.util.PackageUtil.PackageAttributes;
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.io.FileSystems;
@@ -137,8 +138,14 @@ public class PackageUtilTest {
static final GcsPath STAGING_GCS_PATH = GcsPath.fromComponents("somebucket", "base/path/");
static final String STAGING_PATH = STAGING_GCS_PATH.toString();
- private static PackageAttributes makePackageAttributes(File file, String overridePackageName) {
- return PackageUtil.createPackageAttributes(file, STAGING_PATH, overridePackageName);
+
+ private static PackageAttributes makePackageAttributes(
+ File file, @Nullable String overridePackageName) throws IOException {
+ PackageAttributes attributes = PackageUtil.PackageAttributes.forFileToStage(file, STAGING_PATH);
+ if (overridePackageName != null) {
+ attributes = attributes.withPackageName(overridePackageName);
+ }
+ return attributes;
}
@Test
@@ -146,7 +153,7 @@ public class PackageUtilTest {
String contents = "This is a test!";
File tmpFile = makeFileWithContents("file.txt", contents);
PackageAttributes attr = makePackageAttributes(tmpFile, null);
- DataflowPackage target = attr.getDataflowPackage();
+ DataflowPackage target = attr.getDestination();
assertThat(target.getName(), RegexMatcher.matches("file-" + HASH_PATTERN + ".txt"));
assertThat(target.getLocation(), equalTo(STAGING_PATH + target.getName()));
@@ -156,7 +163,7 @@ public class PackageUtilTest {
@Test
public void testPackageNamingWithFileNoExtension() throws Exception {
File tmpFile = makeFileWithContents("file", "This is a test!");
- DataflowPackage target = makePackageAttributes(tmpFile, null).getDataflowPackage();
+ DataflowPackage target = makePackageAttributes(tmpFile, null).getDestination();
assertThat(target.getName(), RegexMatcher.matches("file-" + HASH_PATTERN));
assertThat(target.getLocation(), equalTo(STAGING_PATH + target.getName()));
@@ -165,7 +172,7 @@ public class PackageUtilTest {
@Test
public void testPackageNamingWithDirectory() throws Exception {
File tmpDirectory = tmpFolder.newFolder("folder");
- DataflowPackage target = makePackageAttributes(tmpDirectory, null).getDataflowPackage();
+ DataflowPackage target = makePackageAttributes(tmpDirectory, null).getDestination();
assertThat(target.getName(), RegexMatcher.matches("folder-" + HASH_PATTERN + ".jar"));
assertThat(target.getLocation(), equalTo(STAGING_PATH + target.getName()));
@@ -175,11 +182,11 @@ public class PackageUtilTest {
public void testPackageNamingWithFilesHavingSameContentsAndSameNames() throws Exception {
File tmpDirectory1 = tmpFolder.newFolder("folder1", "folderA");
makeFileWithContents("folder1/folderA/sameName", "This is a test!");
- DataflowPackage target1 = makePackageAttributes(tmpDirectory1, null).getDataflowPackage();
+ DataflowPackage target1 = makePackageAttributes(tmpDirectory1, null).getDestination();
File tmpDirectory2 = tmpFolder.newFolder("folder2", "folderA");
makeFileWithContents("folder2/folderA/sameName", "This is a test!");
- DataflowPackage target2 = makePackageAttributes(tmpDirectory2, null).getDataflowPackage();
+ DataflowPackage target2 = makePackageAttributes(tmpDirectory2, null).getDestination();
assertEquals(target1.getName(), target2.getName());
assertEquals(target1.getLocation(), target2.getLocation());
@@ -189,11 +196,11 @@ public class PackageUtilTest {
public void testPackageNamingWithFilesHavingSameContentsButDifferentNames() throws Exception {
File tmpDirectory1 = tmpFolder.newFolder("folder1", "folderA");
makeFileWithContents("folder1/folderA/uniqueName1", "This is a test!");
- DataflowPackage target1 = makePackageAttributes(tmpDirectory1, null).getDataflowPackage();
+ DataflowPackage target1 = makePackageAttributes(tmpDirectory1, null).getDestination();
File tmpDirectory2 = tmpFolder.newFolder("folder2", "folderA");
makeFileWithContents("folder2/folderA/uniqueName2", "This is a test!");
- DataflowPackage target2 = makePackageAttributes(tmpDirectory2, null).getDataflowPackage();
+ DataflowPackage target2 = makePackageAttributes(tmpDirectory2, null).getDestination();
assertNotEquals(target1.getName(), target2.getName());
assertNotEquals(target1.getLocation(), target2.getLocation());
@@ -204,11 +211,11 @@ public class PackageUtilTest {
throws Exception {
File tmpDirectory1 = tmpFolder.newFolder("folder1", "folderA");
tmpFolder.newFolder("folder1", "folderA", "uniqueName1");
- DataflowPackage target1 = makePackageAttributes(tmpDirectory1, null).getDataflowPackage();
+ DataflowPackage target1 = makePackageAttributes(tmpDirectory1, null).getDestination();
File tmpDirectory2 = tmpFolder.newFolder("folder2", "folderA");
tmpFolder.newFolder("folder2", "folderA", "uniqueName2");
- DataflowPackage target2 = makePackageAttributes(tmpDirectory2, null).getDataflowPackage();
+ DataflowPackage target2 = makePackageAttributes(tmpDirectory2, null).getDestination();
assertNotEquals(target1.getName(), target2.getName());
assertNotEquals(target1.getLocation(), target2.getLocation());