You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/10/06 00:46:26 UTC

[2/4] beam git commit: Use AutoValue for Dataflow PackageAttributes

Use AutoValue for Dataflow PackageAttributes


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a328127b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a328127b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a328127b

Branch: refs/heads/master
Commit: a328127b9b0a0f59816bcbe84646446b4f75aafc
Parents: a211bd9
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Sep 28 20:03:51 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Oct 5 17:35:04 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/dataflow/util/PackageUtil.java | 164 ++++++++-----------
 .../runners/dataflow/util/PackageUtilTest.java  |  29 ++--
 2 files changed, 84 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/a328127b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
index 9d1e084..7496d1c 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.core.Base64Variants;
 import com.google.api.client.util.BackOff;
 import com.google.api.client.util.Sleeper;
 import com.google.api.services.dataflow.model.DataflowPackage;
+import com.google.auto.value.AutoValue;
 import com.google.cloud.hadoop.util.ApiErrorExtractor;
 import com.google.common.collect.Lists;
 import com.google.common.hash.Funnels;
@@ -46,7 +47,6 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Objects;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
@@ -105,49 +105,6 @@ class PackageUtil implements Closeable {
   }
 
 
-  /**
-   * Compute and cache the attributes of a classpath element that we will need to stage it.
-   *
-   * @param source the file or directory to be staged.
-   * @param stagingPath The base location for staged classpath elements.
-   * @param overridePackageName If non-null, use the given value as the package name
-   *                            instead of generating one automatically.
-   * @return a {@link PackageAttributes} that containing metadata about the object to be staged.
-   */
-  static PackageAttributes createPackageAttributes(File source,
-      String stagingPath, @Nullable String overridePackageName) {
-    boolean directory = source.isDirectory();
-
-    // Compute size and hash in one pass over file or directory.
-    Hasher hasher = Hashing.md5().newHasher();
-    OutputStream hashStream = Funnels.asOutputStream(hasher);
-    try (CountingOutputStream countingOutputStream = new CountingOutputStream(hashStream)) {
-      if (!directory) {
-        // Files are staged as-is.
-        Files.asByteSource(source).copyTo(countingOutputStream);
-      } else {
-        // Directories are recursively zipped.
-        ZipFiles.zipDirectory(source, countingOutputStream);
-      }
-      countingOutputStream.flush();
-
-      long size = countingOutputStream.getCount();
-      String hash = Base64Variants.MODIFIED_FOR_URL.encode(hasher.hash().asBytes());
-
-      // Create the DataflowPackage with staging name and location.
-      String uniqueName = getUniqueContentName(source, hash);
-      String resourcePath = FileSystems.matchNewResource(stagingPath, true)
-          .resolve(uniqueName, StandardResolveOptions.RESOLVE_FILE).toString();
-      DataflowPackage target = new DataflowPackage();
-      target.setName(overridePackageName != null ? overridePackageName : uniqueName);
-      target.setLocation(resourcePath);
-
-      return new PackageAttributes(size, hash, directory, target, source.getPath());
-    } catch (IOException e) {
-      throw new RuntimeException("Package setup failure for " + source, e);
-    }
-  }
-
   /** Utility comparator used in uploading packages efficiently. */
   private static class PackageUploadOrder implements Comparator<PackageAttributes> {
     @Override
@@ -193,7 +150,11 @@ class PackageUtil implements Closeable {
           executorService.submit(new Callable<PackageAttributes>() {
             @Override
             public PackageAttributes call() throws Exception {
-              return createPackageAttributes(file, stagingPath, packageName);
+              PackageAttributes attributes = PackageAttributes.forFileToStage(file, stagingPath);
+              if (packageName != null) {
+                attributes = attributes.withPackageName(packageName);
+              }
+              return attributes;
             }
           });
       futures.add(future);
@@ -221,8 +182,8 @@ class PackageUtil implements Closeable {
   private void stageOnePackage(
       PackageAttributes attributes, AtomicInteger numUploaded, AtomicInteger numCached,
       Sleeper retrySleeper, CreateOptions createOptions) {
-    String source = attributes.getSourcePath();
-    String target = attributes.getDataflowPackage().getLocation();
+    File source = attributes.getSource();
+    String target = attributes.getDestination().getLocation();
 
     // TODO: Should we attempt to detect the Mime type rather than
     // always using MimeTypes.BINARY?
@@ -231,7 +192,7 @@ class PackageUtil implements Closeable {
         long remoteLength = FileSystems.matchSingleFileSpec(target).sizeBytes();
         if (remoteLength == attributes.getSize()) {
           LOG.debug("Skipping classpath element already staged: {} at {}",
-              attributes.getSourcePath(), target);
+              attributes.getSource(), target);
           numCached.incrementAndGet();
           return;
         }
@@ -245,7 +206,7 @@ class PackageUtil implements Closeable {
         try {
           LOG.debug("Uploading classpath element {} to {}", source, target);
           try (WritableByteChannel writer = makeWriter(target, createOptions)) {
-            copyContent(source, writer);
+            copyContent(attributes.getSource(), writer);
           }
           numUploaded.incrementAndGet();
           break;
@@ -319,7 +280,7 @@ class PackageUtil implements Closeable {
     // same order as on the classpath.
     List<DataflowPackage> packages = Lists.newArrayListWithExpectedSize(packageAttributes.size());
     for (final PackageAttributes attributes : packageAttributes) {
-      packages.add(attributes.getDataflowPackage());
+      packages.add(attributes.getDestination());
     }
 
     // Order package attributes in descending size order so that we upload the largest files first.
@@ -381,67 +342,74 @@ class PackageUtil implements Closeable {
    *
    * <p>The output channel is not closed.
    */
-  private static void copyContent(String classpathElement, WritableByteChannel outputChannel)
+  private static void copyContent(File classpathElement, WritableByteChannel outputChannel)
       throws IOException {
-    final File classpathElementFile = new File(classpathElement);
-    if (classpathElementFile.isDirectory()) {
-      ZipFiles.zipDirectory(classpathElementFile, Channels.newOutputStream(outputChannel));
+    if (classpathElement.isDirectory()) {
+      ZipFiles.zipDirectory(classpathElement, Channels.newOutputStream(outputChannel));
     } else {
-      Files.asByteSource(classpathElementFile).copyTo(Channels.newOutputStream(outputChannel));
+      Files.asByteSource(classpathElement).copyTo(Channels.newOutputStream(outputChannel));
     }
   }
   /**
    * Holds the metadata necessary to stage a file or confirm that a staged file has not changed.
    */
-  static class PackageAttributes {
-    private final boolean directory;
-    private final long size;
-    private final String hash;
-    private final String sourcePath;
-    private DataflowPackage dataflowPackage;
-
-    public PackageAttributes(long size, String hash, boolean directory,
-        DataflowPackage dataflowPackage, String sourcePath) {
-      this.size = size;
-      this.hash = Objects.requireNonNull(hash, "hash");
-      this.directory = directory;
-      this.sourcePath = Objects.requireNonNull(sourcePath, "sourcePath");
-      this.dataflowPackage = Objects.requireNonNull(dataflowPackage, "dataflowPackage");
-    }
+  @AutoValue
+  abstract static class PackageAttributes {
+    /** Computes size, MD5 hash and hash-based staging name/location for {@code source}. */
+    public static PackageAttributes forFileToStage(File source, String stagingPath)
+        throws IOException {
+      // NOTE(review): IOException propagates unwrapped; no source-path context is added here.
+      // Compute size and hash in one pass over file or directory.
+      long size;
+      String hash;
+      Hasher hasher = Hashing.md5().newHasher();
+      OutputStream hashStream = Funnels.asOutputStream(hasher);
+      try (CountingOutputStream countingOutputStream = new CountingOutputStream(hashStream)) {
+        if (!source.isDirectory()) {
+          // Files are staged as-is.
+          Files.asByteSource(source).copyTo(countingOutputStream);
+        } else {
+          // Directories are recursively zipped; size and hash describe the zip bytes.
+          ZipFiles.zipDirectory(source, countingOutputStream);
+        }
+        countingOutputStream.flush();

-    /**
-     * @return the dataflowPackage
-     */
-    public DataflowPackage getDataflowPackage() {
-      return dataflowPackage;
-    }
+        size = countingOutputStream.getCount();
+        hash = Base64Variants.MODIFIED_FOR_URL.encode(hasher.hash().asBytes());
+      }

-    /**
-     * @return the directory
-     */
-    public boolean isDirectory() {
-      return directory;
-    }
+      String uniqueName = getUniqueContentName(source, hash);

-    /**
-     * @return the size
-     */
-    public long getSize() {
-      return size;
-    }
+      String resourcePath =
+          FileSystems.matchNewResource(stagingPath, true)
+              .resolve(uniqueName, StandardResolveOptions.RESOLVE_FILE)
+              .toString();
+      DataflowPackage target = new DataflowPackage();
+      target.setName(uniqueName);
+      target.setLocation(resourcePath);

-    /**
-     * @return the hash
-     */
-    public String getHash() {
-      return hash;
+      return new AutoValue_PackageUtil_PackageAttributes(source, target, size, hash);
     }

-    /**
-     * @return the file to be uploaded
-     */
-    public String getSourcePath() {
-      return sourcePath;
+    public PackageAttributes withPackageName(String overridePackageName) {
+      DataflowPackage newDestination = new DataflowPackage();
+      newDestination.setName(overridePackageName);
+      newDestination.setLocation(getDestination().getLocation());
+      // Only the name changes; source, size, hash and staged location are carried over.
+      return new AutoValue_PackageUtil_PackageAttributes(
+          getSource(), newDestination, getSize(), getHash());
     }
+
+    /** @return the local file or directory to be staged */
+    public abstract File getSource();
+
+    /** @return the staging destination: package name and staged location */
+    public abstract DataflowPackage getDestination();
+
+    /** @return the size in bytes of the staged content (the zipped size for directories) */
+    public abstract long getSize();
+
+    /** @return the URL-safe Base64 encoding of the MD5 of the staged content */
+    public abstract String getHash();
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a328127b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
index de6416d..0b94f7c 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
@@ -71,6 +71,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipInputStream;
+import javax.annotation.Nullable;
 import org.apache.beam.runners.dataflow.util.PackageUtil.PackageAttributes;
 import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
 import org.apache.beam.sdk.io.FileSystems;
@@ -137,8 +138,14 @@ public class PackageUtilTest {
 
   static final GcsPath STAGING_GCS_PATH = GcsPath.fromComponents("somebucket", "base/path/");
   static final String STAGING_PATH = STAGING_GCS_PATH.toString();
-  private static PackageAttributes makePackageAttributes(File file, String overridePackageName) {
-    return PackageUtil.createPackageAttributes(file, STAGING_PATH, overridePackageName);
+
+  private static PackageAttributes makePackageAttributes(
+      File file, @Nullable String overridePackageName) throws IOException {
+    PackageAttributes attributes = PackageUtil.PackageAttributes.forFileToStage(file, STAGING_PATH);
+    if (overridePackageName != null) {  // same override rule as PackageUtil's staging task
+      attributes = attributes.withPackageName(overridePackageName);
+    }
+    return attributes;
   }
 
   @Test
@@ -146,7 +153,7 @@ public class PackageUtilTest {
     String contents = "This is a test!";
     File tmpFile = makeFileWithContents("file.txt", contents);
     PackageAttributes attr = makePackageAttributes(tmpFile, null);
-    DataflowPackage target = attr.getDataflowPackage();
+    DataflowPackage target = attr.getDestination();
 
     assertThat(target.getName(), RegexMatcher.matches("file-" + HASH_PATTERN + ".txt"));
     assertThat(target.getLocation(), equalTo(STAGING_PATH + target.getName()));
@@ -156,7 +163,7 @@ public class PackageUtilTest {
   @Test
   public void testPackageNamingWithFileNoExtension() throws Exception {
     File tmpFile = makeFileWithContents("file", "This is a test!");
-    DataflowPackage target = makePackageAttributes(tmpFile, null).getDataflowPackage();
+    DataflowPackage target = makePackageAttributes(tmpFile, null).getDestination();
 
     assertThat(target.getName(), RegexMatcher.matches("file-" + HASH_PATTERN));
     assertThat(target.getLocation(), equalTo(STAGING_PATH + target.getName()));
@@ -165,7 +172,7 @@ public class PackageUtilTest {
   @Test
   public void testPackageNamingWithDirectory() throws Exception {
     File tmpDirectory = tmpFolder.newFolder("folder");
-    DataflowPackage target = makePackageAttributes(tmpDirectory, null).getDataflowPackage();
+    DataflowPackage target = makePackageAttributes(tmpDirectory, null).getDestination();
 
     assertThat(target.getName(), RegexMatcher.matches("folder-" + HASH_PATTERN + ".jar"));
     assertThat(target.getLocation(), equalTo(STAGING_PATH + target.getName()));
@@ -175,11 +182,11 @@ public class PackageUtilTest {
   public void testPackageNamingWithFilesHavingSameContentsAndSameNames() throws Exception {
     File tmpDirectory1 = tmpFolder.newFolder("folder1", "folderA");
     makeFileWithContents("folder1/folderA/sameName", "This is a test!");
-    DataflowPackage target1 = makePackageAttributes(tmpDirectory1, null).getDataflowPackage();
+    DataflowPackage target1 = makePackageAttributes(tmpDirectory1, null).getDestination();
 
     File tmpDirectory2 = tmpFolder.newFolder("folder2", "folderA");
     makeFileWithContents("folder2/folderA/sameName", "This is a test!");
-    DataflowPackage target2 = makePackageAttributes(tmpDirectory2, null).getDataflowPackage();
+    DataflowPackage target2 = makePackageAttributes(tmpDirectory2, null).getDestination();
 
     assertEquals(target1.getName(), target2.getName());
     assertEquals(target1.getLocation(), target2.getLocation());
@@ -189,11 +196,11 @@ public class PackageUtilTest {
   public void testPackageNamingWithFilesHavingSameContentsButDifferentNames() throws Exception {
     File tmpDirectory1 = tmpFolder.newFolder("folder1", "folderA");
     makeFileWithContents("folder1/folderA/uniqueName1", "This is a test!");
-    DataflowPackage target1 = makePackageAttributes(tmpDirectory1, null).getDataflowPackage();
+    DataflowPackage target1 = makePackageAttributes(tmpDirectory1, null).getDestination();
 
     File tmpDirectory2 = tmpFolder.newFolder("folder2", "folderA");
     makeFileWithContents("folder2/folderA/uniqueName2", "This is a test!");
-    DataflowPackage target2 = makePackageAttributes(tmpDirectory2, null).getDataflowPackage();
+    DataflowPackage target2 = makePackageAttributes(tmpDirectory2, null).getDestination();
 
     assertNotEquals(target1.getName(), target2.getName());
     assertNotEquals(target1.getLocation(), target2.getLocation());
@@ -204,11 +211,11 @@ public class PackageUtilTest {
       throws Exception {
     File tmpDirectory1 = tmpFolder.newFolder("folder1", "folderA");
     tmpFolder.newFolder("folder1", "folderA", "uniqueName1");
-    DataflowPackage target1 = makePackageAttributes(tmpDirectory1, null).getDataflowPackage();
+    DataflowPackage target1 = makePackageAttributes(tmpDirectory1, null).getDestination();
 
     File tmpDirectory2 = tmpFolder.newFolder("folder2", "folderA");
     tmpFolder.newFolder("folder2", "folderA", "uniqueName2");
-    DataflowPackage target2 = makePackageAttributes(tmpDirectory2, null).getDataflowPackage();
+    DataflowPackage target2 = makePackageAttributes(tmpDirectory2, null).getDestination();
 
     assertNotEquals(target1.getName(), target2.getName());
     assertNotEquals(target1.getLocation(), target2.getLocation());