You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/07/13 05:49:57 UTC
[1/2] incubator-beam git commit: [BEAM-430] Add GcpTempLocation,
and remove defaulting tempLocation to stagingLocation in
DataflowRunner
Repository: incubator-beam
Updated Branches:
refs/heads/master a7689466d -> ee1a3bcfb
[BEAM-430] Add GcpTempLocation, and remove defaulting tempLocation to stagingLocation in DataflowRunner
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3987cc1d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3987cc1d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3987cc1d
Branch: refs/heads/master
Commit: 3987cc1dac899e4465d4507ffaed623cb6ef300a
Parents: a768946
Author: Pei He <pe...@google.com>
Authored: Thu Jul 7 16:04:18 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Jul 12 22:49:09 2016 -0700
----------------------------------------------------------------------
.../beam/runners/dataflow/DataflowRunner.java | 46 +++++----------
.../options/DataflowPipelineOptions.java | 46 +++++++++++++--
.../dataflow/testing/TestDataflowRunner.java | 12 ++--
.../DataflowPipelineTranslatorTest.java | 1 +
.../runners/dataflow/DataflowRunnerTest.java | 49 +++++++++++-----
.../options/DataflowPipelineOptionsTest.java | 62 ++++++++++++++++++++
.../transforms/DataflowGroupByKeyTest.java | 2 +-
.../dataflow/transforms/DataflowViewTest.java | 4 +-
.../org/apache/beam/sdk/options/GcpOptions.java | 32 ++++++++++
.../apache/beam/sdk/options/GcpOptionsTest.java | 23 ++++++++
10 files changed, 220 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3987cc1d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 2ba6c7b..1f2fdca 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -181,16 +181,15 @@ import java.util.TreeSet;
import javax.annotation.Nullable;
/**
- * A {@link PipelineRunner} that executes the operations in the
- * pipeline by first translating them to the Dataflow representation
- * using the {@link DataflowPipelineTranslator} and then submitting
+ * A {@link PipelineRunner} that executes the operations in the pipeline by first translating them
+ * to the Dataflow representation using the {@link DataflowPipelineTranslator} and then submitting
* them to a Dataflow service for execution.
*
* <p><h3>Permissions</h3>
- * When reading from a Dataflow source or writing to a Dataflow sink using
- * {@code DataflowRunner}, the Google cloudservices account and the Google compute engine
- * service account of the GCP project running the Dataflow Job will need access to the corresponding
- * source/sink.
+ *
+ * When reading from a Dataflow source or writing to a Dataflow sink using {@code DataflowRunner},
+ * the Google cloudservices account and the Google compute engine service account of the GCP project
+ * running the Dataflow Job will need access to the corresponding source/sink.
*
* <p>Please see <a href="https://cloud.google.com/dataflow/security-and-permissions">Google Cloud
* Dataflow Security and Permissions</a> for more details.
@@ -259,27 +258,14 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
PathValidator validator = dataflowOptions.getPathValidator();
- checkArgument(!(isNullOrEmpty(dataflowOptions.getTempLocation())
- && isNullOrEmpty(dataflowOptions.getStagingLocation())),
- "Missing required value: at least one of tempLocation or stagingLocation must be set.");
-
- if (dataflowOptions.getStagingLocation() != null) {
- validator.validateOutputFilePrefixSupported(dataflowOptions.getStagingLocation());
- }
- if (dataflowOptions.getTempLocation() != null) {
- validator.validateOutputFilePrefixSupported(dataflowOptions.getTempLocation());
- }
- if (isNullOrEmpty(dataflowOptions.getTempLocation())) {
- dataflowOptions.setTempLocation(dataflowOptions.getStagingLocation());
- } else if (isNullOrEmpty(dataflowOptions.getStagingLocation())) {
- try {
- dataflowOptions.setStagingLocation(
- IOChannelUtils.resolve(dataflowOptions.getTempLocation(), "staging"));
- } catch (IOException e) {
- throw new IllegalArgumentException("Unable to resolve PipelineOptions.stagingLocation "
- + "from PipelineOptions.tempLocation. Please set the staging location explicitly.", e);
- }
- }
+ checkArgument(
+ !isNullOrEmpty(dataflowOptions.getGcpTempLocation()),
+ "DataflowRunner requires gcpTempLocation, and it is missing in PipelineOptions.");
+ validator.validateOutputFilePrefixSupported(dataflowOptions.getGcpTempLocation());
+ checkArgument(
+ !isNullOrEmpty(dataflowOptions.getStagingLocation()),
+ "DataflowRunner requires stagingLocation, and it is missing in PipelineOptions.");
+ validator.validateOutputFilePrefixSupported(dataflowOptions.getStagingLocation());
if (dataflowOptions.getFilesToStage() == null) {
dataflowOptions.setFilesToStage(detectClassPathResourcesToStage(
@@ -538,9 +524,9 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
newJob.getEnvironment().setUserAgent(ReleaseInfo.getReleaseInfo());
// The Dataflow Service may write to the temporary directory directly, so
// must be verified.
- if (!isNullOrEmpty(options.getTempLocation())) {
+ if (!isNullOrEmpty(options.getGcpTempLocation())) {
newJob.getEnvironment().setTempStoragePrefix(
- dataflowOptions.getPathValidator().verifyPath(options.getTempLocation()));
+ dataflowOptions.getPathValidator().verifyPath(options.getGcpTempLocation()));
}
newJob.getEnvironment().setDataset(options.getTempDatasetId());
newJob.getEnvironment().setExperiments(options.getExperiments());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3987cc1d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
index f665a08..b69b6f9 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
@@ -17,6 +17,9 @@
*/
package org.apache.beam.runners.dataflow.options;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Strings.isNullOrEmpty;
+
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.sdk.options.ApplicationNameOptions;
import org.apache.beam.sdk.options.BigQueryOptions;
@@ -29,6 +32,8 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PubsubOptions;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.util.IOChannelUtils;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
import com.google.common.base.MoreObjects;
@@ -37,6 +42,8 @@ import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
+import java.io.IOException;
+
/**
* Options that can be used to configure the {@link DataflowRunner}.
*/
@@ -61,14 +68,14 @@ public interface DataflowPipelineOptions
*
* <p>Must be a valid Cloud Storage URL, beginning with the prefix "gs://"
*
- * <p>At least one of {@link PipelineOptions#getTempLocation()} or {@link #getStagingLocation()}
- * must be set. If {@link #getStagingLocation()} is not set, then the Dataflow
- * pipeline defaults to using {@link PipelineOptions#getTempLocation()}.
+ * <p>If {@link #getStagingLocation()} is not set, it will default to
+ * {@link GcpOptions#getGcpTempLocation()}. {@link GcpOptions#getGcpTempLocation()}
+ * must be a valid GCS path.
*/
@Description("GCS path for staging local files, e.g. \"gs://bucket/object\". "
+ "Must be a valid Cloud Storage URL, beginning with the prefix \"gs://\". "
- + "At least one of stagingLocation or tempLocation must be set. If stagingLocation is unset, "
- + "defaults to using tempLocation.")
+ + "If stagingLocation is unset, defaults to gcpTempLocation with \"/staging\" suffix.")
+ @Default.InstanceFactory(StagingLocationFactory.class)
String getStagingLocation();
void setStagingLocation(String value);
@@ -123,4 +130,33 @@ public interface DataflowPipelineOptions
return normalizedAppName + "-" + normalizedUserName + "-" + datePart;
}
}
+
+ /**
+ * Returns a default staging location under {@link GcpOptions#getGcpTempLocation}.
+ */
+ public static class StagingLocationFactory implements DefaultValueFactory<String> {
+
+ @Override
+ public String create(PipelineOptions options) {
+ String gcpTempLocation = options.as(GcpOptions.class).getGcpTempLocation();
+ checkArgument(!isNullOrEmpty(gcpTempLocation),
+ "Error constructing default value for stagingLocation: gcpTempLocation is missing."
+ + " Either stagingLocation must be set explicitly or a valid value must be provided"
+ + " for gcpTempLocation.");
+ try {
+ GcsPath.fromUri(gcpTempLocation);
+ } catch (Exception e) {
+ throw new IllegalArgumentException(String.format(
+ "Error constructing default value for stagingLocation: gcpTempLocation is not"
+ + " a valid GCS path, %s. ", gcpTempLocation), e);
+ }
+ try {
+ return IOChannelUtils.resolve(gcpTempLocation, "staging");
+ } catch (IOException e) {
+ throw new IllegalArgumentException(String.format(
+ "Unable to resolve stagingLocation from gcpTempLocation: %s."
+ + " Please set the staging location explicitly.", gcpTempLocation), e);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3987cc1d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
index 1325cf3..6894a10 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
@@ -74,12 +74,14 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> {
/**
* Constructs a runner from the provided options.
*/
- public static TestDataflowRunner fromOptions(
- PipelineOptions options) {
+ public static TestDataflowRunner fromOptions(PipelineOptions options) {
TestDataflowPipelineOptions dataflowOptions = options.as(TestDataflowPipelineOptions.class);
- dataflowOptions.setStagingLocation(Joiner.on("/").join(
- new String[]{dataflowOptions.getTempRoot(),
- dataflowOptions.getJobName(), "output", "results"}));
+ String tempLocation = Joiner.on("/").join(
+ dataflowOptions.getTempRoot(),
+ dataflowOptions.getJobName(),
+ "output",
+ "results");
+ dataflowOptions.setTempLocation(tempLocation);
return new TestDataflowRunner(dataflowOptions);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3987cc1d/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 48c757f..d4d571b 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -198,6 +198,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
settings.put("runner", "org.apache.beam.runners.dataflow.DataflowRunner");
settings.put("jobName", "some-job-name");
settings.put("tempLocation", "gs://somebucket/some/path");
+ settings.put("gcpTempLocation", "gs://somebucket/some/path");
settings.put("stagingLocation", "gs://somebucket/some/path/staging");
settings.put("stableUniqueNames", "WARNING");
settings.put("streaming", false);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3987cc1d/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 0cf1ade..f3cbb38 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -187,7 +187,14 @@ public class DataflowRunnerTest {
return mockDataflowClient;
}
- private GcsUtil buildMockGcsUtil(boolean bucketExists) throws IOException {
+ /**
+ * Build a mock {@link GcsUtil} with return values.
+ *
+ * @param bucketExist first return value
+ * @param bucketExists next return values
+ */
+ private GcsUtil buildMockGcsUtil(Boolean bucketExist, Boolean... bucketExists)
+ throws IOException {
GcsUtil mockGcsUtil = mock(GcsUtil.class);
when(mockGcsUtil.create(any(GcsPath.class), anyString()))
.then(new Answer<SeekableByteChannel>() {
@@ -206,7 +213,7 @@ public class DataflowRunnerTest {
return ImmutableList.of((GcsPath) invocation.getArguments()[0]);
}
});
- when(mockGcsUtil.bucketExists(any(GcsPath.class))).thenReturn(bucketExists);
+ when(mockGcsUtil.bucketExists(any(GcsPath.class))).thenReturn(bucketExist, bucketExists);
return mockGcsUtil;
}
@@ -508,11 +515,11 @@ public class DataflowRunnerTest {
}
@Test
- public void testInvalidTempLocation() throws IOException {
+ public void testInvalidGcpTempLocation() throws IOException {
ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
DataflowPipelineOptions options = buildPipelineOptions(jobCaptor);
- options.setTempLocation("file://temp/location");
+ options.setGcpTempLocation("file://temp/location");
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage(containsString("expected a valid 'gs://' path but was given"));
@@ -521,6 +528,19 @@ public class DataflowRunnerTest {
}
@Test
+ public void testNonGcsTempLocation() throws IOException {
+ ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
+
+ DataflowPipelineOptions options = buildPipelineOptions(jobCaptor);
+ options.setTempLocation("file://temp/location");
+
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage(
+ "DataflowRunner requires gcpTempLocation, and it is missing in PipelineOptions.");
+ DataflowRunner.fromOptions(options);
+ }
+
+ @Test
public void testInvalidStagingLocation() throws IOException {
DataflowPipelineOptions options = buildPipelineOptions();
options.setStagingLocation("file://my/staging/location");
@@ -543,7 +563,8 @@ public class DataflowRunnerTest {
public void testNonExistentTempLocation() throws IOException {
ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
- GcsUtil mockGcsUtil = buildMockGcsUtil(false /* bucket exists */);
+ GcsUtil mockGcsUtil =
+ buildMockGcsUtil(false /* temp bucket exists */, true /* staging bucket exists */);
DataflowPipelineOptions options = buildPipelineOptions(jobCaptor);
options.setGcsUtil(mockGcsUtil);
options.setTempLocation("gs://non-existent-bucket/location");
@@ -559,7 +580,8 @@ public class DataflowRunnerTest {
public void testNonExistentStagingLocation() throws IOException {
ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
- GcsUtil mockGcsUtil = buildMockGcsUtil(false /* bucket exists */);
+ GcsUtil mockGcsUtil =
+ buildMockGcsUtil(true /* temp bucket exists */, false /* staging bucket exists */);
DataflowPipelineOptions options = buildPipelineOptions(jobCaptor);
options.setGcsUtil(mockGcsUtil);
options.setStagingLocation("gs://non-existent-bucket/location");
@@ -593,7 +615,7 @@ public class DataflowRunnerTest {
options.setRunner(DataflowRunner.class);
options.setProject("foo-12345");
- options.setStagingLocation("gs://spam/ham/eggs");
+ options.setGcpTempLocation("gs://spam/ham/eggs");
options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */));
options.setGcpCredential(new TestCredential());
@@ -606,7 +628,7 @@ public class DataflowRunnerTest {
options.setRunner(DataflowRunner.class);
options.setProject("google.com:some-project-12345");
- options.setStagingLocation("gs://spam/ham/eggs");
+ options.setGcpTempLocation("gs://spam/ham/eggs");
options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */));
options.setGcpCredential(new TestCredential());
@@ -619,7 +641,7 @@ public class DataflowRunnerTest {
options.setRunner(DataflowRunner.class);
options.setProject("12345");
- options.setStagingLocation("gs://spam/ham/eggs");
+ options.setGcpTempLocation("gs://spam/ham/eggs");
options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */));
thrown.expect(IllegalArgumentException.class);
@@ -635,7 +657,7 @@ public class DataflowRunnerTest {
options.setRunner(DataflowRunner.class);
options.setProject("some project");
- options.setStagingLocation("gs://spam/ham/eggs");
+ options.setGcpTempLocation("gs://spam/ham/eggs");
options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */));
thrown.expect(IllegalArgumentException.class);
@@ -651,7 +673,7 @@ public class DataflowRunnerTest {
options.setRunner(DataflowRunner.class);
options.setProject("foo-12345");
- options.setStagingLocation("gs://spam/ham/eggs");
+ options.setTempLocation("gs://spam/ham/eggs");
options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */));
options.as(DataflowPipelineDebugOptions.class).setNumberOfWorkerHarnessThreads(-1);
@@ -671,8 +693,7 @@ public class DataflowRunnerTest {
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage(
- "Missing required value: at least one of tempLocation or stagingLocation must be set.");
-
+ "DataflowRunner requires gcpTempLocation, and it is missing in PipelineOptions.");
DataflowRunner.fromOptions(options);
}
@@ -682,7 +703,7 @@ public class DataflowRunnerTest {
options.setRunner(DataflowRunner.class);
options.setGcpCredential(new TestCredential());
options.setProject("foo-project");
- options.setStagingLocation("gs://spam/ham/eggs");
+ options.setGcpTempLocation("gs://spam/ham/eggs");
options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */));
DataflowRunner.fromOptions(options);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3987cc1d/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
index e7db40f..b5ee5e9 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
@@ -17,15 +17,18 @@
*/
package org.apache.beam.runners.dataflow.options;
+import static com.google.common.base.Strings.isNullOrEmpty;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.ResetDateTimeProvider;
import org.apache.beam.sdk.testing.RestoreSystemProperties;
+import org.apache.beam.sdk.util.IOChannelUtils;
import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
import org.junit.rules.TestRule;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -35,6 +38,7 @@ import org.junit.runners.JUnit4;
public class DataflowPipelineOptionsTest {
@Rule public TestRule restoreSystemProperties = new RestoreSystemProperties();
@Rule public ResetDateTimeProvider resetDateTimeProviderRule = new ResetDateTimeProvider();
+ @Rule public ExpectedException thrown = ExpectedException.none();
@Test
public void testJobNameIsSet() {
@@ -90,4 +94,62 @@ public class DataflowPipelineOptionsTest {
options.setAppName("f\u0259\u02c8n\u025bt\u0131k \u0259so\u028asi\u02c8e\u0131\u0283n");
assertEquals("f00n0t0k00so0si0e00n-0i00nt00n000n0l0-1208190706", options.getJobName());
}
+
+ @Test
+ public void testStagingLocation() {
+ DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+ IOChannelUtils.registerStandardIOFactories(options);
+ options.setTempLocation("file://temp_location");
+ options.setStagingLocation("gs://staging_location");
+ assertTrue(isNullOrEmpty(options.getGcpTempLocation()));
+ assertEquals("gs://staging_location", options.getStagingLocation());
+ }
+
+ @Test
+ public void testDefaultToTempLocation() {
+ DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+ IOChannelUtils.registerStandardIOFactories(options);
+ options.setTempLocation("gs://temp_location");
+ assertEquals("gs://temp_location", options.getGcpTempLocation());
+ assertEquals("gs://temp_location/staging", options.getStagingLocation());
+ }
+
+ @Test
+ public void testDefaultToGcpTempLocation() {
+ DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+ IOChannelUtils.registerStandardIOFactories(options);
+ options.setTempLocation("gs://temp_location");
+ options.setGcpTempLocation("gs://gcp_temp_location");
+ assertEquals("gs://gcp_temp_location/staging", options.getStagingLocation());
+ }
+
+ @Test
+ public void testDefaultNoneGcsTempLocation() {
+ DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+ options.setTempLocation("file://temp_location");
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage(
+ "Error constructing default value for stagingLocation: gcpTempLocation is missing.");
+ options.getStagingLocation();
+ }
+
+ @Test
+ public void testDefaultInvalidGcpTempLocation() {
+ DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+ options.setGcpTempLocation("file://temp_location");
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage(
+ "Error constructing default value for stagingLocation: gcpTempLocation is not"
+ + " a valid GCS path");
+ options.getStagingLocation();
+ }
+
+ @Test
+ public void testDefaultStagingLocationUnset() {
+ DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage(
+ "Error constructing default value for stagingLocation: gcpTempLocation is missing.");
+ options.getStagingLocation();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3987cc1d/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java
index a44b8a7..b219ea2 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java
@@ -61,7 +61,7 @@ public class DataflowGroupByKeyTest {
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setRunner(DataflowRunner.class);
options.setProject("someproject");
- options.setStagingLocation("gs://staging");
+ options.setGcpTempLocation("gs://staging");
options.setPathValidatorClass(NoopPathValidator.class);
options.setDataflowClient(null);
return Pipeline.create(options);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3987cc1d/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java
index 1b263d2..95cbaae 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java
@@ -54,7 +54,7 @@ public class DataflowViewTest {
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setRunner(DataflowRunner.class);
options.setProject("someproject");
- options.setStagingLocation("gs://staging");
+ options.setGcpTempLocation("gs://staging");
options.setPathValidatorClass(NoopPathValidator.class);
options.setDataflowClient(null);
return Pipeline.create(options);
@@ -65,7 +65,7 @@ public class DataflowViewTest {
options.setRunner(DataflowRunner.class);
options.setStreaming(true);
options.setProject("someproject");
- options.setStagingLocation("gs://staging");
+ options.setGcpTempLocation("gs://staging");
options.setPathValidatorClass(NoopPathValidator.class);
options.setDataflowClient(null);
return Pipeline.create(options);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3987cc1d/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java
index 4585266..1bf4dd6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java
@@ -20,10 +20,12 @@ package org.apache.beam.sdk.options;
import org.apache.beam.sdk.util.CredentialFactory;
import org.apache.beam.sdk.util.GcpCredentialFactory;
import org.apache.beam.sdk.util.InstanceBuilder;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
import com.google.api.client.auth.oauth2.Credential;
import com.google.api.client.googleapis.auth.oauth2.GoogleOAuthConstants;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
import com.google.common.io.Files;
import com.fasterxml.jackson.annotation.JsonIgnore;
@@ -290,4 +292,34 @@ public interface GcpOptions extends GoogleApiDebugOptions, PipelineOptions {
@Hidden
String getAuthorizationServerEncodedUrl();
void setAuthorizationServerEncodedUrl(String value);
+
+ /**
+ * A GCS path for storing temporary files in GCP.
+ *
+ * <p>Defaults to {@link PipelineOptions#getTempLocation}.
+ */
+ @Description("A GCS path for storing temporary files in GCP.")
+ @Default.InstanceFactory(GcpTempLocationFactory.class)
+ String getGcpTempLocation();
+ void setGcpTempLocation(String value);
+
+ /**
+ * Returns {@link PipelineOptions#getTempLocation}, or null if it is not a valid GCS path.
+ */
+ public static class GcpTempLocationFactory implements DefaultValueFactory<String> {
+
+ @Override
+ public String create(PipelineOptions options) {
+ String tempLocation = options.getTempLocation();
+ if (!Strings.isNullOrEmpty(tempLocation)) {
+ try {
+ GcsPath.fromUri(tempLocation);
+ } catch (Exception e) {
+ // Ignore the temp location because it is not a valid 'gs://' path.
+ return null;
+ }
+ }
+ return tempLocation;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3987cc1d/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java
index c179738..c0f65d8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java
@@ -17,8 +17,10 @@
*/
package org.apache.beam.sdk.options;
+import static com.google.common.base.Strings.isNullOrEmpty;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@@ -104,6 +106,27 @@ public class GcpOptionsTest {
assertNull(projectFactory.create(PipelineOptionsFactory.create()));
}
+ @Test
+ public void testEmptyGcpTempLocation() throws Exception {
+ GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
+ assertTrue(isNullOrEmpty(options.getGcpTempLocation()));
+ }
+
+ @Test
+ public void testDefaultGcpTempLocation() throws Exception {
+ GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
+ String tempLocation = "gs://bucket";
+ options.setTempLocation(tempLocation);
+ assertEquals(tempLocation, options.getGcpTempLocation());
+ }
+
+ @Test
+ public void testDefaultGcpTempLocationInvalid() throws Exception {
+ GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
+ options.setTempLocation("file://");
+ assertTrue(isNullOrEmpty(options.getGcpTempLocation()));
+ }
+
private static void makePropertiesFileWithProject(File path, String projectId)
throws IOException {
String properties = String.format("[core]%n"
[2/2] incubator-beam git commit: Closes #438
Posted by dh...@apache.org.
Closes #438
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ee1a3bcf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ee1a3bcf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ee1a3bcf
Branch: refs/heads/master
Commit: ee1a3bcfb7486c2fbe2bf48596ec8e0b06376f73
Parents: a768946 3987cc1
Author: Dan Halperin <dh...@google.com>
Authored: Tue Jul 12 22:49:10 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Jul 12 22:49:10 2016 -0700
----------------------------------------------------------------------
.../beam/runners/dataflow/DataflowRunner.java | 46 +++++----------
.../options/DataflowPipelineOptions.java | 46 +++++++++++++--
.../dataflow/testing/TestDataflowRunner.java | 12 ++--
.../DataflowPipelineTranslatorTest.java | 1 +
.../runners/dataflow/DataflowRunnerTest.java | 49 +++++++++++-----
.../options/DataflowPipelineOptionsTest.java | 62 ++++++++++++++++++++
.../transforms/DataflowGroupByKeyTest.java | 2 +-
.../dataflow/transforms/DataflowViewTest.java | 4 +-
.../org/apache/beam/sdk/options/GcpOptions.java | 32 ++++++++++
.../apache/beam/sdk/options/GcpOptionsTest.java | 23 ++++++++
10 files changed, 220 insertions(+), 57 deletions(-)
----------------------------------------------------------------------