You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/07/13 05:49:57 UTC

[1/2] incubator-beam git commit: [BEAM-430] Add GcpTempLocation, and remove defaulting tempLocation to stagingLocation in DataflowRunner

Repository: incubator-beam
Updated Branches:
  refs/heads/master a7689466d -> ee1a3bcfb


[BEAM-430] Add GcpTempLocation, and remove defaulting tempLocation to stagingLocation in DataflowRunner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3987cc1d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3987cc1d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3987cc1d

Branch: refs/heads/master
Commit: 3987cc1dac899e4465d4507ffaed623cb6ef300a
Parents: a768946
Author: Pei He <pe...@google.com>
Authored: Thu Jul 7 16:04:18 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Jul 12 22:49:09 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/dataflow/DataflowRunner.java   | 46 +++++----------
 .../options/DataflowPipelineOptions.java        | 46 +++++++++++++--
 .../dataflow/testing/TestDataflowRunner.java    | 12 ++--
 .../DataflowPipelineTranslatorTest.java         |  1 +
 .../runners/dataflow/DataflowRunnerTest.java    | 49 +++++++++++-----
 .../options/DataflowPipelineOptionsTest.java    | 62 ++++++++++++++++++++
 .../transforms/DataflowGroupByKeyTest.java      |  2 +-
 .../dataflow/transforms/DataflowViewTest.java   |  4 +-
 .../org/apache/beam/sdk/options/GcpOptions.java | 32 ++++++++++
 .../apache/beam/sdk/options/GcpOptionsTest.java | 23 ++++++++
 10 files changed, 220 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3987cc1d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 2ba6c7b..1f2fdca 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -181,16 +181,15 @@ import java.util.TreeSet;
 import javax.annotation.Nullable;
 
 /**
- * A {@link PipelineRunner} that executes the operations in the
- * pipeline by first translating them to the Dataflow representation
- * using the {@link DataflowPipelineTranslator} and then submitting
+ * A {@link PipelineRunner} that executes the operations in the pipeline by first translating them
+ * to the Dataflow representation using the {@link DataflowPipelineTranslator} and then submitting
  * them to a Dataflow service for execution.
  *
  * <p><h3>Permissions</h3>
- * When reading from a Dataflow source or writing to a Dataflow sink using
- * {@code DataflowRunner}, the Google cloudservices account and the Google compute engine
- * service account of the GCP project running the Dataflow Job will need access to the corresponding
- * source/sink.
+ *
+ * When reading from a Dataflow source or writing to a Dataflow sink using {@code DataflowRunner},
+ * the Google cloudservices account and the Google compute engine service account of the GCP project
+ * running the Dataflow Job will need access to the corresponding source/sink.
  *
  * <p>Please see <a href="https://cloud.google.com/dataflow/security-and-permissions">Google Cloud
  * Dataflow Security and Permissions</a> for more details.
@@ -259,27 +258,14 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
 
     PathValidator validator = dataflowOptions.getPathValidator();
-    checkArgument(!(isNullOrEmpty(dataflowOptions.getTempLocation())
-        && isNullOrEmpty(dataflowOptions.getStagingLocation())),
-        "Missing required value: at least one of tempLocation or stagingLocation must be set.");
-
-    if (dataflowOptions.getStagingLocation() != null) {
-      validator.validateOutputFilePrefixSupported(dataflowOptions.getStagingLocation());
-    }
-    if (dataflowOptions.getTempLocation() != null) {
-      validator.validateOutputFilePrefixSupported(dataflowOptions.getTempLocation());
-    }
-    if (isNullOrEmpty(dataflowOptions.getTempLocation())) {
-      dataflowOptions.setTempLocation(dataflowOptions.getStagingLocation());
-    } else if (isNullOrEmpty(dataflowOptions.getStagingLocation())) {
-      try {
-        dataflowOptions.setStagingLocation(
-            IOChannelUtils.resolve(dataflowOptions.getTempLocation(), "staging"));
-      } catch (IOException e) {
-        throw new IllegalArgumentException("Unable to resolve PipelineOptions.stagingLocation "
-            + "from PipelineOptions.tempLocation. Please set the staging location explicitly.", e);
-      }
-    }
+    checkArgument(
+        !isNullOrEmpty(dataflowOptions.getGcpTempLocation()),
+        "DataflowRunner requires gcpTempLocation, and it is missing in PipelineOptions.");
+    validator.validateOutputFilePrefixSupported(dataflowOptions.getGcpTempLocation());
+    checkArgument(
+        !isNullOrEmpty(dataflowOptions.getStagingLocation()),
+        "DataflowRunner requires stagingLocation, and it is missing in PipelineOptions.");
+    validator.validateOutputFilePrefixSupported(dataflowOptions.getStagingLocation());
 
     if (dataflowOptions.getFilesToStage() == null) {
       dataflowOptions.setFilesToStage(detectClassPathResourcesToStage(
@@ -538,9 +524,9 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     newJob.getEnvironment().setUserAgent(ReleaseInfo.getReleaseInfo());
     // The Dataflow Service may write to the temporary directory directly, so
     // must be verified.
-    if (!isNullOrEmpty(options.getTempLocation())) {
+    if (!isNullOrEmpty(options.getGcpTempLocation())) {
       newJob.getEnvironment().setTempStoragePrefix(
-          dataflowOptions.getPathValidator().verifyPath(options.getTempLocation()));
+          dataflowOptions.getPathValidator().verifyPath(options.getGcpTempLocation()));
     }
     newJob.getEnvironment().setDataset(options.getTempDatasetId());
     newJob.getEnvironment().setExperiments(options.getExperiments());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3987cc1d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
index f665a08..b69b6f9 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
@@ -17,6 +17,9 @@
  */
 package org.apache.beam.runners.dataflow.options;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Strings.isNullOrEmpty;
+
 import org.apache.beam.runners.dataflow.DataflowRunner;
 import org.apache.beam.sdk.options.ApplicationNameOptions;
 import org.apache.beam.sdk.options.BigQueryOptions;
@@ -29,6 +32,8 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PubsubOptions;
 import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.util.IOChannelUtils;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
 
 import com.google.common.base.MoreObjects;
 
@@ -37,6 +42,8 @@ import org.joda.time.DateTimeZone;
 import org.joda.time.format.DateTimeFormat;
 import org.joda.time.format.DateTimeFormatter;
 
+import java.io.IOException;
+
 /**
  * Options that can be used to configure the {@link DataflowRunner}.
  */
@@ -61,14 +68,14 @@ public interface DataflowPipelineOptions
    *
    * <p>Must be a valid Cloud Storage URL, beginning with the prefix "gs://"
    *
-   * <p>At least one of {@link PipelineOptions#getTempLocation()} or {@link #getStagingLocation()}
-   * must be set. If {@link #getStagingLocation()} is not set, then the Dataflow
-   * pipeline defaults to using {@link PipelineOptions#getTempLocation()}.
+   * <p>If {@link #getStagingLocation()} is not set, it defaults to a {@code staging}
+   * directory under {@link GcpOptions#getGcpTempLocation()}, which must then be
+   * a valid GCS path.
    */
   @Description("GCS path for staging local files, e.g. \"gs://bucket/object\". "
       + "Must be a valid Cloud Storage URL, beginning with the prefix \"gs://\". "
-      + "At least one of stagingLocation or tempLocation must be set. If stagingLocation is unset, "
-      + "defaults to using tempLocation.")
+      + "If stagingLocation is unset, defaults to gcpTempLocation with \"/staging\" suffix.")
+  @Default.InstanceFactory(StagingLocationFactory.class)
   String getStagingLocation();
   void setStagingLocation(String value);
 
@@ -123,4 +130,33 @@ public interface DataflowPipelineOptions
       return normalizedAppName + "-" + normalizedUserName + "-" + datePart;
     }
   }
+
+  /**
+   * Returns a default staging location under {@link GcpOptions#getGcpTempLocation}.
+   */
+  public static class StagingLocationFactory implements DefaultValueFactory<String> {
+
+    @Override
+    public String create(PipelineOptions options) {
+      String gcpTempLocation = options.as(GcpOptions.class).getGcpTempLocation();
+      checkArgument(!isNullOrEmpty(gcpTempLocation),
+          "Error constructing default value for stagingLocation: gcpTempLocation is missing."
+          + "Either stagingLocation must be set explicitly or a valid value must be provided"
+          + "for gcpTempLocation.");
+      try {
+        GcsPath.fromUri(gcpTempLocation);
+      } catch (Exception e) {
+        throw new IllegalArgumentException(String.format(
+            "Error constructing default value for stagingLocation: gcpTempLocation is not"
+            + " a valid GCS path, %s. ", gcpTempLocation));
+      }
+      try {
+        return IOChannelUtils.resolve(gcpTempLocation, "staging");
+      } catch (IOException e) {
+        throw new IllegalArgumentException(String.format(
+            "Unable to resolve stagingLocation from gcpTempLocation: %s."
+            + " Please set the staging location explicitly.", gcpTempLocation), e);
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3987cc1d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
index 1325cf3..6894a10 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
@@ -74,12 +74,14 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> {
   /**
    * Constructs a runner from the provided options.
    */
-  public static TestDataflowRunner fromOptions(
-      PipelineOptions options) {
+  public static TestDataflowRunner fromOptions(PipelineOptions options) {
     TestDataflowPipelineOptions dataflowOptions = options.as(TestDataflowPipelineOptions.class);
-    dataflowOptions.setStagingLocation(Joiner.on("/").join(
-        new String[]{dataflowOptions.getTempRoot(),
-          dataflowOptions.getJobName(), "output", "results"}));
+    String tempLocation = Joiner.on("/").join(
+        dataflowOptions.getTempRoot(),
+        dataflowOptions.getJobName(),
+        "output",
+        "results");
+    dataflowOptions.setTempLocation(tempLocation);
 
     return new TestDataflowRunner(dataflowOptions);
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3987cc1d/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 48c757f..d4d571b 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -198,6 +198,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     settings.put("runner", "org.apache.beam.runners.dataflow.DataflowRunner");
     settings.put("jobName", "some-job-name");
     settings.put("tempLocation", "gs://somebucket/some/path");
+    settings.put("gcpTempLocation", "gs://somebucket/some/path");
     settings.put("stagingLocation", "gs://somebucket/some/path/staging");
     settings.put("stableUniqueNames", "WARNING");
     settings.put("streaming", false);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3987cc1d/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 0cf1ade..f3cbb38 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -187,7 +187,14 @@ public class DataflowRunnerTest {
     return mockDataflowClient;
   }
 
-  private GcsUtil buildMockGcsUtil(boolean bucketExists) throws IOException {
+  /**
+   * Build a mock {@link GcsUtil} with return values.
+   *
+   * @param bucketExist value returned by the first {@code bucketExists} call
+   * @param bucketExists values returned by the subsequent {@code bucketExists} calls
+   */
+  private GcsUtil buildMockGcsUtil(Boolean bucketExist, Boolean... bucketExists)
+      throws IOException {
     GcsUtil mockGcsUtil = mock(GcsUtil.class);
     when(mockGcsUtil.create(any(GcsPath.class), anyString()))
         .then(new Answer<SeekableByteChannel>() {
@@ -206,7 +213,7 @@ public class DataflowRunnerTest {
         return ImmutableList.of((GcsPath) invocation.getArguments()[0]);
       }
     });
-    when(mockGcsUtil.bucketExists(any(GcsPath.class))).thenReturn(bucketExists);
+    when(mockGcsUtil.bucketExists(any(GcsPath.class))).thenReturn(bucketExist, bucketExists);
     return mockGcsUtil;
   }
 
@@ -508,11 +515,11 @@ public class DataflowRunnerTest {
   }
 
   @Test
-  public void testInvalidTempLocation() throws IOException {
+  public void testInvalidGcpTempLocation() throws IOException {
     ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
 
     DataflowPipelineOptions options = buildPipelineOptions(jobCaptor);
-    options.setTempLocation("file://temp/location");
+    options.setGcpTempLocation("file://temp/location");
 
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage(containsString("expected a valid 'gs://' path but was given"));
@@ -521,6 +528,19 @@ public class DataflowRunnerTest {
   }
 
   @Test
+  public void testNonGcsTempLocation() throws IOException {
+    ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
+
+    DataflowPipelineOptions options = buildPipelineOptions(jobCaptor);
+    options.setTempLocation("file://temp/location");
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(
+        "DataflowRunner requires gcpTempLocation, and it is missing in PipelineOptions.");
+    DataflowRunner.fromOptions(options);
+  }
+
+  @Test
   public void testInvalidStagingLocation() throws IOException {
     DataflowPipelineOptions options = buildPipelineOptions();
     options.setStagingLocation("file://my/staging/location");
@@ -543,7 +563,8 @@ public class DataflowRunnerTest {
   public void testNonExistentTempLocation() throws IOException {
     ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
 
-    GcsUtil mockGcsUtil = buildMockGcsUtil(false /* bucket exists */);
+    GcsUtil mockGcsUtil =
+        buildMockGcsUtil(false /* temp bucket exists */, true /* staging bucket exists */);
     DataflowPipelineOptions options = buildPipelineOptions(jobCaptor);
     options.setGcsUtil(mockGcsUtil);
     options.setTempLocation("gs://non-existent-bucket/location");
@@ -559,7 +580,8 @@ public class DataflowRunnerTest {
   public void testNonExistentStagingLocation() throws IOException {
     ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
 
-    GcsUtil mockGcsUtil = buildMockGcsUtil(false /* bucket exists */);
+    GcsUtil mockGcsUtil =
+        buildMockGcsUtil(true /* temp bucket exists */, false /* staging bucket exists */);
     DataflowPipelineOptions options = buildPipelineOptions(jobCaptor);
     options.setGcsUtil(mockGcsUtil);
     options.setStagingLocation("gs://non-existent-bucket/location");
@@ -593,7 +615,7 @@ public class DataflowRunnerTest {
     options.setRunner(DataflowRunner.class);
     options.setProject("foo-12345");
 
-    options.setStagingLocation("gs://spam/ham/eggs");
+    options.setGcpTempLocation("gs://spam/ham/eggs");
     options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */));
     options.setGcpCredential(new TestCredential());
 
@@ -606,7 +628,7 @@ public class DataflowRunnerTest {
     options.setRunner(DataflowRunner.class);
     options.setProject("google.com:some-project-12345");
 
-    options.setStagingLocation("gs://spam/ham/eggs");
+    options.setGcpTempLocation("gs://spam/ham/eggs");
     options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */));
     options.setGcpCredential(new TestCredential());
 
@@ -619,7 +641,7 @@ public class DataflowRunnerTest {
     options.setRunner(DataflowRunner.class);
     options.setProject("12345");
 
-    options.setStagingLocation("gs://spam/ham/eggs");
+    options.setGcpTempLocation("gs://spam/ham/eggs");
     options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */));
 
     thrown.expect(IllegalArgumentException.class);
@@ -635,7 +657,7 @@ public class DataflowRunnerTest {
     options.setRunner(DataflowRunner.class);
     options.setProject("some project");
 
-    options.setStagingLocation("gs://spam/ham/eggs");
+    options.setGcpTempLocation("gs://spam/ham/eggs");
     options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */));
 
     thrown.expect(IllegalArgumentException.class);
@@ -651,7 +673,7 @@ public class DataflowRunnerTest {
     options.setRunner(DataflowRunner.class);
     options.setProject("foo-12345");
 
-    options.setStagingLocation("gs://spam/ham/eggs");
+    options.setTempLocation("gs://spam/ham/eggs");
     options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */));
 
     options.as(DataflowPipelineDebugOptions.class).setNumberOfWorkerHarnessThreads(-1);
@@ -671,8 +693,7 @@ public class DataflowRunnerTest {
 
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage(
-        "Missing required value: at least one of tempLocation or stagingLocation must be set.");
-
+        "DataflowRunner requires gcpTempLocation, and it is missing in PipelineOptions.");
     DataflowRunner.fromOptions(options);
   }
 
@@ -682,7 +703,7 @@ public class DataflowRunnerTest {
     options.setRunner(DataflowRunner.class);
     options.setGcpCredential(new TestCredential());
     options.setProject("foo-project");
-    options.setStagingLocation("gs://spam/ham/eggs");
+    options.setGcpTempLocation("gs://spam/ham/eggs");
     options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */));
 
     DataflowRunner.fromOptions(options);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3987cc1d/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
index e7db40f..b5ee5e9 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
@@ -17,15 +17,18 @@
  */
 package org.apache.beam.runners.dataflow.options;
 
+import static com.google.common.base.Strings.isNullOrEmpty;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.ResetDateTimeProvider;
 import org.apache.beam.sdk.testing.RestoreSystemProperties;
+import org.apache.beam.sdk.util.IOChannelUtils;
 
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.junit.rules.TestRule;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -35,6 +38,7 @@ import org.junit.runners.JUnit4;
 public class DataflowPipelineOptionsTest {
   @Rule public TestRule restoreSystemProperties = new RestoreSystemProperties();
   @Rule public ResetDateTimeProvider resetDateTimeProviderRule = new ResetDateTimeProvider();
+  @Rule public ExpectedException thrown = ExpectedException.none();
 
   @Test
   public void testJobNameIsSet() {
@@ -90,4 +94,62 @@ public class DataflowPipelineOptionsTest {
     options.setAppName("f\u0259\u02c8n\u025bt\u0131k \u0259so\u028asi\u02c8e\u0131\u0283n");
     assertEquals("f00n0t0k00so0si0e00n-0i00nt00n000n0l0-1208190706", options.getJobName());
   }
+
+  @Test
+  public void testStagingLocation() {
+    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    IOChannelUtils.registerStandardIOFactories(options);
+    options.setTempLocation("file://temp_location");
+    options.setStagingLocation("gs://staging_location");
+    assertTrue(isNullOrEmpty(options.getGcpTempLocation()));
+    assertEquals("gs://staging_location", options.getStagingLocation());
+  }
+
+  @Test
+  public void testDefaultToTempLocation() {
+    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    IOChannelUtils.registerStandardIOFactories(options);
+    options.setTempLocation("gs://temp_location");
+    assertEquals("gs://temp_location", options.getGcpTempLocation());
+    assertEquals("gs://temp_location/staging", options.getStagingLocation());
+  }
+
+  @Test
+  public void testDefaultToGcpTempLocation() {
+    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    IOChannelUtils.registerStandardIOFactories(options);
+    options.setTempLocation("gs://temp_location");
+    options.setGcpTempLocation("gs://gcp_temp_location");
+    assertEquals("gs://gcp_temp_location/staging", options.getStagingLocation());
+  }
+
+  @Test
+  public void testDefaultNoneGcsTempLocation() {
+    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    options.setTempLocation("file://temp_location");
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(
+        "Error constructing default value for stagingLocation: gcpTempLocation is missing.");
+    options.getStagingLocation();
+  }
+
+  @Test
+  public void testDefaultInvalidGcpTempLocation() {
+    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    options.setGcpTempLocation("file://temp_location");
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(
+        "Error constructing default value for stagingLocation: gcpTempLocation is not"
+        + " a valid GCS path");
+    options.getStagingLocation();
+  }
+
+  @Test
+  public void testDefaultStagingLocationUnset() {
+    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(
+        "Error constructing default value for stagingLocation: gcpTempLocation is missing.");
+    options.getStagingLocation();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3987cc1d/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java
index a44b8a7..b219ea2 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java
@@ -61,7 +61,7 @@ public class DataflowGroupByKeyTest {
     DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
     options.setRunner(DataflowRunner.class);
     options.setProject("someproject");
-    options.setStagingLocation("gs://staging");
+    options.setGcpTempLocation("gs://staging");
     options.setPathValidatorClass(NoopPathValidator.class);
     options.setDataflowClient(null);
     return Pipeline.create(options);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3987cc1d/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java
index 1b263d2..95cbaae 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java
@@ -54,7 +54,7 @@ public class DataflowViewTest {
     DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
     options.setRunner(DataflowRunner.class);
     options.setProject("someproject");
-    options.setStagingLocation("gs://staging");
+    options.setGcpTempLocation("gs://staging");
     options.setPathValidatorClass(NoopPathValidator.class);
     options.setDataflowClient(null);
     return Pipeline.create(options);
@@ -65,7 +65,7 @@ public class DataflowViewTest {
     options.setRunner(DataflowRunner.class);
     options.setStreaming(true);
     options.setProject("someproject");
-    options.setStagingLocation("gs://staging");
+    options.setGcpTempLocation("gs://staging");
     options.setPathValidatorClass(NoopPathValidator.class);
     options.setDataflowClient(null);
     return Pipeline.create(options);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3987cc1d/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java
index 4585266..1bf4dd6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java
@@ -20,10 +20,12 @@ package org.apache.beam.sdk.options;
 import org.apache.beam.sdk.util.CredentialFactory;
 import org.apache.beam.sdk.util.GcpCredentialFactory;
 import org.apache.beam.sdk.util.InstanceBuilder;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
 
 import com.google.api.client.auth.oauth2.Credential;
 import com.google.api.client.googleapis.auth.oauth2.GoogleOAuthConstants;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
 import com.google.common.io.Files;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
@@ -290,4 +292,34 @@ public interface GcpOptions extends GoogleApiDebugOptions, PipelineOptions {
   @Hidden
   String getAuthorizationServerEncodedUrl();
   void setAuthorizationServerEncodedUrl(String value);
+
+  /**
+   * A GCS path for storing temporary files in GCP.
+   *
+   * <p>Defaults to {@link PipelineOptions#getTempLocation} if that is a valid GCS path.
+   */
+  @Description("A GCS path for storing temporary files in GCP.")
+  @Default.InstanceFactory(GcpTempLocationFactory.class)
+  String getGcpTempLocation();
+  void setGcpTempLocation(String value);
+
+  /**
+   * Returns {@link PipelineOptions#getTempLocation} as the default GCP temp location.
+   */
+  public static class GcpTempLocationFactory implements DefaultValueFactory<String> {
+
+    @Override
+    public String create(PipelineOptions options) {
+      String tempLocation = options.getTempLocation();
+      if (!Strings.isNullOrEmpty(tempLocation)) {
+        try {
+          GcsPath.fromUri(tempLocation);
+        } catch (Exception e) {
+          // Ignore the temp location because it is not a valid 'gs://' path.
+          return null;
+        }
+      }
+      return tempLocation;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3987cc1d/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java
index c179738..c0f65d8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java
@@ -17,8 +17,10 @@
  */
 package org.apache.beam.sdk.options;
 
+import static com.google.common.base.Strings.isNullOrEmpty;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
@@ -104,6 +106,27 @@ public class GcpOptionsTest {
     assertNull(projectFactory.create(PipelineOptionsFactory.create()));
   }
 
+  @Test
+  public void testEmptyGcpTempLocation() throws Exception {
+    GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
+    assertTrue(isNullOrEmpty(options.getGcpTempLocation()));
+  }
+
+  @Test
+  public void testDefaultGcpTempLocation() throws Exception {
+    GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
+    String tempLocation = "gs://bucket";
+    options.setTempLocation(tempLocation);
+    assertEquals(tempLocation, options.getGcpTempLocation());
+  }
+
+  @Test
+  public void testDefaultGcpTempLocationInvalid() throws Exception {
+    GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
+    options.setTempLocation("file://");
+    assertTrue(isNullOrEmpty(options.getGcpTempLocation()));
+  }
+
   private static void makePropertiesFileWithProject(File path, String projectId)
       throws IOException {
     String properties = String.format("[core]%n"


[2/2] incubator-beam git commit: Closes #438

Posted by dh...@apache.org.
Closes #438


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ee1a3bcf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ee1a3bcf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ee1a3bcf

Branch: refs/heads/master
Commit: ee1a3bcfb7486c2fbe2bf48596ec8e0b06376f73
Parents: a768946 3987cc1
Author: Dan Halperin <dh...@google.com>
Authored: Tue Jul 12 22:49:10 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Jul 12 22:49:10 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/dataflow/DataflowRunner.java   | 46 +++++----------
 .../options/DataflowPipelineOptions.java        | 46 +++++++++++++--
 .../dataflow/testing/TestDataflowRunner.java    | 12 ++--
 .../DataflowPipelineTranslatorTest.java         |  1 +
 .../runners/dataflow/DataflowRunnerTest.java    | 49 +++++++++++-----
 .../options/DataflowPipelineOptionsTest.java    | 62 ++++++++++++++++++++
 .../transforms/DataflowGroupByKeyTest.java      |  2 +-
 .../dataflow/transforms/DataflowViewTest.java   |  4 +-
 .../org/apache/beam/sdk/options/GcpOptions.java | 32 ++++++++++
 .../apache/beam/sdk/options/GcpOptionsTest.java | 23 ++++++++
 10 files changed, 220 insertions(+), 57 deletions(-)
----------------------------------------------------------------------