You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/10/06 00:46:27 UTC

[3/4] beam git commit: Make PackageUtil a proper class encapsulating its ExecutorService

Make PackageUtil a proper class encapsulating its ExecutorService


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a211bd9b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a211bd9b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a211bd9b

Branch: refs/heads/master
Commit: a211bd9bb6365f1fe76e9b16355f721fcaa80b47
Parents: 31da49c
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Sep 28 19:35:20 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Oct 5 17:35:04 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/dataflow/util/GcsStager.java   |  8 +-
 .../beam/runners/dataflow/util/PackageUtil.java | 59 ++++++++----
 .../runners/dataflow/util/PackageUtilTest.java  | 95 ++++++++++++--------
 3 files changed, 103 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/a211bd9b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
index d18e306..929be99 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
@@ -62,9 +62,9 @@ public class GcsStager implements Stager {
         .setMimeType(MimeTypes.BINARY)
         .build();
 
-    return PackageUtil.stageClasspathElements(
-        options.getFilesToStage(),
-        options.getStagingLocation(),
-        createOptions);
+    try (PackageUtil packageUtil = PackageUtil.withDefaultThreadPool()) {
+      return packageUtil.stageClasspathElements(
+          options.getFilesToStage(), options.getStagingLocation(), createOptions);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a211bd9b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
index 931f7ea..9d1e084 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
@@ -34,6 +34,7 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
+import java.io.Closeable;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -51,6 +52,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.fs.CreateOptions;
 import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
@@ -62,13 +64,18 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /** Helper routines for packages. */
-class PackageUtil {
+@Internal
+class PackageUtil implements Closeable {
+
   private static final Logger LOG = LoggerFactory.getLogger(PackageUtil.class);
+
   /**
    * A reasonable upper bound on the number of jars required to launch a Dataflow job.
    */
   private static final int SANE_CLASSPATH_SIZE = 1000;
 
+  private static final int DEFAULT_THREAD_POOL_SIZE = 32;
+
   private static final FluentBackoff BACKOFF_FACTORY =
       FluentBackoff.DEFAULT.withMaxRetries(4).withInitialBackoff(Duration.standardSeconds(5));
 
@@ -77,6 +84,27 @@ class PackageUtil {
    */
   private static final ApiErrorExtractor ERROR_EXTRACTOR = new ApiErrorExtractor();
 
+  private final ListeningExecutorService executorService;
+
+  private PackageUtil(ListeningExecutorService executorService) {
+    this.executorService = executorService;
+  }
+
+  public static PackageUtil withDefaultThreadPool() {
+    return PackageUtil.withExecutorService(
+        MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(DEFAULT_THREAD_POOL_SIZE)));
+  }
+
+  public static PackageUtil withExecutorService(ListeningExecutorService executorService) {
+    return new PackageUtil(executorService);
+  }
+
+  @Override
+  public void close() {
+    executorService.shutdown();
+  }
+
+
   /**
    * Compute and cache the attributes of a classpath element that we will need to stage it.
    *
@@ -140,9 +168,10 @@ class PackageUtil {
    * Utility function that computes sizes and hashes of packages so that we can validate whether
    * they have already been correctly staged.
    */
-  private static List<PackageAttributes> computePackageAttributes(
-      Collection<String> classpathElements, final String stagingPath,
-      ListeningExecutorService executorService) {
+  private List<PackageAttributes> computePackageAttributes(
+      Collection<String> classpathElements,
+      final String stagingPath) {
+
     List<ListenableFuture<PackageAttributes>> futures = new LinkedList<>();
     for (String classpathElement : classpathElements) {
       @Nullable String userPackageName = null;
@@ -189,7 +218,7 @@ class PackageUtil {
    * Utility to verify whether a package has already been staged and, if not, copy it to the
    * staging location.
    */
-  private static void stageOnePackage(
+  private void stageOnePackage(
       PackageAttributes attributes, AtomicInteger numUploaded, AtomicInteger numCached,
       Sleeper retrySleeper, CreateOptions createOptions) {
     String source = attributes.getSourcePath();
@@ -255,22 +284,16 @@ class PackageUtil {
    * @param stagingPath The base location to stage the elements to.
    * @return A list of cloud workflow packages, each representing a classpath element.
    */
-  static List<DataflowPackage> stageClasspathElements(
+  List<DataflowPackage> stageClasspathElements(
       Collection<String> classpathElements, String stagingPath, CreateOptions createOptions) {
-    ListeningExecutorService executorService =
-        MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(32));
-    try {
-      return stageClasspathElements(classpathElements, stagingPath, Sleeper.DEFAULT,
-          executorService, createOptions);
-    } finally {
-      executorService.shutdown();
-    }
+    return stageClasspathElements(classpathElements, stagingPath, Sleeper.DEFAULT, createOptions);
   }
 
   // Visible for testing.
-  static List<DataflowPackage> stageClasspathElements(
-      Collection<String> classpathElements, final String stagingPath,
-      final Sleeper retrySleeper, ListeningExecutorService executorService,
+  List<DataflowPackage> stageClasspathElements(
+      Collection<String> classpathElements,
+      final String stagingPath,
+      final Sleeper retrySleeper,
       final CreateOptions createOptions) {
     LOG.info("Uploading {} files from PipelineOptions.filesToStage to staging location to "
         + "prepare for execution.", classpathElements.size());
@@ -290,7 +313,7 @@ class PackageUtil {
 
     // Inline a copy here because the inner code returns an immutable list and we want to mutate it.
     List<PackageAttributes> packageAttributes =
-        new LinkedList<>(computePackageAttributes(classpathElements, stagingPath, executorService));
+        new LinkedList<>(computePackageAttributes(classpathElements, stagingPath));
 
     // Compute the returned list of DataflowPackage objects here so that they are returned in the
     // same order as on the classpath.

http://git-wip-us.apache.org/repos/asf/beam/blob/a211bd9b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
index 5d0c0f2..de6416d 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
@@ -86,6 +86,7 @@ import org.apache.beam.sdk.util.GcsUtil.StorageObjectOrIOException;
 import org.apache.beam.sdk.util.MimeTypes;
 import org.apache.beam.sdk.util.gcsfs.GcsPath;
 import org.hamcrest.Matchers;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -97,22 +98,19 @@ import org.mockito.MockitoAnnotations;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
-/** Tests for PackageUtil. */
+/** Tests for {@link PackageUtil}. */
 @RunWith(JUnit4.class)
 public class PackageUtilTest {
   @Rule public ExpectedLogs logged = ExpectedLogs.none(PackageUtil.class);
-  @Rule
-  public TemporaryFolder tmpFolder = new TemporaryFolder();
+  @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
+  @Rule public FastNanoClockAndSleeper fastNanoClockAndSleeper = new FastNanoClockAndSleeper();
 
-  @Rule
-  public FastNanoClockAndSleeper fastNanoClockAndSleeper = new FastNanoClockAndSleeper();
-
-  @Mock
-  GcsUtil mockGcsUtil;
+  @Mock GcsUtil mockGcsUtil;
 
   // 128 bits, base64 encoded is 171 bits, rounds to 22 bytes
   private static final String HASH_PATTERN = "[a-zA-Z0-9+-]{22}";
   private CreateOptions createOptions;
+  private PackageUtil defaultPackageUtil;
 
   @Before
   public void setUp() {
@@ -122,6 +120,12 @@ public class PackageUtilTest {
     pipelineOptions.setGcsUtil(mockGcsUtil);
     FileSystems.setDefaultPipelineOptions(pipelineOptions);
     createOptions = StandardCreateOptions.builder().setMimeType(MimeTypes.BINARY).build();
+    defaultPackageUtil = PackageUtil.withDefaultThreadPool();
+  }
+
+  @After
+  public void teardown() {
+    defaultPackageUtil.close();
   }
 
   private File makeFileWithContents(String name, String contents) throws Exception {
@@ -224,7 +228,7 @@ public class PackageUtilTest {
       classpathElements.add(eltName + '=' + tmpFile.getAbsolutePath());
     }
 
-    PackageUtil.stageClasspathElements(classpathElements, STAGING_PATH, createOptions);
+    defaultPackageUtil.stageClasspathElements(classpathElements, STAGING_PATH, createOptions);
     logged.verifyWarn("Your classpath contains 1005 elements, which Google Cloud Dataflow");
   }
 
@@ -239,8 +243,9 @@ public class PackageUtilTest {
 
     when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
 
-    List<DataflowPackage> targets = PackageUtil.stageClasspathElements(
-        ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH, createOptions);
+    List<DataflowPackage> targets =
+        defaultPackageUtil.stageClasspathElements(
+            ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH, createOptions);
     DataflowPackage target = Iterables.getOnlyElement(targets);
 
     verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class));
@@ -269,9 +274,11 @@ public class PackageUtilTest {
           }
         });
 
-    List<DataflowPackage> targets = PackageUtil.stageClasspathElements(
-        ImmutableList.of(smallFile.getAbsolutePath(), largeFile.getAbsolutePath()),
-        STAGING_PATH, createOptions);
+    List<DataflowPackage> targets =
+        defaultPackageUtil.stageClasspathElements(
+            ImmutableList.of(smallFile.getAbsolutePath(), largeFile.getAbsolutePath()),
+            STAGING_PATH,
+            createOptions);
     // Verify that the packages are returned small, then large, matching input order even though
     // the large file would be uploaded first.
     assertThat(targets.get(0).getName(), startsWith("small"));
@@ -292,7 +299,7 @@ public class PackageUtilTest {
             new FileNotFoundException("some/path"))));
     when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
 
-    PackageUtil.stageClasspathElements(
+    defaultPackageUtil.stageClasspathElements(
         ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH, createOptions);
 
     verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class));
@@ -320,8 +327,9 @@ public class PackageUtilTest {
             new FileNotFoundException("some/path"))));
     when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
 
-    List<DataflowPackage> targets = PackageUtil.stageClasspathElements(
-        ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH, createOptions);
+    List<DataflowPackage> targets =
+        defaultPackageUtil.stageClasspathElements(
+            ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH, createOptions);
     DataflowPackage target = Iterables.getOnlyElement(targets);
 
     verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class));
@@ -342,10 +350,12 @@ public class PackageUtilTest {
     when(mockGcsUtil.create(any(GcsPath.class), anyString()))
         .thenThrow(new IOException("Fake Exception: Upload error"));
 
-    try {
-      PackageUtil.stageClasspathElements(
+    try (PackageUtil directPackageUtil =
+        PackageUtil.withExecutorService(MoreExecutors.newDirectExecutorService())) {
+      directPackageUtil.stageClasspathElements(
           ImmutableList.of(tmpFile.getAbsolutePath()),
-          STAGING_PATH, fastNanoClockAndSleeper, MoreExecutors.newDirectExecutorService(),
+          STAGING_PATH,
+          fastNanoClockAndSleeper,
           createOptions);
     } finally {
       verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class));
@@ -365,10 +375,12 @@ public class PackageUtilTest {
             googleJsonResponseException(
                 HttpStatusCodes.STATUS_CODE_FORBIDDEN, "Permission denied", "Test message")));
 
-    try {
-      PackageUtil.stageClasspathElements(
+    try (PackageUtil directPackageUtil =
+        PackageUtil.withExecutorService(MoreExecutors.newDirectExecutorService())) {
+      directPackageUtil.stageClasspathElements(
           ImmutableList.of(tmpFile.getAbsolutePath()),
-          STAGING_PATH, fastNanoClockAndSleeper, MoreExecutors.newDirectExecutorService(),
+          STAGING_PATH,
+          fastNanoClockAndSleeper,
           createOptions);
       fail("Expected RuntimeException");
     } catch (RuntimeException e) {
@@ -400,10 +412,13 @@ public class PackageUtilTest {
         .thenThrow(new IOException("Fake Exception: 410 Gone")) // First attempt fails
         .thenReturn(pipe.sink());                               // second attempt succeeds
 
-    try {
-      PackageUtil.stageClasspathElements(
-          ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH, fastNanoClockAndSleeper,
-          MoreExecutors.newDirectExecutorService(), createOptions);
+    try (PackageUtil directPackageUtil =
+        PackageUtil.withExecutorService(MoreExecutors.newDirectExecutorService())) {
+      directPackageUtil.stageClasspathElements(
+          ImmutableList.of(tmpFile.getAbsolutePath()),
+          STAGING_PATH,
+          fastNanoClockAndSleeper,
+          createOptions);
     } finally {
       verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class));
       verify(mockGcsUtil, times(2)).create(any(GcsPath.class), anyString());
@@ -418,8 +433,8 @@ public class PackageUtilTest {
         .thenReturn(ImmutableList.of(StorageObjectOrIOException.create(
             createStorageObject(STAGING_PATH, tmpFile.length()))));
 
-    PackageUtil.stageClasspathElements(ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH,
-        createOptions);
+    defaultPackageUtil.stageClasspathElements(
+        ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH, createOptions);
 
     verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class));
     verifyNoMoreInteractions(mockGcsUtil);
@@ -438,7 +453,7 @@ public class PackageUtilTest {
             createStorageObject(STAGING_PATH, Long.MAX_VALUE))));
     when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
 
-    PackageUtil.stageClasspathElements(
+    defaultPackageUtil.stageClasspathElements(
         ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH, createOptions);
 
     verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class));
@@ -457,9 +472,11 @@ public class PackageUtilTest {
             new FileNotFoundException("some/path"))));
     when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
 
-    List<DataflowPackage> targets = PackageUtil.stageClasspathElements(
-        ImmutableList.of(overriddenName + "=" + tmpFile.getAbsolutePath()), STAGING_PATH,
-        createOptions);
+    List<DataflowPackage> targets =
+        defaultPackageUtil.stageClasspathElements(
+            ImmutableList.of(overriddenName + "=" + tmpFile.getAbsolutePath()),
+            STAGING_PATH,
+            createOptions);
     DataflowPackage target = Iterables.getOnlyElement(targets);
 
     verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class));
@@ -473,10 +490,14 @@ public class PackageUtilTest {
 
   @Test
   public void testPackageUploadIsSkippedWithNonExistentResource() throws Exception {
-    String nonExistentFile = FileSystems.matchNewResource(tmpFolder.getRoot().getPath(), true)
-        .resolve("non-existent-file", StandardResolveOptions.RESOLVE_FILE).toString();
-    assertEquals(Collections.EMPTY_LIST, PackageUtil.stageClasspathElements(
-        ImmutableList.of(nonExistentFile), STAGING_PATH, createOptions));
+    String nonExistentFile =
+        FileSystems.matchNewResource(tmpFolder.getRoot().getPath(), true)
+            .resolve("non-existent-file", StandardResolveOptions.RESOLVE_FILE)
+            .toString();
+    assertEquals(
+        Collections.EMPTY_LIST,
+        defaultPackageUtil.stageClasspathElements(
+            ImmutableList.of(nonExistentFile), STAGING_PATH, createOptions));
   }
 
   /**