You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/12/09 20:33:58 UTC

[1/2] incubator-beam git commit: Change Dataflow profiling option to saveProfilesToGcs

Repository: incubator-beam
Updated Branches:
  refs/heads/master 080dbaa3e -> 58bb1174d


Change Dataflow profiling option to saveProfilesToGcs


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1e44cb12
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1e44cb12
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1e44cb12

Branch: refs/heads/master
Commit: 1e44cb12c2663b1353717bf9237618df74684102
Parents: 080dbaa
Author: bchambers <bc...@google.com>
Authored: Thu Dec 8 10:40:17 2016 -0800
Committer: Davor Bonaci <da...@google.com>
Committed: Fri Dec 9 12:31:30 2016 -0800

----------------------------------------------------------------------
 .../beam/runners/dataflow/DataflowRunner.java   |   5 +
 .../options/DataflowProfilingOptions.java       |   8 +-
 .../DataflowPipelineTranslatorTest.java         |  36 ++-
 .../runners/dataflow/DataflowRunnerTest.java    | 263 ++++++++++++-------
 .../options/DataflowProfilingOptionsTest.java   |   6 +-
 5 files changed, 189 insertions(+), 129 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1e44cb12/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 22f6f5a..85318e6 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -40,6 +40,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
+import com.google.common.base.Strings;
 import com.google.common.base.Utf8;
 import com.google.common.collect.ForwardingMap;
 import com.google.common.collect.HashMultimap;
@@ -262,6 +263,10 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
         "DataflowRunner requires stagingLocation, and it is missing in PipelineOptions.");
     validator.validateOutputFilePrefixSupported(dataflowOptions.getStagingLocation());
 
+    if (!Strings.isNullOrEmpty(dataflowOptions.getSaveProfilesToGcs())) {
+      validator.validateOutputFilePrefixSupported(dataflowOptions.getSaveProfilesToGcs());
+    }
+
     if (dataflowOptions.getFilesToStage() == null) {
       dataflowOptions.setFilesToStage(detectClassPathResourcesToStage(
           DataflowRunner.class.getClassLoader()));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1e44cb12/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowProfilingOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowProfilingOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowProfilingOptions.java
index 092c17a..a87d688 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowProfilingOptions.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowProfilingOptions.java
@@ -30,10 +30,10 @@ import org.apache.beam.sdk.options.Hidden;
 @Hidden
 public interface DataflowProfilingOptions {
 
-  @Description("Whether to periodically dump profiling information to local disk.\n"
-      + "WARNING: Enabling this option may fill local disk with profiling information.")
-  boolean getEnableProfilingAgent();
-  void setEnableProfilingAgent(boolean enabled);
+  @Description("When set to a non-empty value, enables recording profiles and saving them to GCS.\n"
+      + "Profiles will continue until the pipeline is stopped or updated without this option.\n")
+  String getSaveProfilesToGcs();
+  void setSaveProfilesToGcs(String gcsPath);
 
   @Description(
       "[INTERNAL] Additional configuration for the profiling agent. Not typically necessary.")

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1e44cb12/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 8d0b83a..ab82941 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -22,6 +22,7 @@ import static org.apache.beam.sdk.util.Structs.getDictionary;
 import static org.apache.beam.sdk.util.Structs.getString;
 import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.hasEntry;
 import static org.hamcrest.Matchers.hasKey;
 import static org.hamcrest.core.IsInstanceOf.instanceOf;
 import static org.junit.Assert.assertEquals;
@@ -49,7 +50,6 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
@@ -76,6 +76,7 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.util.GcsPathValidator;
 import org.apache.beam.sdk.util.GcsUtil;
 import org.apache.beam.sdk.util.PropertyNames;
 import org.apache.beam.sdk.util.Structs;
@@ -188,27 +189,22 @@ public class DataflowPipelineTranslatorTest implements Serializable {
                 p, (DataflowRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
             .getJob();
 
-    // Note that the contents of this materialized map may be changed by the act of reading an
-    // option, which will cause the default to get materialized whereas it would otherwise be
-    // left absent. It is permissible to simply alter this test to reflect current behavior.
-    Map<String, Object> settings = new HashMap<>();
-    settings.put("appName", "DataflowPipelineTranslatorTest");
-    settings.put("project", "some-project");
-    settings.put("pathValidatorClass",
-        "org.apache.beam.sdk.util.GcsPathValidator");
-    settings.put("runner", "org.apache.beam.runners.dataflow.DataflowRunner");
-    settings.put("jobName", "some-job-name");
-    settings.put("tempLocation", "gs://somebucket/some/path");
-    settings.put("gcpTempLocation", "gs://somebucket/some/path");
-    settings.put("stagingLocation", "gs://somebucket/some/path/staging");
-    settings.put("stableUniqueNames", "WARNING");
-    settings.put("streaming", false);
-    settings.put("numberOfWorkerHarnessThreads", 0);
-    settings.put("experiments", null);
-
     Map<String, Object> sdkPipelineOptions = job.getEnvironment().getSdkPipelineOptions();
     assertThat(sdkPipelineOptions, hasKey("options"));
-    assertEquals(settings, sdkPipelineOptions.get("options"));
+    Map<String, Object> optionsMap = (Map<String, Object>) sdkPipelineOptions.get("options");
+
+    assertThat(optionsMap, hasEntry("appName", (Object) "DataflowPipelineTranslatorTest"));
+    assertThat(optionsMap, hasEntry("project", (Object) "some-project"));
+    assertThat(optionsMap,
+        hasEntry("pathValidatorClass", (Object) GcsPathValidator.class.getName()));
+    assertThat(optionsMap, hasEntry("runner", (Object) DataflowRunner.class.getName()));
+    assertThat(optionsMap, hasEntry("jobName", (Object) "some-job-name"));
+    assertThat(optionsMap, hasEntry("tempLocation", (Object) "gs://somebucket/some/path"));
+    assertThat(optionsMap,
+        hasEntry("stagingLocation", (Object) "gs://somebucket/some/path/staging"));
+    assertThat(optionsMap, hasEntry("stableUniqueNames", (Object) "WARNING"));
+    assertThat(optionsMap, hasEntry("streaming", (Object) false));
+    assertThat(optionsMap, hasEntry("numberOfWorkerHarnessThreads", (Object) 0));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1e44cb12/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 1959be5..133ae8a 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -34,6 +34,7 @@ import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -110,6 +111,7 @@ import org.hamcrest.Description;
 import org.hamcrest.Matchers;
 import org.hamcrest.TypeSafeMatcher;
 import org.joda.time.Instant;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.internal.matchers.ThrowableMessageMatcher;
@@ -128,6 +130,11 @@ import org.mockito.stubbing.Answer;
 @RunWith(JUnit4.class)
 public class DataflowRunnerTest {
 
+  private static final String VALID_STAGING_BUCKET = "gs://valid-bucket/staging";
+  private static final String VALID_TEMP_BUCKET = "gs://valid-bucket/temp";
+  private static final String VALID_PROFILE_BUCKET = "gs://valid-bucket/profiles";
+  private static final String NON_EXISTENT_BUCKET = "gs://non-existent-bucket/location";
+
   private static final String PROJECT_ID = "some-project";
 
   @Rule
@@ -137,6 +144,9 @@ public class DataflowRunnerTest {
   @Rule
   public ExpectedLogs expectedLogs = ExpectedLogs.none(DataflowRunner.class);
 
+  private Dataflow.Projects.Jobs mockJobs;
+  private GcsUtil mockGcsUtil;
+
   // Asserts that the given Job has all expected fields set.
   private static void assertValidJob(Job job) {
     assertNull(job.getId());
@@ -144,6 +154,38 @@ public class DataflowRunnerTest {
     assertTrue(Pattern.matches("[a-z]([-a-z0-9]*[a-z0-9])?", job.getName()));
   }
 
+  @Before
+  public void setUp() throws IOException {
+    this.mockGcsUtil = mock(GcsUtil.class);
+    when(mockGcsUtil.create(any(GcsPath.class), anyString()))
+        .then(new Answer<SeekableByteChannel>() {
+          @Override
+          public SeekableByteChannel answer(InvocationOnMock invocation) throws Throwable {
+            return FileChannel.open(
+                Files.createTempFile("channel-", ".tmp"),
+                StandardOpenOption.CREATE, StandardOpenOption.DELETE_ON_CLOSE);
+          }
+        });
+    when(mockGcsUtil.isGcsPatternSupported(anyString())).thenReturn(true);
+    when(mockGcsUtil.expand(any(GcsPath.class))).then(new Answer<List<GcsPath>>() {
+      @Override
+      public List<GcsPath> answer(InvocationOnMock invocation) throws Throwable {
+        return ImmutableList.of((GcsPath) invocation.getArguments()[0]);
+      }
+    });
+    when(mockGcsUtil.bucketAccessible(GcsPath.fromUri(VALID_STAGING_BUCKET))).thenReturn(true);
+    when(mockGcsUtil.bucketAccessible(GcsPath.fromUri(VALID_TEMP_BUCKET))).thenReturn(true);
+    when(mockGcsUtil.bucketAccessible(GcsPath.fromUri(VALID_TEMP_BUCKET + "/staging"))).
+        thenReturn(true);
+    when(mockGcsUtil.bucketAccessible(GcsPath.fromUri(VALID_PROFILE_BUCKET))).thenReturn(true);
+    when(mockGcsUtil.bucketAccessible(GcsPath.fromUri(NON_EXISTENT_BUCKET))).thenReturn(false);
+
+    // The dataflow pipeline attempts to output to this location.
+    when(mockGcsUtil.bucketAccessible(GcsPath.fromUri("gs://bucket/object"))).thenReturn(true);
+
+    mockJobs = mock(Dataflow.Projects.Jobs.class);
+  }
+
   private Pipeline buildDataflowPipeline(DataflowPipelineOptions options) {
     options.setStableUniqueNames(CheckEnabled.ERROR);
     options.setRunner(DataflowRunner.class);
@@ -155,19 +197,16 @@ public class DataflowRunnerTest {
     return p;
   }
 
-  private static Dataflow buildMockDataflow(
-      final ArgumentCaptor<Job> jobCaptor) throws IOException {
+  private Dataflow buildMockDataflow() throws IOException {
     Dataflow mockDataflowClient = mock(Dataflow.class);
     Dataflow.Projects mockProjects = mock(Dataflow.Projects.class);
-    Dataflow.Projects.Jobs mockJobs = mock(Dataflow.Projects.Jobs.class);
     Dataflow.Projects.Jobs.Create mockRequest =
         mock(Dataflow.Projects.Jobs.Create.class);
     Dataflow.Projects.Jobs.List mockList = mock(Dataflow.Projects.Jobs.List.class);
 
     when(mockDataflowClient.projects()).thenReturn(mockProjects);
     when(mockProjects.jobs()).thenReturn(mockJobs);
-    when(mockJobs.create(eq(PROJECT_ID), jobCaptor.capture()))
-        .thenReturn(mockRequest);
+    when(mockJobs.create(eq(PROJECT_ID), isA(Job.class))).thenReturn(mockRequest);
     when(mockJobs.list(eq(PROJECT_ID))).thenReturn(mockList);
     when(mockList.setPageToken(anyString())).thenReturn(mockList);
     when(mockList.execute())
@@ -186,25 +225,17 @@ public class DataflowRunnerTest {
     return mockDataflowClient;
   }
 
-  /**
-   * Build a mock {@link GcsUtil} with return values.
-   *
-   * @param bucketExist first return value
-   * @param bucketAccessible next return values
-   */
-  private GcsUtil buildMockGcsUtil(Boolean bucketExist, Boolean... bucketAccessible)
-      throws IOException {
+  private GcsUtil buildMockGcsUtil() throws IOException {
     GcsUtil mockGcsUtil = mock(GcsUtil.class);
     when(mockGcsUtil.create(any(GcsPath.class), anyString()))
         .then(new Answer<SeekableByteChannel>() {
-              @Override
-              public SeekableByteChannel answer(InvocationOnMock invocation) throws Throwable {
-                return FileChannel.open(
-                    Files.createTempFile("channel-", ".tmp"),
-                    StandardOpenOption.CREATE, StandardOpenOption.DELETE_ON_CLOSE);
-              }
-            });
-
+          @Override
+          public SeekableByteChannel answer(InvocationOnMock invocation) throws Throwable {
+            return FileChannel.open(
+                Files.createTempFile("channel-", ".tmp"),
+                StandardOpenOption.CREATE, StandardOpenOption.DELETE_ON_CLOSE);
+          }
+        });
     when(mockGcsUtil.isGcsPatternSupported(anyString())).thenReturn(true);
     when(mockGcsUtil.expand(any(GcsPath.class))).then(new Answer<List<GcsPath>>() {
       @Override
@@ -212,26 +243,18 @@ public class DataflowRunnerTest {
         return ImmutableList.of((GcsPath) invocation.getArguments()[0]);
       }
     });
-    when(mockGcsUtil.bucketAccessible(any(GcsPath.class)))
-        .thenReturn(bucketExist, bucketAccessible);
     return mockGcsUtil;
   }
 
   private DataflowPipelineOptions buildPipelineOptions() throws IOException {
-    ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
-    return buildPipelineOptions(jobCaptor);
-  }
-
-  private DataflowPipelineOptions buildPipelineOptions(
-      ArgumentCaptor<Job> jobCaptor) throws IOException {
     DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
     options.setRunner(DataflowRunner.class);
     options.setProject(PROJECT_ID);
-    options.setTempLocation("gs://somebucket/some/path");
+    options.setTempLocation(VALID_TEMP_BUCKET);
     // Set FILES_PROPERTY to empty to prevent a default value calculated from classpath.
     options.setFilesToStage(new LinkedList<String>());
-    options.setDataflowClient(buildMockDataflow(jobCaptor));
-    options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */));
+    options.setDataflowClient(buildMockDataflow());
+    options.setGcsUtil(mockGcsUtil);
     options.setGcpCredential(new TestCredential());
     return options;
   }
@@ -271,22 +294,22 @@ public class DataflowRunnerTest {
   @Test
   public void testFromOptionsWithUppercaseConvertsToLowercase() throws Exception {
     String mixedCase = "ThisJobNameHasMixedCase";
-    ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
-    DataflowPipelineOptions options = buildPipelineOptions(jobCaptor);
+    DataflowPipelineOptions options = buildPipelineOptions();
     options.setJobName(mixedCase);
 
-    DataflowRunner runner = DataflowRunner.fromOptions(options);
+    DataflowRunner.fromOptions(options);
     assertThat(options.getJobName(), equalTo(mixedCase.toLowerCase()));
   }
 
   @Test
   public void testRun() throws IOException {
-    ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
-
-    DataflowPipelineOptions options = buildPipelineOptions(jobCaptor);
+    DataflowPipelineOptions options = buildPipelineOptions();
     Pipeline p = buildDataflowPipeline(options);
     DataflowPipelineJob job = (DataflowPipelineJob) p.run();
     assertEquals("newid", job.getJobId());
+
+    ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
+    Mockito.verify(mockJobs).create(eq(PROJECT_ID), jobCaptor.capture());
     assertValidJob(jobCaptor.getValue());
   }
 
@@ -317,14 +340,15 @@ public class DataflowRunnerTest {
 
   @Test
   public void testUpdate() throws IOException {
-    ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
-
-    DataflowPipelineOptions options = buildPipelineOptions(jobCaptor);
+    DataflowPipelineOptions options = buildPipelineOptions();
     options.setUpdate(true);
     options.setJobName("oldJobName");
     Pipeline p = buildDataflowPipeline(options);
     DataflowPipelineJob job = (DataflowPipelineJob) p.run();
     assertEquals("newid", job.getJobId());
+
+    ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
+    Mockito.verify(mockJobs).create(eq(PROJECT_ID), jobCaptor.capture());
     assertValidJob(jobCaptor.getValue());
   }
 
@@ -378,9 +402,6 @@ public class DataflowRunnerTest {
   public void testRunWithFiles() throws IOException {
     // Test that the function DataflowRunner.stageFiles works as
     // expected.
-    GcsUtil mockGcsUtil = buildMockGcsUtil(true /* bucket exists */);
-    final String gcsStaging = "gs://somebucket/some/path";
-    final String gcsTemp = "gs://somebucket/some/temp/path";
     final String cloudDataflowDataset = "somedataset";
 
     // Create some temporary files.
@@ -391,17 +412,16 @@ public class DataflowRunnerTest {
 
     String overridePackageName = "alias.txt";
 
-    ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
     DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
     options.setFilesToStage(ImmutableList.of(
         temp1.getAbsolutePath(),
         overridePackageName + "=" + temp2.getAbsolutePath()));
-    options.setStagingLocation(gcsStaging);
-    options.setTempLocation(gcsTemp);
+    options.setStagingLocation(VALID_STAGING_BUCKET);
+    options.setTempLocation(VALID_TEMP_BUCKET);
     options.setTempDatasetId(cloudDataflowDataset);
     options.setProject(PROJECT_ID);
     options.setJobName("job");
-    options.setDataflowClient(buildMockDataflow(jobCaptor));
+    options.setDataflowClient(buildMockDataflow());
     options.setGcsUtil(mockGcsUtil);
     options.setGcpCredential(new TestCredential());
 
@@ -410,6 +430,8 @@ public class DataflowRunnerTest {
     DataflowPipelineJob job = (DataflowPipelineJob) p.run();
     assertEquals("newid", job.getJobId());
 
+    ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
+    Mockito.verify(mockJobs).create(eq(PROJECT_ID), jobCaptor.capture());
     Job workflowJob = jobCaptor.getValue();
     assertValidJob(workflowJob);
 
@@ -424,7 +446,7 @@ public class DataflowRunnerTest {
     assertEquals(overridePackageName, workflowPackage2.getName());
 
     assertEquals(
-        "storage.googleapis.com/somebucket/some/temp/path",
+        GcsPath.fromUri(VALID_TEMP_BUCKET).toResourceName(),
         workflowJob.getEnvironment().getTempStoragePrefix());
     assertEquals(
         cloudDataflowDataset,
@@ -481,15 +503,12 @@ public class DataflowRunnerTest {
 
   @Test
   public void testGcsStagingLocationInitialization() throws Exception {
-    // Test that the staging location is initialized correctly.
-    String gcsTemp = "gs://somebucket/some/temp/path";
-
     // Set temp location (required), and check that staging location is set.
     DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
-    options.setTempLocation(gcsTemp);
+    options.setTempLocation(VALID_TEMP_BUCKET);
     options.setProject(PROJECT_ID);
     options.setGcpCredential(new TestCredential());
-    options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */));
+    options.setGcsUtil(mockGcsUtil);
     options.setRunner(DataflowRunner.class);
 
     DataflowRunner.fromOptions(options);
@@ -499,9 +518,7 @@ public class DataflowRunnerTest {
 
   @Test
   public void testNonGcsFilePathInReadFailure() throws IOException {
-    ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
-
-    Pipeline p = buildDataflowPipeline(buildPipelineOptions(jobCaptor));
+    Pipeline p = buildDataflowPipeline(buildPipelineOptions());
     p.apply("ReadMyNonGcsFile", TextIO.Read.from(tmpFolder.newFile().getPath()));
 
     thrown.expectCause(Matchers.allOf(
@@ -509,12 +526,16 @@ public class DataflowRunnerTest {
         ThrowableMessageMatcher.hasMessage(
             containsString("expected a valid 'gs://' path but was given"))));
     p.run();
+
+    ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
+    Mockito.verify(mockJobs).create(eq(PROJECT_ID), jobCaptor.capture());
     assertValidJob(jobCaptor.getValue());
   }
 
   @Test
   public void testNonGcsFilePathInWriteFailure() throws IOException {
     Pipeline p = buildDataflowPipeline(buildPipelineOptions());
+
     PCollection<String> pc = p.apply("ReadMyGcsFile", TextIO.Read.from("gs://bucket/object"));
 
     thrown.expect(IllegalArgumentException.class);
@@ -524,15 +545,16 @@ public class DataflowRunnerTest {
 
   @Test
   public void testMultiSlashGcsFileReadPath() throws IOException {
-    ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
-
-    Pipeline p = buildDataflowPipeline(buildPipelineOptions(jobCaptor));
+    Pipeline p = buildDataflowPipeline(buildPipelineOptions());
     p.apply("ReadInvalidGcsFile", TextIO.Read.from("gs://bucket/tmp//file"));
 
     thrown.expectCause(Matchers.allOf(
         instanceOf(IllegalArgumentException.class),
         ThrowableMessageMatcher.hasMessage(containsString("consecutive slashes"))));
     p.run();
+
+    ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
+    Mockito.verify(mockJobs).create(eq(PROJECT_ID), jobCaptor.capture());
     assertValidJob(jobCaptor.getValue());
   }
 
@@ -548,22 +570,21 @@ public class DataflowRunnerTest {
 
   @Test
   public void testInvalidGcpTempLocation() throws IOException {
-    ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
-
-    DataflowPipelineOptions options = buildPipelineOptions(jobCaptor);
+    DataflowPipelineOptions options = buildPipelineOptions();
     options.setGcpTempLocation("file://temp/location");
 
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage(containsString("expected a valid 'gs://' path but was given"));
     DataflowRunner.fromOptions(options);
+
+    ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
+    Mockito.verify(mockJobs).create(eq(PROJECT_ID), jobCaptor.capture());
     assertValidJob(jobCaptor.getValue());
   }
 
   @Test
   public void testNonGcsTempLocation() throws IOException {
-    ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
-
-    DataflowPipelineOptions options = buildPipelineOptions(jobCaptor);
+    DataflowPipelineOptions options = buildPipelineOptions();
     options.setTempLocation("file://temp/location");
 
     thrown.expect(IllegalArgumentException.class);
@@ -592,39 +613,68 @@ public class DataflowRunnerTest {
   }
 
   @Test
-  public void testNonExistentTempLocation() throws IOException {
-    ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
+  public void testInvalidProfileLocation() throws IOException {
+    DataflowPipelineOptions options = buildPipelineOptions();
+    options.setSaveProfilesToGcs("file://my/staging/location");
+    try {
+      DataflowRunner.fromOptions(options);
+      fail("fromOptions should have failed");
+    } catch (IllegalArgumentException e) {
+      assertThat(e.getMessage(), containsString("expected a valid 'gs://' path but was given"));
+    }
+    options.setSaveProfilesToGcs("my/staging/location");
+    try {
+      DataflowRunner.fromOptions(options);
+      fail("fromOptions should have failed");
+    } catch (IllegalArgumentException e) {
+      assertThat(e.getMessage(), containsString("expected a valid 'gs://' path but was given"));
+    }
+  }
 
-    GcsUtil mockGcsUtil =
-        buildMockGcsUtil(false /* temp bucket exists */, true /* staging bucket exists */);
-    DataflowPipelineOptions options = buildPipelineOptions(jobCaptor);
-    options.setGcsUtil(mockGcsUtil);
-    options.setGcpTempLocation("gs://non-existent-bucket/location");
+  @Test
+  public void testNonExistentTempLocation() throws IOException {
+    DataflowPipelineOptions options = buildPipelineOptions();
+    options.setGcpTempLocation(NON_EXISTENT_BUCKET);
 
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage(containsString(
-        "Output path does not exist or is not writeable: gs://non-existent-bucket/location"));
+        "Output path does not exist or is not writeable: " + NON_EXISTENT_BUCKET));
     DataflowRunner.fromOptions(options);
+
+    ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
+    Mockito.verify(mockJobs).create(eq(PROJECT_ID), jobCaptor.capture());
     assertValidJob(jobCaptor.getValue());
   }
 
   @Test
   public void testNonExistentStagingLocation() throws IOException {
+    DataflowPipelineOptions options = buildPipelineOptions();
+    options.setStagingLocation(NON_EXISTENT_BUCKET);
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(containsString(
+        "Output path does not exist or is not writeable: " + NON_EXISTENT_BUCKET));
+    DataflowRunner.fromOptions(options);
+
     ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
+    Mockito.verify(mockJobs).create(eq(PROJECT_ID), jobCaptor.capture());
+    assertValidJob(jobCaptor.getValue());
+  }
 
-    GcsUtil mockGcsUtil =
-        buildMockGcsUtil(true /* temp bucket exists */, false /* staging bucket exists */);
-    DataflowPipelineOptions options = buildPipelineOptions(jobCaptor);
-    options.setGcpTempLocation(options.getTempLocation()); // bypass validation for GcpTempLocation
-    options.setGcsUtil(mockGcsUtil);
-    options.setStagingLocation("gs://non-existent-bucket/location");
+  @Test
+  public void testNonExistentProfileLocation() throws IOException {
+    DataflowPipelineOptions options = buildPipelineOptions();
+    options.setSaveProfilesToGcs(NON_EXISTENT_BUCKET);
 
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage(containsString(
-        "Output path does not exist or is not writeable: gs://non-existent-bucket/location"));
+        "Output path does not exist or is not writeable: " + NON_EXISTENT_BUCKET));
     DataflowRunner.fromOptions(options);
+
+    ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
+    Mockito.verify(mockJobs).create(eq(PROJECT_ID), jobCaptor.capture());
     assertValidJob(jobCaptor.getValue());
-  }
+   }
 
   @Test
   public void testNoProjectFails() {
@@ -648,8 +698,8 @@ public class DataflowRunnerTest {
     options.setRunner(DataflowRunner.class);
     options.setProject("foo-12345");
 
-    options.setGcpTempLocation("gs://spam/ham/eggs");
-    options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */));
+    options.setGcpTempLocation(VALID_TEMP_BUCKET);
+    options.setGcsUtil(mockGcsUtil);
     options.setGcpCredential(new TestCredential());
 
     DataflowRunner.fromOptions(options);
@@ -661,8 +711,8 @@ public class DataflowRunnerTest {
     options.setRunner(DataflowRunner.class);
     options.setProject("google.com:some-project-12345");
 
-    options.setGcpTempLocation("gs://spam/ham/eggs");
-    options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */));
+    options.setGcpTempLocation(VALID_TEMP_BUCKET);
+    options.setGcsUtil(mockGcsUtil);
     options.setGcpCredential(new TestCredential());
 
     DataflowRunner.fromOptions(options);
@@ -674,8 +724,8 @@ public class DataflowRunnerTest {
     options.setRunner(DataflowRunner.class);
     options.setProject("12345");
 
-    options.setGcpTempLocation("gs://spam/ham/eggs");
-    options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */));
+    options.setGcpTempLocation(VALID_TEMP_BUCKET);
+    options.setGcsUtil(mockGcsUtil);
 
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage("Project ID");
@@ -690,8 +740,8 @@ public class DataflowRunnerTest {
     options.setRunner(DataflowRunner.class);
     options.setProject("some project");
 
-    options.setGcpTempLocation("gs://spam/ham/eggs");
-    options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */));
+    options.setGcpTempLocation(VALID_TEMP_BUCKET);
+    options.setGcsUtil(mockGcsUtil);
 
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage("Project ID");
@@ -706,8 +756,8 @@ public class DataflowRunnerTest {
     options.setRunner(DataflowRunner.class);
     options.setProject("foo-12345");
 
-    options.setTempLocation("gs://spam/ham/eggs");
-    options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */));
+    options.setGcpTempLocation(VALID_TEMP_BUCKET);
+    options.setGcsUtil(mockGcsUtil);
 
     options.as(DataflowPipelineDebugOptions.class).setNumberOfWorkerHarnessThreads(-1);
 
@@ -731,25 +781,34 @@ public class DataflowRunnerTest {
   }
 
   @Test
-  public void testStagingLocationAndNoTempLocationSucceeds() throws Exception {
+  public void testGcpTempAndNoTempLocationSucceeds() throws Exception {
     DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
     options.setRunner(DataflowRunner.class);
     options.setGcpCredential(new TestCredential());
     options.setProject("foo-project");
-    options.setGcpTempLocation("gs://spam/ham/eggs");
-    options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */));
+    options.setGcpTempLocation(VALID_TEMP_BUCKET);
+    options.setGcsUtil(mockGcsUtil);
 
     DataflowRunner.fromOptions(options);
   }
 
   @Test
-  public void testTempLocationAndNoStagingLocationSucceeds() throws Exception {
+  public void testTempLocationAndNoGcpTempLocationSucceeds() throws Exception {
     DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
     options.setRunner(DataflowRunner.class);
     options.setGcpCredential(new TestCredential());
     options.setProject("foo-project");
-    options.setTempLocation("gs://spam/ham/eggs");
-    options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */));
+    options.setTempLocation(VALID_TEMP_BUCKET);
+    options.setGcsUtil(mockGcsUtil);
+
+    DataflowRunner.fromOptions(options);
+  }
+
+
+  @Test
+  public void testValidProfileLocation() throws IOException {
+    DataflowPipelineOptions options = buildPipelineOptions();
+    options.setSaveProfilesToGcs(VALID_PROFILE_BUCKET);
 
     DataflowRunner.fromOptions(options);
   }
@@ -855,10 +914,7 @@ public class DataflowRunnerTest {
 
   @Test
   public void testTransformTranslatorMissing() throws IOException {
-    // Test that we throw if we don't provide a translation.
-    ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
-
-    DataflowPipelineOptions options = buildPipelineOptions(jobCaptor);
+    DataflowPipelineOptions options = buildPipelineOptions();
     Pipeline p = Pipeline.create(options);
 
     p.apply(Create.of(Arrays.asList(1, 2, 3)))
@@ -869,6 +925,9 @@ public class DataflowRunnerTest {
     DataflowPipelineTranslator.fromOptions(options)
         .translate(
             p, (DataflowRunner) p.getRunner(), Collections.<DataflowPackage>emptyList());
+
+    ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
+    Mockito.verify(mockJobs).create(eq(PROJECT_ID), jobCaptor.capture());
     assertValidJob(jobCaptor.getValue());
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1e44cb12/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowProfilingOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowProfilingOptionsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowProfilingOptionsTest.java
index 87c74a4..299f3c8 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowProfilingOptionsTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowProfilingOptionsTest.java
@@ -17,8 +17,8 @@
  */
 package org.apache.beam.runners.dataflow.options;
 
+import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -38,9 +38,9 @@ public class DataflowProfilingOptionsTest {
   @Test
   public void testOptionsObject() throws Exception {
     DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(new String[] {
-        "--enableProfilingAgent", "--profilingAgentConfiguration={\"interval\": 21}"})
+        "--saveProfilesToGcs=path", "--profilingAgentConfiguration={\"interval\": 21}"})
         .as(DataflowPipelineOptions.class);
-    assertTrue(options.getEnableProfilingAgent());
+    assertThat(options.getSaveProfilesToGcs(), equalTo("path"));
 
     String json = MAPPER.writeValueAsString(options);
     assertThat(json, Matchers.containsString(


[2/2] incubator-beam git commit: This closes #1554

Posted by da...@apache.org.
This closes #1554


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/58bb1174
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/58bb1174
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/58bb1174

Branch: refs/heads/master
Commit: 58bb1174d087c5d3976dab921c528fdb6a45dbab
Parents: 080dbaa 1e44cb1
Author: Davor Bonaci <da...@google.com>
Authored: Fri Dec 9 12:33:50 2016 -0800
Committer: Davor Bonaci <da...@google.com>
Committed: Fri Dec 9 12:33:50 2016 -0800

----------------------------------------------------------------------
 .../beam/runners/dataflow/DataflowRunner.java   |   5 +
 .../options/DataflowProfilingOptions.java       |   8 +-
 .../DataflowPipelineTranslatorTest.java         |  36 ++-
 .../runners/dataflow/DataflowRunnerTest.java    | 263 ++++++++++++-------
 .../options/DataflowProfilingOptionsTest.java   |   6 +-
 5 files changed, 189 insertions(+), 129 deletions(-)
----------------------------------------------------------------------