You are viewing a plain-text version of this content. The link to the canonical version (originally attached to the word "here") was not preserved in this plain-text copy.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/10/06 00:46:27 UTC
[3/4] beam git commit: Make PackageUtil a proper class encapsulating its ExecutorService
Make PackageUtil a proper class encapsulating its ExecutorService
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a211bd9b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a211bd9b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a211bd9b
Branch: refs/heads/master
Commit: a211bd9bb6365f1fe76e9b16355f721fcaa80b47
Parents: 31da49c
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Sep 28 19:35:20 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Oct 5 17:35:04 2017 -0700
----------------------------------------------------------------------
.../beam/runners/dataflow/util/GcsStager.java | 8 +-
.../beam/runners/dataflow/util/PackageUtil.java | 59 ++++++++----
.../runners/dataflow/util/PackageUtilTest.java | 95 ++++++++++++--------
3 files changed, 103 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/a211bd9b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
index d18e306..929be99 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
@@ -62,9 +62,9 @@ public class GcsStager implements Stager {
.setMimeType(MimeTypes.BINARY)
.build();
- return PackageUtil.stageClasspathElements(
- options.getFilesToStage(),
- options.getStagingLocation(),
- createOptions);
+ try (PackageUtil packageUtil = PackageUtil.withDefaultThreadPool()) {
+ return packageUtil.stageClasspathElements(
+ options.getFilesToStage(), options.getStagingLocation(), createOptions);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a211bd9b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
index 931f7ea..9d1e084 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
@@ -34,6 +34,7 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
+import java.io.Closeable;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -51,6 +52,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.CreateOptions;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
@@ -62,13 +64,18 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** Helper routines for packages. */
-class PackageUtil {
+@Internal
+class PackageUtil implements Closeable {
+
private static final Logger LOG = LoggerFactory.getLogger(PackageUtil.class);
+
/**
* A reasonable upper bound on the number of jars required to launch a Dataflow job.
*/
private static final int SANE_CLASSPATH_SIZE = 1000;
+ private static final int DEFAULT_THREAD_POOL_SIZE = 32;
+
private static final FluentBackoff BACKOFF_FACTORY =
FluentBackoff.DEFAULT.withMaxRetries(4).withInitialBackoff(Duration.standardSeconds(5));
@@ -77,6 +84,27 @@ class PackageUtil {
*/
private static final ApiErrorExtractor ERROR_EXTRACTOR = new ApiErrorExtractor();
+ private final ListeningExecutorService executorService;
+
+ private PackageUtil(ListeningExecutorService executorService) {
+ this.executorService = executorService;
+ }
+
+ public static PackageUtil withDefaultThreadPool() {
+ return PackageUtil.withExecutorService(
+ MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(DEFAULT_THREAD_POOL_SIZE)));
+ }
+
+ public static PackageUtil withExecutorService(ListeningExecutorService executorService) {
+ return new PackageUtil(executorService);
+ }
+
+ @Override
+ public void close() {
+ executorService.shutdown();
+ }
+
+
/**
* Compute and cache the attributes of a classpath element that we will need to stage it.
*
@@ -140,9 +168,10 @@ class PackageUtil {
* Utility function that computes sizes and hashes of packages so that we can validate whether
* they have already been correctly staged.
*/
- private static List<PackageAttributes> computePackageAttributes(
- Collection<String> classpathElements, final String stagingPath,
- ListeningExecutorService executorService) {
+ private List<PackageAttributes> computePackageAttributes(
+ Collection<String> classpathElements,
+ final String stagingPath) {
+
List<ListenableFuture<PackageAttributes>> futures = new LinkedList<>();
for (String classpathElement : classpathElements) {
@Nullable String userPackageName = null;
@@ -189,7 +218,7 @@ class PackageUtil {
* Utility to verify whether a package has already been staged and, if not, copy it to the
* staging location.
*/
- private static void stageOnePackage(
+ private void stageOnePackage(
PackageAttributes attributes, AtomicInteger numUploaded, AtomicInteger numCached,
Sleeper retrySleeper, CreateOptions createOptions) {
String source = attributes.getSourcePath();
@@ -255,22 +284,16 @@ class PackageUtil {
* @param stagingPath The base location to stage the elements to.
* @return A list of cloud workflow packages, each representing a classpath element.
*/
- static List<DataflowPackage> stageClasspathElements(
+ List<DataflowPackage> stageClasspathElements(
Collection<String> classpathElements, String stagingPath, CreateOptions createOptions) {
- ListeningExecutorService executorService =
- MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(32));
- try {
- return stageClasspathElements(classpathElements, stagingPath, Sleeper.DEFAULT,
- executorService, createOptions);
- } finally {
- executorService.shutdown();
- }
+ return stageClasspathElements(classpathElements, stagingPath, Sleeper.DEFAULT, createOptions);
}
// Visible for testing.
- static List<DataflowPackage> stageClasspathElements(
- Collection<String> classpathElements, final String stagingPath,
- final Sleeper retrySleeper, ListeningExecutorService executorService,
+ List<DataflowPackage> stageClasspathElements(
+ Collection<String> classpathElements,
+ final String stagingPath,
+ final Sleeper retrySleeper,
final CreateOptions createOptions) {
LOG.info("Uploading {} files from PipelineOptions.filesToStage to staging location to "
+ "prepare for execution.", classpathElements.size());
@@ -290,7 +313,7 @@ class PackageUtil {
// Inline a copy here because the inner code returns an immutable list and we want to mutate it.
List<PackageAttributes> packageAttributes =
- new LinkedList<>(computePackageAttributes(classpathElements, stagingPath, executorService));
+ new LinkedList<>(computePackageAttributes(classpathElements, stagingPath));
// Compute the returned list of DataflowPackage objects here so that they are returned in the
// same order as on the classpath.
http://git-wip-us.apache.org/repos/asf/beam/blob/a211bd9b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
index 5d0c0f2..de6416d 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
@@ -86,6 +86,7 @@ import org.apache.beam.sdk.util.GcsUtil.StorageObjectOrIOException;
import org.apache.beam.sdk.util.MimeTypes;
import org.apache.beam.sdk.util.gcsfs.GcsPath;
import org.hamcrest.Matchers;
+import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -97,22 +98,19 @@ import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
-/** Tests for PackageUtil. */
+/** Tests for {@link PackageUtil}. */
@RunWith(JUnit4.class)
public class PackageUtilTest {
@Rule public ExpectedLogs logged = ExpectedLogs.none(PackageUtil.class);
- @Rule
- public TemporaryFolder tmpFolder = new TemporaryFolder();
+ @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
+ @Rule public FastNanoClockAndSleeper fastNanoClockAndSleeper = new FastNanoClockAndSleeper();
- @Rule
- public FastNanoClockAndSleeper fastNanoClockAndSleeper = new FastNanoClockAndSleeper();
-
- @Mock
- GcsUtil mockGcsUtil;
+ @Mock GcsUtil mockGcsUtil;
// 128 bits, base64 encoded is 171 bits, rounds to 22 bytes
private static final String HASH_PATTERN = "[a-zA-Z0-9+-]{22}";
private CreateOptions createOptions;
+ private PackageUtil defaultPackageUtil;
@Before
public void setUp() {
@@ -122,6 +120,12 @@ public class PackageUtilTest {
pipelineOptions.setGcsUtil(mockGcsUtil);
FileSystems.setDefaultPipelineOptions(pipelineOptions);
createOptions = StandardCreateOptions.builder().setMimeType(MimeTypes.BINARY).build();
+ defaultPackageUtil = PackageUtil.withDefaultThreadPool();
+ }
+
+ @After
+ public void teardown() {
+ defaultPackageUtil.close();
}
private File makeFileWithContents(String name, String contents) throws Exception {
@@ -224,7 +228,7 @@ public class PackageUtilTest {
classpathElements.add(eltName + '=' + tmpFile.getAbsolutePath());
}
- PackageUtil.stageClasspathElements(classpathElements, STAGING_PATH, createOptions);
+ defaultPackageUtil.stageClasspathElements(classpathElements, STAGING_PATH, createOptions);
logged.verifyWarn("Your classpath contains 1005 elements, which Google Cloud Dataflow");
}
@@ -239,8 +243,9 @@ public class PackageUtilTest {
when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
- List<DataflowPackage> targets = PackageUtil.stageClasspathElements(
- ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH, createOptions);
+ List<DataflowPackage> targets =
+ defaultPackageUtil.stageClasspathElements(
+ ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH, createOptions);
DataflowPackage target = Iterables.getOnlyElement(targets);
verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class));
@@ -269,9 +274,11 @@ public class PackageUtilTest {
}
});
- List<DataflowPackage> targets = PackageUtil.stageClasspathElements(
- ImmutableList.of(smallFile.getAbsolutePath(), largeFile.getAbsolutePath()),
- STAGING_PATH, createOptions);
+ List<DataflowPackage> targets =
+ defaultPackageUtil.stageClasspathElements(
+ ImmutableList.of(smallFile.getAbsolutePath(), largeFile.getAbsolutePath()),
+ STAGING_PATH,
+ createOptions);
// Verify that the packages are returned small, then large, matching input order even though
// the large file would be uploaded first.
assertThat(targets.get(0).getName(), startsWith("small"));
@@ -292,7 +299,7 @@ public class PackageUtilTest {
new FileNotFoundException("some/path"))));
when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
- PackageUtil.stageClasspathElements(
+ defaultPackageUtil.stageClasspathElements(
ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH, createOptions);
verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class));
@@ -320,8 +327,9 @@ public class PackageUtilTest {
new FileNotFoundException("some/path"))));
when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
- List<DataflowPackage> targets = PackageUtil.stageClasspathElements(
- ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH, createOptions);
+ List<DataflowPackage> targets =
+ defaultPackageUtil.stageClasspathElements(
+ ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH, createOptions);
DataflowPackage target = Iterables.getOnlyElement(targets);
verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class));
@@ -342,10 +350,12 @@ public class PackageUtilTest {
when(mockGcsUtil.create(any(GcsPath.class), anyString()))
.thenThrow(new IOException("Fake Exception: Upload error"));
- try {
- PackageUtil.stageClasspathElements(
+ try (PackageUtil directPackageUtil =
+ PackageUtil.withExecutorService(MoreExecutors.newDirectExecutorService())) {
+ directPackageUtil.stageClasspathElements(
ImmutableList.of(tmpFile.getAbsolutePath()),
- STAGING_PATH, fastNanoClockAndSleeper, MoreExecutors.newDirectExecutorService(),
+ STAGING_PATH,
+ fastNanoClockAndSleeper,
createOptions);
} finally {
verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class));
@@ -365,10 +375,12 @@ public class PackageUtilTest {
googleJsonResponseException(
HttpStatusCodes.STATUS_CODE_FORBIDDEN, "Permission denied", "Test message")));
- try {
- PackageUtil.stageClasspathElements(
+ try (PackageUtil directPackageUtil =
+ PackageUtil.withExecutorService(MoreExecutors.newDirectExecutorService())) {
+ directPackageUtil.stageClasspathElements(
ImmutableList.of(tmpFile.getAbsolutePath()),
- STAGING_PATH, fastNanoClockAndSleeper, MoreExecutors.newDirectExecutorService(),
+ STAGING_PATH,
+ fastNanoClockAndSleeper,
createOptions);
fail("Expected RuntimeException");
} catch (RuntimeException e) {
@@ -400,10 +412,13 @@ public class PackageUtilTest {
.thenThrow(new IOException("Fake Exception: 410 Gone")) // First attempt fails
.thenReturn(pipe.sink()); // second attempt succeeds
- try {
- PackageUtil.stageClasspathElements(
- ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH, fastNanoClockAndSleeper,
- MoreExecutors.newDirectExecutorService(), createOptions);
+ try (PackageUtil directPackageUtil =
+ PackageUtil.withExecutorService(MoreExecutors.newDirectExecutorService())) {
+ directPackageUtil.stageClasspathElements(
+ ImmutableList.of(tmpFile.getAbsolutePath()),
+ STAGING_PATH,
+ fastNanoClockAndSleeper,
+ createOptions);
} finally {
verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class));
verify(mockGcsUtil, times(2)).create(any(GcsPath.class), anyString());
@@ -418,8 +433,8 @@ public class PackageUtilTest {
.thenReturn(ImmutableList.of(StorageObjectOrIOException.create(
createStorageObject(STAGING_PATH, tmpFile.length()))));
- PackageUtil.stageClasspathElements(ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH,
- createOptions);
+ defaultPackageUtil.stageClasspathElements(
+ ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH, createOptions);
verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class));
verifyNoMoreInteractions(mockGcsUtil);
@@ -438,7 +453,7 @@ public class PackageUtilTest {
createStorageObject(STAGING_PATH, Long.MAX_VALUE))));
when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
- PackageUtil.stageClasspathElements(
+ defaultPackageUtil.stageClasspathElements(
ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH, createOptions);
verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class));
@@ -457,9 +472,11 @@ public class PackageUtilTest {
new FileNotFoundException("some/path"))));
when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
- List<DataflowPackage> targets = PackageUtil.stageClasspathElements(
- ImmutableList.of(overriddenName + "=" + tmpFile.getAbsolutePath()), STAGING_PATH,
- createOptions);
+ List<DataflowPackage> targets =
+ defaultPackageUtil.stageClasspathElements(
+ ImmutableList.of(overriddenName + "=" + tmpFile.getAbsolutePath()),
+ STAGING_PATH,
+ createOptions);
DataflowPackage target = Iterables.getOnlyElement(targets);
verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class));
@@ -473,10 +490,14 @@ public class PackageUtilTest {
@Test
public void testPackageUploadIsSkippedWithNonExistentResource() throws Exception {
- String nonExistentFile = FileSystems.matchNewResource(tmpFolder.getRoot().getPath(), true)
- .resolve("non-existent-file", StandardResolveOptions.RESOLVE_FILE).toString();
- assertEquals(Collections.EMPTY_LIST, PackageUtil.stageClasspathElements(
- ImmutableList.of(nonExistentFile), STAGING_PATH, createOptions));
+ String nonExistentFile =
+ FileSystems.matchNewResource(tmpFolder.getRoot().getPath(), true)
+ .resolve("non-existent-file", StandardResolveOptions.RESOLVE_FILE)
+ .toString();
+ assertEquals(
+ Collections.EMPTY_LIST,
+ defaultPackageUtil.stageClasspathElements(
+ ImmutableList.of(nonExistentFile), STAGING_PATH, createOptions));
}
/**