You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/12/09 20:33:58 UTC
[1/2] incubator-beam git commit: Change Dataflow profiling option to saveProfilesToGcs
Repository: incubator-beam
Updated Branches:
refs/heads/master 080dbaa3e -> 58bb1174d
Change Dataflow profiling option to saveProfilesToGcs
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1e44cb12
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1e44cb12
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1e44cb12
Branch: refs/heads/master
Commit: 1e44cb12c2663b1353717bf9237618df74684102
Parents: 080dbaa
Author: bchambers <bc...@google.com>
Authored: Thu Dec 8 10:40:17 2016 -0800
Committer: Davor Bonaci <da...@google.com>
Committed: Fri Dec 9 12:31:30 2016 -0800
----------------------------------------------------------------------
.../beam/runners/dataflow/DataflowRunner.java | 5 +
.../options/DataflowProfilingOptions.java | 8 +-
.../DataflowPipelineTranslatorTest.java | 36 ++-
.../runners/dataflow/DataflowRunnerTest.java | 263 ++++++++++++-------
.../options/DataflowProfilingOptionsTest.java | 6 +-
5 files changed, 189 insertions(+), 129 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1e44cb12/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 22f6f5a..85318e6 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -40,6 +40,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
+import com.google.common.base.Strings;
import com.google.common.base.Utf8;
import com.google.common.collect.ForwardingMap;
import com.google.common.collect.HashMultimap;
@@ -262,6 +263,10 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
"DataflowRunner requires stagingLocation, and it is missing in PipelineOptions.");
validator.validateOutputFilePrefixSupported(dataflowOptions.getStagingLocation());
+ if (!Strings.isNullOrEmpty(dataflowOptions.getSaveProfilesToGcs())) {
+ validator.validateOutputFilePrefixSupported(dataflowOptions.getSaveProfilesToGcs());
+ }
+
if (dataflowOptions.getFilesToStage() == null) {
dataflowOptions.setFilesToStage(detectClassPathResourcesToStage(
DataflowRunner.class.getClassLoader()));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1e44cb12/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowProfilingOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowProfilingOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowProfilingOptions.java
index 092c17a..a87d688 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowProfilingOptions.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowProfilingOptions.java
@@ -30,10 +30,10 @@ import org.apache.beam.sdk.options.Hidden;
@Hidden
public interface DataflowProfilingOptions {
- @Description("Whether to periodically dump profiling information to local disk.\n"
- + "WARNING: Enabling this option may fill local disk with profiling information.")
- boolean getEnableProfilingAgent();
- void setEnableProfilingAgent(boolean enabled);
+ @Description("When set to a non-empty value, enables recording profiles and saving them to GCS.\n"
+ + "Profiles will continue until the pipeline is stopped or updated without this option.\n")
+ String getSaveProfilesToGcs();
+ void setSaveProfilesToGcs(String gcsPath);
@Description(
"[INTERNAL] Additional configuration for the profiling agent. Not typically necessary.")
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1e44cb12/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 8d0b83a..ab82941 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -22,6 +22,7 @@ import static org.apache.beam.sdk.util.Structs.getDictionary;
import static org.apache.beam.sdk.util.Structs.getString;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.junit.Assert.assertEquals;
@@ -49,7 +50,6 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
@@ -76,6 +76,7 @@ import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.util.GcsPathValidator;
import org.apache.beam.sdk.util.GcsUtil;
import org.apache.beam.sdk.util.PropertyNames;
import org.apache.beam.sdk.util.Structs;
@@ -188,27 +189,22 @@ public class DataflowPipelineTranslatorTest implements Serializable {
p, (DataflowRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
.getJob();
- // Note that the contents of this materialized map may be changed by the act of reading an
- // option, which will cause the default to get materialized whereas it would otherwise be
- // left absent. It is permissible to simply alter this test to reflect current behavior.
- Map<String, Object> settings = new HashMap<>();
- settings.put("appName", "DataflowPipelineTranslatorTest");
- settings.put("project", "some-project");
- settings.put("pathValidatorClass",
- "org.apache.beam.sdk.util.GcsPathValidator");
- settings.put("runner", "org.apache.beam.runners.dataflow.DataflowRunner");
- settings.put("jobName", "some-job-name");
- settings.put("tempLocation", "gs://somebucket/some/path");
- settings.put("gcpTempLocation", "gs://somebucket/some/path");
- settings.put("stagingLocation", "gs://somebucket/some/path/staging");
- settings.put("stableUniqueNames", "WARNING");
- settings.put("streaming", false);
- settings.put("numberOfWorkerHarnessThreads", 0);
- settings.put("experiments", null);
-
Map<String, Object> sdkPipelineOptions = job.getEnvironment().getSdkPipelineOptions();
assertThat(sdkPipelineOptions, hasKey("options"));
- assertEquals(settings, sdkPipelineOptions.get("options"));
+ Map<String, Object> optionsMap = (Map<String, Object>) sdkPipelineOptions.get("options");
+
+ assertThat(optionsMap, hasEntry("appName", (Object) "DataflowPipelineTranslatorTest"));
+ assertThat(optionsMap, hasEntry("project", (Object) "some-project"));
+ assertThat(optionsMap,
+ hasEntry("pathValidatorClass", (Object) GcsPathValidator.class.getName()));
+ assertThat(optionsMap, hasEntry("runner", (Object) DataflowRunner.class.getName()));
+ assertThat(optionsMap, hasEntry("jobName", (Object) "some-job-name"));
+ assertThat(optionsMap, hasEntry("tempLocation", (Object) "gs://somebucket/some/path"));
+ assertThat(optionsMap,
+ hasEntry("stagingLocation", (Object) "gs://somebucket/some/path/staging"));
+ assertThat(optionsMap, hasEntry("stableUniqueNames", (Object) "WARNING"));
+ assertThat(optionsMap, hasEntry("streaming", (Object) false));
+ assertThat(optionsMap, hasEntry("numberOfWorkerHarnessThreads", (Object) 0));
}
@Test
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1e44cb12/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 1959be5..133ae8a 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -34,6 +34,7 @@ import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -110,6 +111,7 @@ import org.hamcrest.Description;
import org.hamcrest.Matchers;
import org.hamcrest.TypeSafeMatcher;
import org.joda.time.Instant;
+import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.internal.matchers.ThrowableMessageMatcher;
@@ -128,6 +130,11 @@ import org.mockito.stubbing.Answer;
@RunWith(JUnit4.class)
public class DataflowRunnerTest {
+ private static final String VALID_STAGING_BUCKET = "gs://valid-bucket/staging";
+ private static final String VALID_TEMP_BUCKET = "gs://valid-bucket/temp";
+ private static final String VALID_PROFILE_BUCKET = "gs://valid-bucket/profiles";
+ private static final String NON_EXISTENT_BUCKET = "gs://non-existent-bucket/location";
+
private static final String PROJECT_ID = "some-project";
@Rule
@@ -137,6 +144,9 @@ public class DataflowRunnerTest {
@Rule
public ExpectedLogs expectedLogs = ExpectedLogs.none(DataflowRunner.class);
+ private Dataflow.Projects.Jobs mockJobs;
+ private GcsUtil mockGcsUtil;
+
// Asserts that the given Job has all expected fields set.
private static void assertValidJob(Job job) {
assertNull(job.getId());
@@ -144,6 +154,38 @@ public class DataflowRunnerTest {
assertTrue(Pattern.matches("[a-z]([-a-z0-9]*[a-z0-9])?", job.getName()));
}
+ @Before
+ public void setUp() throws IOException {
+ this.mockGcsUtil = mock(GcsUtil.class);
+ when(mockGcsUtil.create(any(GcsPath.class), anyString()))
+ .then(new Answer<SeekableByteChannel>() {
+ @Override
+ public SeekableByteChannel answer(InvocationOnMock invocation) throws Throwable {
+ return FileChannel.open(
+ Files.createTempFile("channel-", ".tmp"),
+ StandardOpenOption.CREATE, StandardOpenOption.DELETE_ON_CLOSE);
+ }
+ });
+ when(mockGcsUtil.isGcsPatternSupported(anyString())).thenReturn(true);
+ when(mockGcsUtil.expand(any(GcsPath.class))).then(new Answer<List<GcsPath>>() {
+ @Override
+ public List<GcsPath> answer(InvocationOnMock invocation) throws Throwable {
+ return ImmutableList.of((GcsPath) invocation.getArguments()[0]);
+ }
+ });
+ when(mockGcsUtil.bucketAccessible(GcsPath.fromUri(VALID_STAGING_BUCKET))).thenReturn(true);
+ when(mockGcsUtil.bucketAccessible(GcsPath.fromUri(VALID_TEMP_BUCKET))).thenReturn(true);
+ when(mockGcsUtil.bucketAccessible(GcsPath.fromUri(VALID_TEMP_BUCKET + "/staging"))).
+ thenReturn(true);
+ when(mockGcsUtil.bucketAccessible(GcsPath.fromUri(VALID_PROFILE_BUCKET))).thenReturn(true);
+ when(mockGcsUtil.bucketAccessible(GcsPath.fromUri(NON_EXISTENT_BUCKET))).thenReturn(false);
+
+ // The dataflow pipeline attempts to output to this location.
+ when(mockGcsUtil.bucketAccessible(GcsPath.fromUri("gs://bucket/object"))).thenReturn(true);
+
+ mockJobs = mock(Dataflow.Projects.Jobs.class);
+ }
+
private Pipeline buildDataflowPipeline(DataflowPipelineOptions options) {
options.setStableUniqueNames(CheckEnabled.ERROR);
options.setRunner(DataflowRunner.class);
@@ -155,19 +197,16 @@ public class DataflowRunnerTest {
return p;
}
- private static Dataflow buildMockDataflow(
- final ArgumentCaptor<Job> jobCaptor) throws IOException {
+ private Dataflow buildMockDataflow() throws IOException {
Dataflow mockDataflowClient = mock(Dataflow.class);
Dataflow.Projects mockProjects = mock(Dataflow.Projects.class);
- Dataflow.Projects.Jobs mockJobs = mock(Dataflow.Projects.Jobs.class);
Dataflow.Projects.Jobs.Create mockRequest =
mock(Dataflow.Projects.Jobs.Create.class);
Dataflow.Projects.Jobs.List mockList = mock(Dataflow.Projects.Jobs.List.class);
when(mockDataflowClient.projects()).thenReturn(mockProjects);
when(mockProjects.jobs()).thenReturn(mockJobs);
- when(mockJobs.create(eq(PROJECT_ID), jobCaptor.capture()))
- .thenReturn(mockRequest);
+ when(mockJobs.create(eq(PROJECT_ID), isA(Job.class))).thenReturn(mockRequest);
when(mockJobs.list(eq(PROJECT_ID))).thenReturn(mockList);
when(mockList.setPageToken(anyString())).thenReturn(mockList);
when(mockList.execute())
@@ -186,25 +225,17 @@ public class DataflowRunnerTest {
return mockDataflowClient;
}
- /**
- * Build a mock {@link GcsUtil} with return values.
- *
- * @param bucketExist first return value
- * @param bucketAccessible next return values
- */
- private GcsUtil buildMockGcsUtil(Boolean bucketExist, Boolean... bucketAccessible)
- throws IOException {
+ private GcsUtil buildMockGcsUtil() throws IOException {
GcsUtil mockGcsUtil = mock(GcsUtil.class);
when(mockGcsUtil.create(any(GcsPath.class), anyString()))
.then(new Answer<SeekableByteChannel>() {
- @Override
- public SeekableByteChannel answer(InvocationOnMock invocation) throws Throwable {
- return FileChannel.open(
- Files.createTempFile("channel-", ".tmp"),
- StandardOpenOption.CREATE, StandardOpenOption.DELETE_ON_CLOSE);
- }
- });
-
+ @Override
+ public SeekableByteChannel answer(InvocationOnMock invocation) throws Throwable {
+ return FileChannel.open(
+ Files.createTempFile("channel-", ".tmp"),
+ StandardOpenOption.CREATE, StandardOpenOption.DELETE_ON_CLOSE);
+ }
+ });
when(mockGcsUtil.isGcsPatternSupported(anyString())).thenReturn(true);
when(mockGcsUtil.expand(any(GcsPath.class))).then(new Answer<List<GcsPath>>() {
@Override
@@ -212,26 +243,18 @@ public class DataflowRunnerTest {
return ImmutableList.of((GcsPath) invocation.getArguments()[0]);
}
});
- when(mockGcsUtil.bucketAccessible(any(GcsPath.class)))
- .thenReturn(bucketExist, bucketAccessible);
return mockGcsUtil;
}
private DataflowPipelineOptions buildPipelineOptions() throws IOException {
- ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
- return buildPipelineOptions(jobCaptor);
- }
-
- private DataflowPipelineOptions buildPipelineOptions(
- ArgumentCaptor<Job> jobCaptor) throws IOException {
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setRunner(DataflowRunner.class);
options.setProject(PROJECT_ID);
- options.setTempLocation("gs://somebucket/some/path");
+ options.setTempLocation(VALID_TEMP_BUCKET);
// Set FILES_PROPERTY to empty to prevent a default value calculated from classpath.
options.setFilesToStage(new LinkedList<String>());
- options.setDataflowClient(buildMockDataflow(jobCaptor));
- options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */));
+ options.setDataflowClient(buildMockDataflow());
+ options.setGcsUtil(mockGcsUtil);
options.setGcpCredential(new TestCredential());
return options;
}
@@ -271,22 +294,22 @@ public class DataflowRunnerTest {
@Test
public void testFromOptionsWithUppercaseConvertsToLowercase() throws Exception {
String mixedCase = "ThisJobNameHasMixedCase";
- ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
- DataflowPipelineOptions options = buildPipelineOptions(jobCaptor);
+ DataflowPipelineOptions options = buildPipelineOptions();
options.setJobName(mixedCase);
- DataflowRunner runner = DataflowRunner.fromOptions(options);
+ DataflowRunner.fromOptions(options);
assertThat(options.getJobName(), equalTo(mixedCase.toLowerCase()));
}
@Test
public void testRun() throws IOException {
- ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
-
- DataflowPipelineOptions options = buildPipelineOptions(jobCaptor);
+ DataflowPipelineOptions options = buildPipelineOptions();
Pipeline p = buildDataflowPipeline(options);
DataflowPipelineJob job = (DataflowPipelineJob) p.run();
assertEquals("newid", job.getJobId());
+
+ ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
+ Mockito.verify(mockJobs).create(eq(PROJECT_ID), jobCaptor.capture());
assertValidJob(jobCaptor.getValue());
}
@@ -317,14 +340,15 @@ public class DataflowRunnerTest {
@Test
public void testUpdate() throws IOException {
- ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
-
- DataflowPipelineOptions options = buildPipelineOptions(jobCaptor);
+ DataflowPipelineOptions options = buildPipelineOptions();
options.setUpdate(true);
options.setJobName("oldJobName");
Pipeline p = buildDataflowPipeline(options);
DataflowPipelineJob job = (DataflowPipelineJob) p.run();
assertEquals("newid", job.getJobId());
+
+ ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
+ Mockito.verify(mockJobs).create(eq(PROJECT_ID), jobCaptor.capture());
assertValidJob(jobCaptor.getValue());
}
@@ -378,9 +402,6 @@ public class DataflowRunnerTest {
public void testRunWithFiles() throws IOException {
// Test that the function DataflowRunner.stageFiles works as
// expected.
- GcsUtil mockGcsUtil = buildMockGcsUtil(true /* bucket exists */);
- final String gcsStaging = "gs://somebucket/some/path";
- final String gcsTemp = "gs://somebucket/some/temp/path";
final String cloudDataflowDataset = "somedataset";
// Create some temporary files.
@@ -391,17 +412,16 @@ public class DataflowRunnerTest {
String overridePackageName = "alias.txt";
- ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setFilesToStage(ImmutableList.of(
temp1.getAbsolutePath(),
overridePackageName + "=" + temp2.getAbsolutePath()));
- options.setStagingLocation(gcsStaging);
- options.setTempLocation(gcsTemp);
+ options.setStagingLocation(VALID_STAGING_BUCKET);
+ options.setTempLocation(VALID_TEMP_BUCKET);
options.setTempDatasetId(cloudDataflowDataset);
options.setProject(PROJECT_ID);
options.setJobName("job");
- options.setDataflowClient(buildMockDataflow(jobCaptor));
+ options.setDataflowClient(buildMockDataflow());
options.setGcsUtil(mockGcsUtil);
options.setGcpCredential(new TestCredential());
@@ -410,6 +430,8 @@ public class DataflowRunnerTest {
DataflowPipelineJob job = (DataflowPipelineJob) p.run();
assertEquals("newid", job.getJobId());
+ ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
+ Mockito.verify(mockJobs).create(eq(PROJECT_ID), jobCaptor.capture());
Job workflowJob = jobCaptor.getValue();
assertValidJob(workflowJob);
@@ -424,7 +446,7 @@ public class DataflowRunnerTest {
assertEquals(overridePackageName, workflowPackage2.getName());
assertEquals(
- "storage.googleapis.com/somebucket/some/temp/path",
+ GcsPath.fromUri(VALID_TEMP_BUCKET).toResourceName(),
workflowJob.getEnvironment().getTempStoragePrefix());
assertEquals(
cloudDataflowDataset,
@@ -481,15 +503,12 @@ public class DataflowRunnerTest {
@Test
public void testGcsStagingLocationInitialization() throws Exception {
- // Test that the staging location is initialized correctly.
- String gcsTemp = "gs://somebucket/some/temp/path";
-
// Set temp location (required), and check that staging location is set.
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
- options.setTempLocation(gcsTemp);
+ options.setTempLocation(VALID_TEMP_BUCKET);
options.setProject(PROJECT_ID);
options.setGcpCredential(new TestCredential());
- options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */));
+ options.setGcsUtil(mockGcsUtil);
options.setRunner(DataflowRunner.class);
DataflowRunner.fromOptions(options);
@@ -499,9 +518,7 @@ public class DataflowRunnerTest {
@Test
public void testNonGcsFilePathInReadFailure() throws IOException {
- ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
-
- Pipeline p = buildDataflowPipeline(buildPipelineOptions(jobCaptor));
+ Pipeline p = buildDataflowPipeline(buildPipelineOptions());
p.apply("ReadMyNonGcsFile", TextIO.Read.from(tmpFolder.newFile().getPath()));
thrown.expectCause(Matchers.allOf(
@@ -509,12 +526,16 @@ public class DataflowRunnerTest {
ThrowableMessageMatcher.hasMessage(
containsString("expected a valid 'gs://' path but was given"))));
p.run();
+
+ ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
+ Mockito.verify(mockJobs).create(eq(PROJECT_ID), jobCaptor.capture());
assertValidJob(jobCaptor.getValue());
}
@Test
public void testNonGcsFilePathInWriteFailure() throws IOException {
Pipeline p = buildDataflowPipeline(buildPipelineOptions());
+
PCollection<String> pc = p.apply("ReadMyGcsFile", TextIO.Read.from("gs://bucket/object"));
thrown.expect(IllegalArgumentException.class);
@@ -524,15 +545,16 @@ public class DataflowRunnerTest {
@Test
public void testMultiSlashGcsFileReadPath() throws IOException {
- ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
-
- Pipeline p = buildDataflowPipeline(buildPipelineOptions(jobCaptor));
+ Pipeline p = buildDataflowPipeline(buildPipelineOptions());
p.apply("ReadInvalidGcsFile", TextIO.Read.from("gs://bucket/tmp//file"));
thrown.expectCause(Matchers.allOf(
instanceOf(IllegalArgumentException.class),
ThrowableMessageMatcher.hasMessage(containsString("consecutive slashes"))));
p.run();
+
+ ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
+ Mockito.verify(mockJobs).create(eq(PROJECT_ID), jobCaptor.capture());
assertValidJob(jobCaptor.getValue());
}
@@ -548,22 +570,21 @@ public class DataflowRunnerTest {
@Test
public void testInvalidGcpTempLocation() throws IOException {
- ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
-
- DataflowPipelineOptions options = buildPipelineOptions(jobCaptor);
+ DataflowPipelineOptions options = buildPipelineOptions();
options.setGcpTempLocation("file://temp/location");
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage(containsString("expected a valid 'gs://' path but was given"));
DataflowRunner.fromOptions(options);
+
+ ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
+ Mockito.verify(mockJobs).create(eq(PROJECT_ID), jobCaptor.capture());
assertValidJob(jobCaptor.getValue());
}
@Test
public void testNonGcsTempLocation() throws IOException {
- ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
-
- DataflowPipelineOptions options = buildPipelineOptions(jobCaptor);
+ DataflowPipelineOptions options = buildPipelineOptions();
options.setTempLocation("file://temp/location");
thrown.expect(IllegalArgumentException.class);
@@ -592,39 +613,68 @@ public class DataflowRunnerTest {
}
@Test
- public void testNonExistentTempLocation() throws IOException {
- ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
+ public void testInvalidProfileLocation() throws IOException {
+ DataflowPipelineOptions options = buildPipelineOptions();
+ options.setSaveProfilesToGcs("file://my/staging/location");
+ try {
+ DataflowRunner.fromOptions(options);
+ fail("fromOptions should have failed");
+ } catch (IllegalArgumentException e) {
+ assertThat(e.getMessage(), containsString("expected a valid 'gs://' path but was given"));
+ }
+ options.setSaveProfilesToGcs("my/staging/location");
+ try {
+ DataflowRunner.fromOptions(options);
+ fail("fromOptions should have failed");
+ } catch (IllegalArgumentException e) {
+ assertThat(e.getMessage(), containsString("expected a valid 'gs://' path but was given"));
+ }
+ }
- GcsUtil mockGcsUtil =
- buildMockGcsUtil(false /* temp bucket exists */, true /* staging bucket exists */);
- DataflowPipelineOptions options = buildPipelineOptions(jobCaptor);
- options.setGcsUtil(mockGcsUtil);
- options.setGcpTempLocation("gs://non-existent-bucket/location");
+ @Test
+ public void testNonExistentTempLocation() throws IOException {
+ DataflowPipelineOptions options = buildPipelineOptions();
+ options.setGcpTempLocation(NON_EXISTENT_BUCKET);
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage(containsString(
- "Output path does not exist or is not writeable: gs://non-existent-bucket/location"));
+ "Output path does not exist or is not writeable: " + NON_EXISTENT_BUCKET));
DataflowRunner.fromOptions(options);
+
+ ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
+ Mockito.verify(mockJobs).create(eq(PROJECT_ID), jobCaptor.capture());
assertValidJob(jobCaptor.getValue());
}
@Test
public void testNonExistentStagingLocation() throws IOException {
+ DataflowPipelineOptions options = buildPipelineOptions();
+ options.setStagingLocation(NON_EXISTENT_BUCKET);
+
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage(containsString(
+ "Output path does not exist or is not writeable: " + NON_EXISTENT_BUCKET));
+ DataflowRunner.fromOptions(options);
+
ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
+ Mockito.verify(mockJobs).create(eq(PROJECT_ID), jobCaptor.capture());
+ assertValidJob(jobCaptor.getValue());
+ }
- GcsUtil mockGcsUtil =
- buildMockGcsUtil(true /* temp bucket exists */, false /* staging bucket exists */);
- DataflowPipelineOptions options = buildPipelineOptions(jobCaptor);
- options.setGcpTempLocation(options.getTempLocation()); // bypass validation for GcpTempLocation
- options.setGcsUtil(mockGcsUtil);
- options.setStagingLocation("gs://non-existent-bucket/location");
+ @Test
+ public void testNonExistentProfileLocation() throws IOException {
+ DataflowPipelineOptions options = buildPipelineOptions();
+ options.setSaveProfilesToGcs(NON_EXISTENT_BUCKET);
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage(containsString(
- "Output path does not exist or is not writeable: gs://non-existent-bucket/location"));
+ "Output path does not exist or is not writeable: " + NON_EXISTENT_BUCKET));
DataflowRunner.fromOptions(options);
+
+ ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
+ Mockito.verify(mockJobs).create(eq(PROJECT_ID), jobCaptor.capture());
assertValidJob(jobCaptor.getValue());
- }
+ }
@Test
public void testNoProjectFails() {
@@ -648,8 +698,8 @@ public class DataflowRunnerTest {
options.setRunner(DataflowRunner.class);
options.setProject("foo-12345");
- options.setGcpTempLocation("gs://spam/ham/eggs");
- options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */));
+ options.setGcpTempLocation(VALID_TEMP_BUCKET);
+ options.setGcsUtil(mockGcsUtil);
options.setGcpCredential(new TestCredential());
DataflowRunner.fromOptions(options);
@@ -661,8 +711,8 @@ public class DataflowRunnerTest {
options.setRunner(DataflowRunner.class);
options.setProject("google.com:some-project-12345");
- options.setGcpTempLocation("gs://spam/ham/eggs");
- options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */));
+ options.setGcpTempLocation(VALID_TEMP_BUCKET);
+ options.setGcsUtil(mockGcsUtil);
options.setGcpCredential(new TestCredential());
DataflowRunner.fromOptions(options);
@@ -674,8 +724,8 @@ public class DataflowRunnerTest {
options.setRunner(DataflowRunner.class);
options.setProject("12345");
- options.setGcpTempLocation("gs://spam/ham/eggs");
- options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */));
+ options.setGcpTempLocation(VALID_TEMP_BUCKET);
+ options.setGcsUtil(mockGcsUtil);
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("Project ID");
@@ -690,8 +740,8 @@ public class DataflowRunnerTest {
options.setRunner(DataflowRunner.class);
options.setProject("some project");
- options.setGcpTempLocation("gs://spam/ham/eggs");
- options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */));
+ options.setGcpTempLocation(VALID_TEMP_BUCKET);
+ options.setGcsUtil(mockGcsUtil);
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("Project ID");
@@ -706,8 +756,8 @@ public class DataflowRunnerTest {
options.setRunner(DataflowRunner.class);
options.setProject("foo-12345");
- options.setTempLocation("gs://spam/ham/eggs");
- options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */));
+ options.setGcpTempLocation(VALID_TEMP_BUCKET);
+ options.setGcsUtil(mockGcsUtil);
options.as(DataflowPipelineDebugOptions.class).setNumberOfWorkerHarnessThreads(-1);
@@ -731,25 +781,34 @@ public class DataflowRunnerTest {
}
@Test
- public void testStagingLocationAndNoTempLocationSucceeds() throws Exception {
+ public void testGcpTempAndNoTempLocationSucceeds() throws Exception {
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setRunner(DataflowRunner.class);
options.setGcpCredential(new TestCredential());
options.setProject("foo-project");
- options.setGcpTempLocation("gs://spam/ham/eggs");
- options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */));
+ options.setGcpTempLocation(VALID_TEMP_BUCKET);
+ options.setGcsUtil(mockGcsUtil);
DataflowRunner.fromOptions(options);
}
@Test
- public void testTempLocationAndNoStagingLocationSucceeds() throws Exception {
+ public void testTempLocationAndNoGcpTempLocationSucceeds() throws Exception {
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setRunner(DataflowRunner.class);
options.setGcpCredential(new TestCredential());
options.setProject("foo-project");
- options.setTempLocation("gs://spam/ham/eggs");
- options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */));
+ options.setTempLocation(VALID_TEMP_BUCKET);
+ options.setGcsUtil(mockGcsUtil);
+
+ DataflowRunner.fromOptions(options);
+ }
+
+
+ @Test
+ public void testValidProfileLocation() throws IOException {
+ DataflowPipelineOptions options = buildPipelineOptions();
+ options.setSaveProfilesToGcs(VALID_PROFILE_BUCKET);
DataflowRunner.fromOptions(options);
}
@@ -855,10 +914,7 @@ public class DataflowRunnerTest {
@Test
public void testTransformTranslatorMissing() throws IOException {
- // Test that we throw if we don't provide a translation.
- ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
-
- DataflowPipelineOptions options = buildPipelineOptions(jobCaptor);
+ DataflowPipelineOptions options = buildPipelineOptions();
Pipeline p = Pipeline.create(options);
p.apply(Create.of(Arrays.asList(1, 2, 3)))
@@ -869,6 +925,9 @@ public class DataflowRunnerTest {
DataflowPipelineTranslator.fromOptions(options)
.translate(
p, (DataflowRunner) p.getRunner(), Collections.<DataflowPackage>emptyList());
+
+ ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
+ Mockito.verify(mockJobs).create(eq(PROJECT_ID), jobCaptor.capture());
assertValidJob(jobCaptor.getValue());
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1e44cb12/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowProfilingOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowProfilingOptionsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowProfilingOptionsTest.java
index 87c74a4..299f3c8 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowProfilingOptionsTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowProfilingOptionsTest.java
@@ -17,8 +17,8 @@
*/
package org.apache.beam.runners.dataflow.options;
+import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -38,9 +38,9 @@ public class DataflowProfilingOptionsTest {
@Test
public void testOptionsObject() throws Exception {
DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(new String[] {
- "--enableProfilingAgent", "--profilingAgentConfiguration={\"interval\": 21}"})
+ "--saveProfilesToGcs=path", "--profilingAgentConfiguration={\"interval\": 21}"})
.as(DataflowPipelineOptions.class);
- assertTrue(options.getEnableProfilingAgent());
+ assertThat(options.getSaveProfilesToGcs(), equalTo("path"));
String json = MAPPER.writeValueAsString(options);
assertThat(json, Matchers.containsString(
[2/2] incubator-beam git commit: This closes #1554
Posted by da...@apache.org.
This closes #1554
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/58bb1174
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/58bb1174
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/58bb1174
Branch: refs/heads/master
Commit: 58bb1174d087c5d3976dab921c528fdb6a45dbab
Parents: 080dbaa 1e44cb1
Author: Davor Bonaci <da...@google.com>
Authored: Fri Dec 9 12:33:50 2016 -0800
Committer: Davor Bonaci <da...@google.com>
Committed: Fri Dec 9 12:33:50 2016 -0800
----------------------------------------------------------------------
.../beam/runners/dataflow/DataflowRunner.java | 5 +
.../options/DataflowProfilingOptions.java | 8 +-
.../DataflowPipelineTranslatorTest.java | 36 ++-
.../runners/dataflow/DataflowRunnerTest.java | 263 ++++++++++++-------
.../options/DataflowProfilingOptionsTest.java | 6 +-
5 files changed, 189 insertions(+), 129 deletions(-)
----------------------------------------------------------------------