Posted to commits@beam.apache.org by da...@apache.org on 2016/04/14 06:48:11 UTC
[24/74] [partial] incubator-beam git commit: Rename com/google/cloud/dataflow->org/apache/beam
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTranslatorTest.java
new file mode 100644
index 0000000..dd1b3c8
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTranslatorTest.java
@@ -0,0 +1,890 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.runners;
+
+import static com.google.cloud.dataflow.sdk.util.Structs.addObject;
+import static com.google.cloud.dataflow.sdk.util.Structs.getDictionary;
+import static com.google.cloud.dataflow.sdk.util.Structs.getString;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.hasKey;
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.api.services.dataflow.Dataflow;
+import com.google.api.services.dataflow.model.DataflowPackage;
+import com.google.api.services.dataflow.model.Job;
+import com.google.api.services.dataflow.model.Step;
+import com.google.api.services.dataflow.model.WorkerPool;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
+import com.google.cloud.dataflow.sdk.coders.VarIntCoder;
+import com.google.cloud.dataflow.sdk.coders.VoidCoder;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import com.google.cloud.dataflow.sdk.options.DataflowPipelineWorkerPoolOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.TranslationContext;
+import com.google.cloud.dataflow.sdk.transforms.Count;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.Sum;
+import com.google.cloud.dataflow.sdk.transforms.View;
+import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
+import com.google.cloud.dataflow.sdk.util.GcsUtil;
+import com.google.cloud.dataflow.sdk.util.OutputReference;
+import com.google.cloud.dataflow.sdk.util.PropertyNames;
+import com.google.cloud.dataflow.sdk.util.Structs;
+import com.google.cloud.dataflow.sdk.util.TestCredential;
+import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
+import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.PCollectionTuple;
+import com.google.cloud.dataflow.sdk.values.PDone;
+import com.google.cloud.dataflow.sdk.values.TupleTag;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+
+import org.hamcrest.Matchers;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.internal.matchers.ThrowableMessageMatcher;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.ArgumentMatcher;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Tests for {@link DataflowPipelineTranslator}.
+ */
+@RunWith(JUnit4.class)
+public class DataflowPipelineTranslatorTest implements Serializable {
+
+ @Rule public transient ExpectedException thrown = ExpectedException.none();
+
+  // A custom Mockito matcher for an initial Job that checks that all
+  // expected fields are set.
+ private static class IsValidCreateRequest extends ArgumentMatcher<Job> {
+ @Override
+ public boolean matches(Object o) {
+ Job job = (Job) o;
+ return job.getId() == null
+ && job.getProjectId() == null
+ && job.getName() != null
+ && job.getType() != null
+ && job.getEnvironment() != null
+ && job.getSteps() != null
+ && job.getCurrentState() == null
+ && job.getCurrentStateTime() == null
+ && job.getExecutionInfo() == null
+ && job.getCreateTime() == null;
+ }
+ }
+
+ private DataflowPipeline buildPipeline(DataflowPipelineOptions options) {
+ DataflowPipeline p = DataflowPipeline.create(options);
+
+ p.apply(TextIO.Read.named("ReadMyFile").from("gs://bucket/object"))
+ .apply(TextIO.Write.named("WriteMyFile").to("gs://bucket/object"));
+
+ return p;
+ }
+
+ private static Dataflow buildMockDataflow(
+ ArgumentMatcher<Job> jobMatcher) throws IOException {
+ Dataflow mockDataflowClient = mock(Dataflow.class);
+ Dataflow.Projects mockProjects = mock(Dataflow.Projects.class);
+ Dataflow.Projects.Jobs mockJobs = mock(Dataflow.Projects.Jobs.class);
+ Dataflow.Projects.Jobs.Create mockRequest = mock(
+ Dataflow.Projects.Jobs.Create.class);
+
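+    // Wire the mock chain so projects().jobs().create(...) returns a request
+    // whose execute() yields a canned Job with id "newid".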
+ when(mockDataflowClient.projects()).thenReturn(mockProjects);
+ when(mockProjects.jobs()).thenReturn(mockJobs);
+ when(mockJobs.create(eq("someProject"), argThat(jobMatcher)))
+ .thenReturn(mockRequest);
+
+ Job resultJob = new Job();
+ resultJob.setId("newid");
+ when(mockRequest.execute()).thenReturn(resultJob);
+ return mockDataflowClient;
+ }
+
+ private static DataflowPipelineOptions buildPipelineOptions() throws IOException {
+ GcsUtil mockGcsUtil = mock(GcsUtil.class);
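+    // Stub expand() to echo its argument back, treating every path as a
+    // literal, already-expanded GCS path.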
+ when(mockGcsUtil.expand(any(GcsPath.class))).then(new Answer<List<GcsPath>>() {
+ @Override
+ public List<GcsPath> answer(InvocationOnMock invocation) throws Throwable {
+ return ImmutableList.of((GcsPath) invocation.getArguments()[0]);
+ }
+ });
+ when(mockGcsUtil.bucketExists(any(GcsPath.class))).thenReturn(true);
+ when(mockGcsUtil.isGcsPatternSupported(anyString())).thenCallRealMethod();
+
+ DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+ options.setGcpCredential(new TestCredential());
+ options.setJobName("some-job-name");
+ options.setProject("some-project");
+ options.setTempLocation(GcsPath.fromComponents("somebucket", "some/path").toString());
+ options.setFilesToStage(new LinkedList<String>());
+ options.setDataflowClient(buildMockDataflow(new IsValidCreateRequest()));
+ options.setGcsUtil(mockGcsUtil);
+ return options;
+ }
+
+ @Test
+ public void testSettingOfSdkPipelineOptions() throws IOException {
+ DataflowPipelineOptions options = buildPipelineOptions();
+ options.setRunner(DataflowPipelineRunner.class);
+
+ DataflowPipeline p = buildPipeline(options);
+ p.traverseTopologically(new RecordingPipelineVisitor());
+ Job job =
+ DataflowPipelineTranslator.fromOptions(options)
+ .translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList())
+ .getJob();
+
+ // Note that the contents of this materialized map may be changed by the act of reading an
+ // option, which will cause the default to get materialized whereas it would otherwise be
+ // left absent. It is permissible to simply alter this test to reflect current behavior.
+ Map<String, Object> settings = new HashMap<>();
+ settings.put("appName", "DataflowPipelineTranslatorTest");
+ settings.put("project", "some-project");
+ settings.put("pathValidatorClass", "com.google.cloud.dataflow.sdk.util.DataflowPathValidator");
+ settings.put("runner", "com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner");
+ settings.put("jobName", "some-job-name");
+ settings.put("tempLocation", "gs://somebucket/some/path");
+ settings.put("stagingLocation", "gs://somebucket/some/path/staging");
+ settings.put("stableUniqueNames", "WARNING");
+ settings.put("streaming", false);
+ settings.put("numberOfWorkerHarnessThreads", 0);
+ settings.put("experiments", null);
+
+ assertEquals(ImmutableMap.of("options", settings),
+ job.getEnvironment().getSdkPipelineOptions());
+ }
+
+ @Test
+ public void testNetworkConfig() throws IOException {
+ final String testNetwork = "test-network";
+
+ DataflowPipelineOptions options = buildPipelineOptions();
+ options.setNetwork(testNetwork);
+
+ DataflowPipeline p = buildPipeline(options);
+ p.traverseTopologically(new RecordingPipelineVisitor());
+ Job job =
+ DataflowPipelineTranslator.fromOptions(options)
+ .translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList())
+ .getJob();
+
+ assertEquals(1, job.getEnvironment().getWorkerPools().size());
+ assertEquals(testNetwork,
+ job.getEnvironment().getWorkerPools().get(0).getNetwork());
+ }
+
+ @Test
+ public void testNetworkConfigMissing() throws IOException {
+ DataflowPipelineOptions options = buildPipelineOptions();
+
+ DataflowPipeline p = buildPipeline(options);
+ p.traverseTopologically(new RecordingPipelineVisitor());
+ Job job =
+ DataflowPipelineTranslator.fromOptions(options)
+ .translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList())
+ .getJob();
+
+ assertEquals(1, job.getEnvironment().getWorkerPools().size());
+ assertNull(job.getEnvironment().getWorkerPools().get(0).getNetwork());
+ }
+
+ @Test
+ public void testSubnetworkConfig() throws IOException {
+ final String testSubnetwork = "regions/REGION/subnetworks/SUBNETWORK";
+
+ DataflowPipelineOptions options = buildPipelineOptions();
+ options.setSubnetwork(testSubnetwork);
+
+ DataflowPipeline p = buildPipeline(options);
+ p.traverseTopologically(new RecordingPipelineVisitor());
+ Job job =
+ DataflowPipelineTranslator.fromOptions(options)
+ .translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList())
+ .getJob();
+
+ assertEquals(1, job.getEnvironment().getWorkerPools().size());
+ assertEquals(testSubnetwork,
+ job.getEnvironment().getWorkerPools().get(0).getSubnetwork());
+ }
+
+ @Test
+ public void testSubnetworkConfigMissing() throws IOException {
+ DataflowPipelineOptions options = buildPipelineOptions();
+
+ DataflowPipeline p = buildPipeline(options);
+ p.traverseTopologically(new RecordingPipelineVisitor());
+ Job job =
+ DataflowPipelineTranslator.fromOptions(options)
+ .translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList())
+ .getJob();
+
+ assertEquals(1, job.getEnvironment().getWorkerPools().size());
+ assertNull(job.getEnvironment().getWorkerPools().get(0).getSubnetwork());
+ }
+
+ @Test
+ public void testScalingAlgorithmMissing() throws IOException {
+ DataflowPipelineOptions options = buildPipelineOptions();
+
+ DataflowPipeline p = buildPipeline(options);
+ p.traverseTopologically(new RecordingPipelineVisitor());
+ Job job =
+ DataflowPipelineTranslator.fromOptions(options)
+ .translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList())
+ .getJob();
+
+ assertEquals(1, job.getEnvironment().getWorkerPools().size());
+ // Autoscaling settings are always set.
+ assertNull(
+ job
+ .getEnvironment()
+ .getWorkerPools()
+ .get(0)
+ .getAutoscalingSettings()
+ .getAlgorithm());
+ assertEquals(
+ 0,
+ job
+ .getEnvironment()
+ .getWorkerPools()
+ .get(0)
+ .getAutoscalingSettings()
+ .getMaxNumWorkers()
+ .intValue());
+ }
+
+ @Test
+ public void testScalingAlgorithmNone() throws IOException {
+ final DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType noScaling =
+ DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType.NONE;
+
+ DataflowPipelineOptions options = buildPipelineOptions();
+ options.setAutoscalingAlgorithm(noScaling);
+
+ DataflowPipeline p = buildPipeline(options);
+ p.traverseTopologically(new RecordingPipelineVisitor());
+ Job job =
+ DataflowPipelineTranslator.fromOptions(options)
+ .translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList())
+ .getJob();
+
+ assertEquals(1, job.getEnvironment().getWorkerPools().size());
+ assertEquals(
+ "AUTOSCALING_ALGORITHM_NONE",
+ job
+ .getEnvironment()
+ .getWorkerPools()
+ .get(0)
+ .getAutoscalingSettings()
+ .getAlgorithm());
+ assertEquals(
+ 0,
+ job
+ .getEnvironment()
+ .getWorkerPools()
+ .get(0)
+ .getAutoscalingSettings()
+ .getMaxNumWorkers()
+ .intValue());
+ }
+
+ @Test
+ public void testMaxNumWorkersIsPassedWhenNoAlgorithmIsSet() throws IOException {
+ final DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType noScaling = null;
+ DataflowPipelineOptions options = buildPipelineOptions();
+ options.setMaxNumWorkers(42);
+ options.setAutoscalingAlgorithm(noScaling);
+
+ DataflowPipeline p = buildPipeline(options);
+ p.traverseTopologically(new RecordingPipelineVisitor());
+ Job job =
+ DataflowPipelineTranslator.fromOptions(options)
+ .translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList())
+ .getJob();
+
+ assertEquals(1, job.getEnvironment().getWorkerPools().size());
+ assertNull(
+ job
+ .getEnvironment()
+ .getWorkerPools()
+ .get(0)
+ .getAutoscalingSettings()
+ .getAlgorithm());
+ assertEquals(
+ 42,
+ job
+ .getEnvironment()
+ .getWorkerPools()
+ .get(0)
+ .getAutoscalingSettings()
+ .getMaxNumWorkers()
+ .intValue());
+ }
+
+ @Test
+ public void testZoneConfig() throws IOException {
+ final String testZone = "test-zone-1";
+
+ DataflowPipelineOptions options = buildPipelineOptions();
+ options.setZone(testZone);
+
+ DataflowPipeline p = buildPipeline(options);
+ p.traverseTopologically(new RecordingPipelineVisitor());
+ Job job =
+ DataflowPipelineTranslator.fromOptions(options)
+ .translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList())
+ .getJob();
+
+ assertEquals(1, job.getEnvironment().getWorkerPools().size());
+ assertEquals(testZone,
+ job.getEnvironment().getWorkerPools().get(0).getZone());
+ }
+
+ @Test
+ public void testWorkerMachineTypeConfig() throws IOException {
+ final String testMachineType = "test-machine-type";
+
+ DataflowPipelineOptions options = buildPipelineOptions();
+ options.setWorkerMachineType(testMachineType);
+
+ DataflowPipeline p = buildPipeline(options);
+ p.traverseTopologically(new RecordingPipelineVisitor());
+ Job job =
+ DataflowPipelineTranslator.fromOptions(options)
+ .translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList())
+ .getJob();
+
+ assertEquals(1, job.getEnvironment().getWorkerPools().size());
+
+ WorkerPool workerPool = job.getEnvironment().getWorkerPools().get(0);
+ assertEquals(testMachineType, workerPool.getMachineType());
+ }
+
+ @Test
+ public void testDiskSizeGbConfig() throws IOException {
+ final Integer diskSizeGb = 1234;
+
+ DataflowPipelineOptions options = buildPipelineOptions();
+ options.setDiskSizeGb(diskSizeGb);
+
+ DataflowPipeline p = buildPipeline(options);
+ p.traverseTopologically(new RecordingPipelineVisitor());
+ Job job =
+ DataflowPipelineTranslator.fromOptions(options)
+ .translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList())
+ .getJob();
+
+ assertEquals(1, job.getEnvironment().getWorkerPools().size());
+ assertEquals(diskSizeGb,
+ job.getEnvironment().getWorkerPools().get(0).getDiskSizeGb());
+ }
+
+ @Test
+ public void testPredefinedAddStep() throws Exception {
+ DataflowPipelineOptions options = buildPipelineOptions();
+
+ DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
+ DataflowPipelineTranslator.registerTransformTranslator(
+ EmbeddedTransform.class, new EmbeddedTranslator());
+
+ // Create a predefined step using another pipeline
+ Step predefinedStep = createPredefinedStep();
+
+ // Create a pipeline that the predefined step will be embedded into
+ DataflowPipeline pipeline = DataflowPipeline.create(options);
+ pipeline.apply(TextIO.Read.named("ReadMyFile").from("gs://bucket/in"))
+ .apply(ParDo.of(new NoOpFn()))
+ .apply(new EmbeddedTransform(predefinedStep.clone()))
+ .apply(ParDo.of(new NoOpFn()));
+ Job job = translator.translate(
+ pipeline, pipeline.getRunner(), Collections.<DataflowPackage>emptyList()).getJob();
+
+ List<Step> steps = job.getSteps();
+ assertEquals(4, steps.size());
+
+ // The input to the embedded step should match the output of the step before
+ Map<String, Object> step1Out = getOutputPortReference(steps.get(1));
+ Map<String, Object> step2In = getDictionary(
+ steps.get(2).getProperties(), PropertyNames.PARALLEL_INPUT);
+ assertEquals(step1Out, step2In);
+
+ // The output from the embedded step should match the input of the step after
+ Map<String, Object> step2Out = getOutputPortReference(steps.get(2));
+ Map<String, Object> step3In = getDictionary(
+ steps.get(3).getProperties(), PropertyNames.PARALLEL_INPUT);
+ assertEquals(step2Out, step3In);
+
+ // The step should not have been modified other than remapping the input
+ Step predefinedStepClone = predefinedStep.clone();
+ Step embeddedStepClone = steps.get(2).clone();
+ predefinedStepClone.getProperties().remove(PropertyNames.PARALLEL_INPUT);
+ embeddedStepClone.getProperties().remove(PropertyNames.PARALLEL_INPUT);
+ assertEquals(predefinedStepClone, embeddedStepClone);
+ }
+
+  /**
+   * Constructs an {@link OutputReference} for the output of the given step.
+   */
+ private static OutputReference getOutputPortReference(Step step) throws Exception {
+ // TODO: This should be done via a Structs accessor.
+ @SuppressWarnings("unchecked")
+ List<Map<String, Object>> output =
+ (List<Map<String, Object>>) step.getProperties().get(PropertyNames.OUTPUT_INFO);
+ String outputTagId = getString(Iterables.getOnlyElement(output), PropertyNames.OUTPUT_NAME);
+ return new OutputReference(step.getName(), outputTagId);
+ }
+
+  /**
+   * Returns a {@link Step} for a {@link DoFn} by creating and translating a pipeline.
+   */
+ private static Step createPredefinedStep() throws Exception {
+ DataflowPipelineOptions options = buildPipelineOptions();
+ DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
+ DataflowPipeline pipeline = DataflowPipeline.create(options);
+ String stepName = "DoFn1";
+ pipeline.apply(TextIO.Read.named("ReadMyFile").from("gs://bucket/in"))
+ .apply(ParDo.of(new NoOpFn()).named(stepName))
+ .apply(TextIO.Write.named("WriteMyFile").to("gs://bucket/out"));
+ Job job = translator.translate(
+ pipeline, pipeline.getRunner(), Collections.<DataflowPackage>emptyList()).getJob();
+
+ assertEquals(13, job.getSteps().size());
+ Step step = job.getSteps().get(1);
+ assertEquals(stepName, getString(step.getProperties(), PropertyNames.USER_NAME));
+ return step;
+ }
+
+ private static class NoOpFn extends DoFn<String, String> {
+ @Override public void processElement(ProcessContext c) throws Exception {
+ c.output(c.element());
+ }
+ }
+
+ /**
+ * A placeholder transform that will be used to substitute a predefined Step.
+ */
+ private static class EmbeddedTransform
+ extends PTransform<PCollection<String>, PCollection<String>> {
+ private final Step step;
+
+ public EmbeddedTransform(Step step) {
+ this.step = step;
+ }
+
+ @Override
+ public PCollection<String> apply(PCollection<String> input) {
+ return PCollection.createPrimitiveOutputInternal(
+ input.getPipeline(),
+ WindowingStrategy.globalDefault(),
+ input.isBounded());
+ }
+
+ @Override
+ protected Coder<?> getDefaultOutputCoder() {
+ return StringUtf8Coder.of();
+ }
+ }
+
+ /**
+ * A TransformTranslator that adds the predefined Step using
+ * {@link TranslationContext#addStep} and remaps the input port reference.
+ */
+ private static class EmbeddedTranslator
+ implements DataflowPipelineTranslator.TransformTranslator<EmbeddedTransform> {
+ @Override public void translate(EmbeddedTransform transform, TranslationContext context) {
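+      // Remap the canned step's parallel input to the actual predecessor's
+      // output, then register the step with the translation context.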
+ addObject(transform.step.getProperties(), PropertyNames.PARALLEL_INPUT,
+ context.asOutputReference(context.getInput(transform)));
+ context.addStep(transform, transform.step);
+ }
+ }
+
+ /**
+ * A composite transform that returns an output that is unrelated to
+ * the input.
+ */
+ private static class UnrelatedOutputCreator
+ extends PTransform<PCollection<Integer>, PCollection<Integer>> {
+
+ @Override
+ public PCollection<Integer> apply(PCollection<Integer> input) {
+ // Apply an operation so that this is a composite transform.
+ input.apply(Count.<Integer>perElement());
+
+ // Return a value unrelated to the input.
+ return input.getPipeline().apply(Create.of(1, 2, 3, 4));
+ }
+
+ @Override
+ protected Coder<?> getDefaultOutputCoder() {
+ return VarIntCoder.of();
+ }
+ }
+
+ /**
+ * A composite transform that returns an output that is unbound.
+ */
+ private static class UnboundOutputCreator
+ extends PTransform<PCollection<Integer>, PDone> {
+
+ @Override
+ public PDone apply(PCollection<Integer> input) {
+ // Apply an operation so that this is a composite transform.
+ input.apply(Count.<Integer>perElement());
+
+ return PDone.in(input.getPipeline());
+ }
+
+ @Override
+ protected Coder<?> getDefaultOutputCoder() {
+ return VoidCoder.of();
+ }
+ }
+
+ /**
+ * A composite transform that returns a partially bound output.
+ *
+ * <p>This is not allowed and will result in a failure.
+ */
+ private static class PartiallyBoundOutputCreator
+ extends PTransform<PCollection<Integer>, PCollectionTuple> {
+
+ public final TupleTag<Integer> sumTag = new TupleTag<>("sum");
+ public final TupleTag<Void> doneTag = new TupleTag<>("done");
+
+ @Override
+ public PCollectionTuple apply(PCollection<Integer> input) {
+ PCollection<Integer> sum = input.apply(Sum.integersGlobally());
+
+ // Fails here when attempting to construct a tuple with an unbound object.
+ return PCollectionTuple.of(sumTag, sum)
+ .and(doneTag, PCollection.<Void>createPrimitiveOutputInternal(
+ input.getPipeline(),
+ WindowingStrategy.globalDefault(),
+ input.isBounded()));
+ }
+ }
+
+ @Test
+ public void testMultiGraphPipelineSerialization() throws IOException {
+ DataflowPipeline p = DataflowPipeline.create(buildPipelineOptions());
+
+ PCollection<Integer> input = p.begin()
+ .apply(Create.of(1, 2, 3));
+
+ input.apply(new UnrelatedOutputCreator());
+ input.apply(new UnboundOutputCreator());
+
+ DataflowPipelineTranslator t = DataflowPipelineTranslator.fromOptions(
+ PipelineOptionsFactory.as(DataflowPipelineOptions.class));
+
+ // Check that translation doesn't fail.
+ t.translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList());
+ }
+
+ @Test
+ public void testPartiallyBoundFailure() throws IOException {
+ Pipeline p = DataflowPipeline.create(buildPipelineOptions());
+
+ PCollection<Integer> input = p.begin()
+ .apply(Create.of(1, 2, 3));
+
+ thrown.expect(IllegalStateException.class);
+ input.apply(new PartiallyBoundOutputCreator());
+
+ Assert.fail("Failure expected from use of partially bound output");
+ }
+
+ /**
+ * This tests a few corner cases that should not crash.
+ */
+ @Test
+ public void testGoodWildcards() throws Exception {
+ DataflowPipelineOptions options = buildPipelineOptions();
+ DataflowPipeline pipeline = DataflowPipeline.create(options);
+ DataflowPipelineTranslator t = DataflowPipelineTranslator.fromOptions(options);
+
+ applyRead(pipeline, "gs://bucket/foo");
+ applyRead(pipeline, "gs://bucket/foo/");
+ applyRead(pipeline, "gs://bucket/foo/*");
+ applyRead(pipeline, "gs://bucket/foo/?");
+ applyRead(pipeline, "gs://bucket/foo/[0-9]");
+ applyRead(pipeline, "gs://bucket/foo/*baz*");
+ applyRead(pipeline, "gs://bucket/foo/*baz?");
+ applyRead(pipeline, "gs://bucket/foo/[0-9]baz?");
+ applyRead(pipeline, "gs://bucket/foo/baz/*");
+ applyRead(pipeline, "gs://bucket/foo/baz/*wonka*");
+ applyRead(pipeline, "gs://bucket/foo/*baz/wonka*");
+ applyRead(pipeline, "gs://bucket/foo*/baz");
+ applyRead(pipeline, "gs://bucket/foo?/baz");
+ applyRead(pipeline, "gs://bucket/foo[0-9]/baz");
+
+ // Check that translation doesn't fail.
+ t.translate(pipeline, pipeline.getRunner(), Collections.<DataflowPackage>emptyList());
+ }
+
+ private void applyRead(Pipeline pipeline, String path) {
+ pipeline.apply("Read(" + path + ")", TextIO.Read.from(path));
+ }
+
+ /**
+ * Recursive wildcards are not supported.
+ * This tests "**".
+ */
+ @Test
+ public void testBadWildcardRecursive() throws Exception {
+ DataflowPipelineOptions options = buildPipelineOptions();
+ DataflowPipeline pipeline = DataflowPipeline.create(options);
+ DataflowPipelineTranslator t = DataflowPipelineTranslator.fromOptions(options);
+
+ pipeline.apply(TextIO.Read.from("gs://bucket/foo**/baz"));
+
+ // Check that translation does fail.
+ thrown.expectCause(Matchers.allOf(
+ instanceOf(IllegalArgumentException.class),
+ ThrowableMessageMatcher.hasMessage(containsString("Unsupported wildcard usage"))));
+ t.translate(pipeline, pipeline.getRunner(), Collections.<DataflowPackage>emptyList());
+ }
+
+ @Test
+ public void testToSingletonTranslation() throws Exception {
+ // A "change detector" test that makes sure the translation
+ // of getting a PCollectionView<T> does not change
+    // in bad ways during refactoring.
+
+ DataflowPipelineOptions options = buildPipelineOptions();
+ options.setExperiments(ImmutableList.of("disable_ism_side_input"));
+ DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
+
+ DataflowPipeline pipeline = DataflowPipeline.create(options);
+ pipeline.apply(Create.of(1))
+ .apply(View.<Integer>asSingleton());
+ Job job = translator.translate(
+ pipeline, pipeline.getRunner(), Collections.<DataflowPackage>emptyList()).getJob();
+
+ List<Step> steps = job.getSteps();
+ assertEquals(2, steps.size());
+
+ Step createStep = steps.get(0);
+ assertEquals("CreateCollection", createStep.getKind());
+
+ Step collectionToSingletonStep = steps.get(1);
+    assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
+  }
+
+ @Test
+ public void testToIterableTranslation() throws Exception {
+ // A "change detector" test that makes sure the translation
+ // of getting a PCollectionView<Iterable<T>> does not change
+    // in bad ways during refactoring.
+
+ DataflowPipelineOptions options = buildPipelineOptions();
+ options.setExperiments(ImmutableList.of("disable_ism_side_input"));
+ DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
+
+ DataflowPipeline pipeline = DataflowPipeline.create(options);
+ pipeline.apply(Create.of(1, 2, 3))
+ .apply(View.<Integer>asIterable());
+ Job job = translator.translate(
+ pipeline, pipeline.getRunner(), Collections.<DataflowPackage>emptyList()).getJob();
+
+ List<Step> steps = job.getSteps();
+ assertEquals(2, steps.size());
+
+ Step createStep = steps.get(0);
+ assertEquals("CreateCollection", createStep.getKind());
+
+ Step collectionToSingletonStep = steps.get(1);
+ assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
+ }
+
+ @Test
+ public void testToSingletonTranslationWithIsmSideInput() throws Exception {
+ // A "change detector" test that makes sure the translation
+ // of getting a PCollectionView<T> does not change
+    // in bad ways during refactoring.
+
+ DataflowPipelineOptions options = buildPipelineOptions();
+ DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
+
+ DataflowPipeline pipeline = DataflowPipeline.create(options);
+ pipeline.apply(Create.of(1))
+ .apply(View.<Integer>asSingleton());
+ Job job = translator.translate(
+ pipeline, pipeline.getRunner(), Collections.<DataflowPackage>emptyList()).getJob();
+
+ List<Step> steps = job.getSteps();
+ assertEquals(5, steps.size());
+
+ @SuppressWarnings("unchecked")
+ List<Map<String, Object>> toIsmRecordOutputs =
+ (List<Map<String, Object>>) steps.get(3).getProperties().get(PropertyNames.OUTPUT_INFO);
+ assertTrue(
+ Structs.getBoolean(Iterables.getOnlyElement(toIsmRecordOutputs), "use_indexed_format"));
+
+ Step collectionToSingletonStep = steps.get(4);
+ assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
+ }
+
+ @Test
+ public void testToIterableTranslationWithIsmSideInput() throws Exception {
+ // A "change detector" test that makes sure the translation
+ // of getting a PCollectionView<Iterable<T>> does not change
+    // in bad ways during refactoring.
+
+ DataflowPipelineOptions options = buildPipelineOptions();
+ DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
+
+ DataflowPipeline pipeline = DataflowPipeline.create(options);
+ pipeline.apply(Create.of(1, 2, 3))
+ .apply(View.<Integer>asIterable());
+ Job job = translator.translate(
+ pipeline, pipeline.getRunner(), Collections.<DataflowPackage>emptyList()).getJob();
+
+ List<Step> steps = job.getSteps();
+ assertEquals(3, steps.size());
+
+ @SuppressWarnings("unchecked")
+ List<Map<String, Object>> toIsmRecordOutputs =
+ (List<Map<String, Object>>) steps.get(1).getProperties().get(PropertyNames.OUTPUT_INFO);
+ assertTrue(
+ Structs.getBoolean(Iterables.getOnlyElement(toIsmRecordOutputs), "use_indexed_format"));
+
+ Step collectionToSingletonStep = steps.get(2);
+ assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
+ }
+
+ @Test
+ public void testStepDisplayData() throws Exception {
+ DataflowPipelineOptions options = buildPipelineOptions();
+ DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
+ DataflowPipeline pipeline = DataflowPipeline.create(options);
+
+ DoFn<Integer, Integer> fn1 = new DoFn<Integer, Integer>() {
+ @Override
+ public void processElement(ProcessContext c) throws Exception {
+ c.output(c.element());
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ builder
+ .add("foo", "bar")
+ .add("foo2", DataflowPipelineTranslatorTest.class)
+ .withLabel("Test Class")
+ .withLinkUrl("http://www.google.com");
+ }
+ };
+
+ DoFn<Integer, Integer> fn2 = new DoFn<Integer, Integer>() {
+ @Override
+ public void processElement(ProcessContext c) throws Exception {
+ c.output(c.element());
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ builder.add("foo3", 1234);
+ }
+ };
+
+ pipeline
+ .apply(Create.of(1, 2, 3))
+ .apply(ParDo.of(fn1))
+ .apply(ParDo.of(fn2));
+
+ Job job = translator.translate(
+ pipeline, pipeline.getRunner(), Collections.<DataflowPackage>emptyList()).getJob();
+
+ List<Step> steps = job.getSteps();
+ assertEquals(3, steps.size());
+
+ Map<String, Object> parDo1Properties = steps.get(1).getProperties();
+ Map<String, Object> parDo2Properties = steps.get(2).getProperties();
+ assertThat(parDo1Properties, hasKey("display_data"));
+
+    @SuppressWarnings("unchecked")
+    Collection<Map<String, String>> fn1displayData =
+        (Collection<Map<String, String>>) parDo1Properties.get("display_data");
+    @SuppressWarnings("unchecked")
+    Collection<Map<String, String>> fn2displayData =
+        (Collection<Map<String, String>>) parDo2Properties.get("display_data");
+
+    ImmutableList<ImmutableMap<String, String>> expectedFn1DisplayData = ImmutableList.of(
+ ImmutableMap.<String, String>builder()
+ .put("namespace", fn1.getClass().getName())
+ .put("key", "foo")
+ .put("type", "STRING")
+ .put("value", "bar")
+ .build(),
+ ImmutableMap.<String, String>builder()
+ .put("namespace", fn1.getClass().getName())
+ .put("key", "foo2")
+ .put("type", "JAVA_CLASS")
+ .put("value", DataflowPipelineTranslatorTest.class.getName())
+ .put("shortValue", DataflowPipelineTranslatorTest.class.getSimpleName())
+ .put("label", "Test Class")
+ .put("linkUrl", "http://www.google.com")
+ .build()
+ );
+
+    ImmutableList<ImmutableMap<String, Object>> expectedFn2DisplayData = ImmutableList.of(
+ ImmutableMap.<String, Object>builder()
+ .put("namespace", fn2.getClass().getName())
+ .put("key", "foo3")
+ .put("type", "INTEGER")
+ .put("value", 1234L)
+ .build()
+ );
+
+ assertEquals(expectedFn1DisplayData, fn1displayData);
+ assertEquals(expectedFn2DisplayData, fn2displayData);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/dataflow/CustomSourcesTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/dataflow/CustomSourcesTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/dataflow/CustomSourcesTest.java
new file mode 100644
index 0000000..20d2bc8
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/dataflow/CustomSourcesTest.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.dataflow;
+
+import static com.google.cloud.dataflow.sdk.testing.SourceTestUtils.readFromSource;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.BigEndianIntegerCoder;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.io.BoundedSource;
+import com.google.cloud.dataflow.sdk.io.Read;
+import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.testing.ExpectedLogs;
+import com.google.cloud.dataflow.sdk.testing.PAssert;
+import com.google.cloud.dataflow.sdk.testing.TestPipeline;
+import com.google.cloud.dataflow.sdk.transforms.Sample;
+import com.google.cloud.dataflow.sdk.transforms.Sum;
+import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.common.base.Preconditions;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests for {@link CustomSources}.
+ */
+@RunWith(JUnit4.class)
+public class CustomSourcesTest {
+ @Rule public ExpectedException expectedException = ExpectedException.none();
+ @Rule public ExpectedLogs logged = ExpectedLogs.none(CustomSources.class);
+
+ static class TestIO {
+ public static Read fromRange(int from, int to) {
+ return new Read(from, to, false);
+ }
+
+ static class Read extends BoundedSource<Integer> {
+ final int from;
+ final int to;
+ final boolean produceTimestamps;
+
+ Read(int from, int to, boolean produceTimestamps) {
+ this.from = from;
+ this.to = to;
+ this.produceTimestamps = produceTimestamps;
+ }
+
+ public Read withTimestampsMillis() {
+ return new Read(from, to, true);
+ }
+
+ @Override
+ public List<Read> splitIntoBundles(long desiredBundleSizeBytes, PipelineOptions options)
+ throws Exception {
+ List<Read> res = new ArrayList<>();
+ DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
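+        // Partition [from, to) into numWorkers contiguous sub-ranges of
+        // roughly equal size.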
+ float step = 1.0f * (to - from) / dataflowOptions.getNumWorkers();
+ for (int i = 0; i < dataflowOptions.getNumWorkers(); ++i) {
+ res.add(new Read(
+ Math.round(from + i * step), Math.round(from + (i + 1) * step),
+ produceTimestamps));
+ }
+ return res;
+ }
+
+ @Override
+ public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+ return 8 * (to - from);
+ }
+
+ @Override
+ public boolean producesSortedKeys(PipelineOptions options) throws Exception {
+ return true;
+ }
+
+ @Override
+ public BoundedReader<Integer> createReader(PipelineOptions options) throws IOException {
+ return new RangeReader(this);
+ }
+
+ @Override
+ public void validate() {}
+
+ @Override
+ public String toString() {
+ return "[" + from + ", " + to + ")";
+ }
+
+ @Override
+ public Coder<Integer> getDefaultOutputCoder() {
+ return BigEndianIntegerCoder.of();
+ }
+
+ private static class RangeReader extends BoundedReader<Integer> {
+ // To verify that BasicSerializableSourceFormat calls our methods according to protocol.
+ enum State {
+ UNSTARTED,
+ STARTED,
+ FINISHED
+ }
+ private Read source;
+ private int current = -1;
+ private State state = State.UNSTARTED;
+
+ public RangeReader(Read source) {
+ this.source = source;
+ }
+
+ @Override
+ public boolean start() throws IOException {
+ Preconditions.checkState(state == State.UNSTARTED);
+ state = State.STARTED;
+ current = source.from;
+ return (current < source.to);
+ }
+
+ @Override
+ public boolean advance() throws IOException {
+ Preconditions.checkState(state == State.STARTED);
+ if (current == source.to - 1) {
+ state = State.FINISHED;
+ return false;
+ }
+ current++;
+ return true;
+ }
+
+ @Override
+ public Integer getCurrent() {
+ Preconditions.checkState(state == State.STARTED);
+ return current;
+ }
+
+ @Override
+ public Instant getCurrentTimestamp() {
+ return source.produceTimestamps
+ ? new Instant(current /* as millis */) : BoundedWindow.TIMESTAMP_MIN_VALUE;
+ }
+
+ @Override
+ public void close() throws IOException {
+ Preconditions.checkState(state == State.STARTED || state == State.FINISHED);
+ state = State.FINISHED;
+ }
+
+ @Override
+ public Read getCurrentSource() {
+ return source;
+ }
+
+ @Override
+ public Read splitAtFraction(double fraction) {
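+          // Map the fraction to an index in [from, to): the primary keeps
+          // [from, proposedIndex) and the residual is [proposedIndex, to).
+          // A split at or before the current element is rejected (null).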
+ int proposedIndex = (int) (source.from + fraction * (source.to - source.from));
+ if (proposedIndex <= current) {
+ return null;
+ }
+ Read primary = new Read(source.from, proposedIndex, source.produceTimestamps);
+ Read residual = new Read(proposedIndex, source.to, source.produceTimestamps);
+ this.source = primary;
+ return residual;
+ }
+
+ @Override
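+          // Fraction of [from, to) already returned; 0.0 before start().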
+ public Double getFractionConsumed() {
+ return (current == -1)
+ ? 0.0
+ : (1.0 * (1 + current - source.from) / (source.to - source.from));
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testDirectPipelineWithoutTimestamps() throws Exception {
+ Pipeline p = TestPipeline.create();
+ PCollection<Integer> sum = p
+ .apply(Read.from(TestIO.fromRange(10, 20)))
+ .apply(Sum.integersGlobally())
+ .apply(Sample.<Integer>any(1));
+
+ PAssert.thatSingleton(sum).isEqualTo(145);
+ p.run();
+ }
+
+ @Test
+ public void testDirectPipelineWithTimestamps() throws Exception {
+ Pipeline p = TestPipeline.create();
+ PCollection<Integer> sums =
+ p.apply(Read.from(TestIO.fromRange(10, 20).withTimestampsMillis()))
+ .apply(Window.<Integer>into(FixedWindows.of(Duration.millis(3))))
+ .apply(Sum.integersGlobally().withoutDefaults());
+ // Should group into [10 11] [12 13 14] [15 16 17] [18 19].
+ PAssert.that(sums).containsInAnyOrder(21, 37, 39, 48);
+ p.run();
+ }
+
+ @Test
+ public void testRangeProgressAndSplitAtFraction() throws Exception {
+ // Show basic usage of getFractionConsumed and splitAtFraction.
+ // This test only tests TestIO itself, not BasicSerializableSourceFormat.
+
+ DataflowPipelineOptions options =
+ PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
+ TestIO.Read source = TestIO.fromRange(10, 20);
+ try (BoundedSource.BoundedReader<Integer> reader = source.createReader(options)) {
+ assertEquals(0, reader.getFractionConsumed().intValue());
+ assertTrue(reader.start());
+ assertEquals(0.1, reader.getFractionConsumed(), 1e-6);
+ assertTrue(reader.advance());
+ assertEquals(0.2, reader.getFractionConsumed(), 1e-6);
+ // Already past 0.0 and 0.1.
+ assertNull(reader.splitAtFraction(0.0));
+ assertNull(reader.splitAtFraction(0.1));
+
+ {
+ TestIO.Read residual = (TestIO.Read) reader.splitAtFraction(0.5);
+ assertNotNull(residual);
+ TestIO.Read primary = (TestIO.Read) reader.getCurrentSource();
+ assertThat(readFromSource(primary, options), contains(10, 11, 12, 13, 14));
+ assertThat(readFromSource(residual, options), contains(15, 16, 17, 18, 19));
+ }
+
+      // Range is now [10, 15) and the current element is 11.
+ {
+ TestIO.Read residual = (TestIO.Read) reader.splitAtFraction(0.8); // give up 14.
+ assertNotNull(residual);
+ TestIO.Read primary = (TestIO.Read) reader.getCurrentSource();
+ assertThat(readFromSource(primary, options), contains(10, 11, 12, 13));
+ assertThat(readFromSource(residual, options), contains(14));
+ }
+
+ assertTrue(reader.advance());
+ assertEquals(12, reader.getCurrent().intValue());
+ assertTrue(reader.advance());
+ assertEquals(13, reader.getCurrent().intValue());
+ assertFalse(reader.advance());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/testing/TestDataflowPipelineRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/testing/TestDataflowPipelineRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/testing/TestDataflowPipelineRunnerTest.java
new file mode 100644
index 0000000..0a78a6d
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/testing/TestDataflowPipelineRunnerTest.java
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.testing;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.api.client.http.LowLevelHttpResponse;
+import com.google.api.client.json.Json;
+import com.google.api.client.testing.http.MockHttpTransport;
+import com.google.api.client.testing.http.MockLowLevelHttpRequest;
+import com.google.api.client.testing.http.MockLowLevelHttpResponse;
+import com.google.api.services.dataflow.Dataflow;
+import com.google.api.services.dataflow.model.JobMessage;
+import com.google.api.services.dataflow.model.JobMetrics;
+import com.google.api.services.dataflow.model.MetricStructuredName;
+import com.google.api.services.dataflow.model.MetricUpdate;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.PipelineResult.State;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.runners.DataflowPipelineJob;
+import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.util.GcsUtil;
+import com.google.cloud.dataflow.sdk.util.MonitoringUtil;
+import com.google.cloud.dataflow.sdk.util.MonitoringUtil.JobMessagesHandler;
+import com.google.cloud.dataflow.sdk.util.NoopPathValidator;
+import com.google.cloud.dataflow.sdk.util.TestCredential;
+import com.google.cloud.dataflow.sdk.util.TimeUtil;
+import com.google.cloud.dataflow.sdk.util.Transport;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+
+/** Tests for {@link TestDataflowPipelineRunner}. */
+@RunWith(JUnit4.class)
+public class TestDataflowPipelineRunnerTest {
+ @Rule public ExpectedException expectedException = ExpectedException.none();
+ @Mock private MockHttpTransport transport;
+ @Mock private MockLowLevelHttpRequest request;
+ @Mock private GcsUtil mockGcsUtil;
+
+ private TestDataflowPipelineOptions options;
+ private Dataflow service;
+
+ @Before
+ public void setUp() throws Exception {
+ MockitoAnnotations.initMocks(this);
+ when(transport.buildRequest(anyString(), anyString())).thenReturn(request);
+ doCallRealMethod().when(request).getContentAsString();
+ service = new Dataflow(transport, Transport.getJsonFactory(), null);
+
+ options = PipelineOptionsFactory.as(TestDataflowPipelineOptions.class);
+ options.setAppName("TestAppName");
+ options.setProject("test-project");
+ options.setTempLocation("gs://test/temp/location");
+ options.setGcpCredential(new TestCredential());
+ options.setDataflowClient(service);
+ options.setRunner(TestDataflowPipelineRunner.class);
+ options.setPathValidatorClass(NoopPathValidator.class);
+ }
+
+ @Test
+ public void testToString() {
+ assertEquals("TestDataflowPipelineRunner#TestAppName",
+ new TestDataflowPipelineRunner(options).toString());
+ }
+
+ @Test
+ public void testRunBatchJobThatSucceeds() throws Exception {
+ Pipeline p = TestPipeline.create(options);
+ PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+ PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+ DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+ when(mockJob.getDataflowClient()).thenReturn(service);
+ when(mockJob.getState()).thenReturn(State.DONE);
+ when(mockJob.getProjectId()).thenReturn("test-project");
+ when(mockJob.getJobId()).thenReturn("test-job");
+
+ DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
+ when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+ TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+ when(request.execute()).thenReturn(
+ generateMockMetricResponse(true /* success */, true /* tentative */));
+ assertEquals(mockJob, runner.run(p, mockRunner));
+ }
+
+ @Test
+ public void testRunBatchJobThatFails() throws Exception {
+ Pipeline p = TestPipeline.create(options);
+ PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+ PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+ DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+ when(mockJob.getDataflowClient()).thenReturn(service);
+ when(mockJob.getState()).thenReturn(State.FAILED);
+ when(mockJob.getProjectId()).thenReturn("test-project");
+ when(mockJob.getJobId()).thenReturn("test-job");
+
+ DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
+ when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+ TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+ try {
+ runner.run(p, mockRunner);
+ } catch (AssertionError expected) {
+ return;
+ }
+    // Note that fail() throws an AssertionError, which is why it is placed
+    // outside the try-catch block rather than inside it.
+ fail("AssertionError expected");
+ }
+
+ @Test
+ public void testBatchPipelineFailsIfException() throws Exception {
+ Pipeline p = TestPipeline.create(options);
+ PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+ PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+ DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+ when(mockJob.getDataflowClient()).thenReturn(service);
+ when(mockJob.getState()).thenReturn(State.RUNNING);
+ when(mockJob.getProjectId()).thenReturn("test-project");
+ when(mockJob.getJobId()).thenReturn("test-job");
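+    // Simulate the service surfacing a JOB_MESSAGE_ERROR while the job is
+    // running and then reporting the job as cancelled.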
+ when(mockJob.waitToFinish(any(Long.class), any(TimeUnit.class), any(JobMessagesHandler.class)))
+ .thenAnswer(new Answer<State>() {
+ @Override
+ public State answer(InvocationOnMock invocation) {
+ JobMessage message = new JobMessage();
+ message.setMessageText("FooException");
+ message.setTime(TimeUtil.toCloudTime(Instant.now()));
+ message.setMessageImportance("JOB_MESSAGE_ERROR");
+ ((MonitoringUtil.JobMessagesHandler) invocation.getArguments()[2])
+ .process(Arrays.asList(message));
+ return State.CANCELLED;
+ }
+ });
+
+ DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
+ when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+ when(request.execute()).thenReturn(
+ generateMockMetricResponse(false /* success */, true /* tentative */));
+ TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+ try {
+ runner.run(p, mockRunner);
+ } catch (AssertionError expected) {
+ assertThat(expected.getMessage(), containsString("FooException"));
+ verify(mockJob, atLeastOnce()).cancel();
+ return;
+ }
+    // Note that fail() throws an AssertionError, which is why it is placed
+    // outside the try-catch block rather than inside it.
+ fail("AssertionError expected");
+ }
+
+ @Test
+ public void testRunStreamingJobThatSucceeds() throws Exception {
+ options.setStreaming(true);
+ Pipeline p = TestPipeline.create(options);
+ PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+ PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+ DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+ when(mockJob.getDataflowClient()).thenReturn(service);
+ when(mockJob.getState()).thenReturn(State.RUNNING);
+ when(mockJob.getProjectId()).thenReturn("test-project");
+ when(mockJob.getJobId()).thenReturn("test-job");
+
+ DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
+ when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+ when(request.execute()).thenReturn(
+ generateMockMetricResponse(true /* success */, true /* tentative */));
+ TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+ runner.run(p, mockRunner);
+ }
+
+ @Test
+ public void testRunStreamingJobThatFails() throws Exception {
+ options.setStreaming(true);
+ Pipeline p = TestPipeline.create(options);
+ PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+ PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+ DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+ when(mockJob.getDataflowClient()).thenReturn(service);
+ when(mockJob.getState()).thenReturn(State.RUNNING);
+ when(mockJob.getProjectId()).thenReturn("test-project");
+ when(mockJob.getJobId()).thenReturn("test-job");
+
+ DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
+ when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+ when(request.execute()).thenReturn(
+ generateMockMetricResponse(false /* success */, true /* tentative */));
+ TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+ try {
+ runner.run(p, mockRunner);
+ } catch (AssertionError expected) {
+ return;
+ }
+    // Note that fail() throws an AssertionError, which is why it is placed
+    // outside the try-catch block rather than inside it.
+ fail("AssertionError expected");
+ }
+
+ @Test
+ public void testCheckingForSuccessWhenPAssertSucceeds() throws Exception {
+ DataflowPipelineJob job =
+ spy(new DataflowPipelineJob("test-project", "test-job", service, null));
+ Pipeline p = TestPipeline.create(options);
+ PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+ PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+ TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+ when(request.execute()).thenReturn(
+ generateMockMetricResponse(true /* success */, true /* tentative */));
+ doReturn(State.DONE).when(job).getState();
+ assertEquals(Optional.of(true), runner.checkForSuccess(job));
+ }
+
+ @Test
+ public void testCheckingForSuccessWhenPAssertFails() throws Exception {
+ DataflowPipelineJob job =
+ spy(new DataflowPipelineJob("test-project", "test-job", service, null));
+ Pipeline p = TestPipeline.create(options);
+ PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+ PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+ TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+ when(request.execute()).thenReturn(
+ generateMockMetricResponse(false /* success */, true /* tentative */));
+ doReturn(State.DONE).when(job).getState();
+ assertEquals(Optional.of(false), runner.checkForSuccess(job));
+ }
+
+ @Test
+ public void testCheckingForSuccessSkipsNonTentativeMetrics() throws Exception {
+ DataflowPipelineJob job =
+ spy(new DataflowPipelineJob("test-project", "test-job", service, null));
+ Pipeline p = TestPipeline.create(options);
+ PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+ PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+ TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+ when(request.execute()).thenReturn(
+ generateMockMetricResponse(true /* success */, false /* tentative */));
+ doReturn(State.RUNNING).when(job).getState();
+ assertEquals(Optional.absent(), runner.checkForSuccess(job));
+ }
+
+ private LowLevelHttpResponse generateMockMetricResponse(boolean success, boolean tentative)
+ throws Exception {
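+    // Build a JobMetrics JSON payload containing a single PAssert counter,
+    // shaped like the response the mocked metrics request will return.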
+ MetricStructuredName name = new MetricStructuredName();
+ name.setName(success ? "PAssertSuccess" : "PAssertFailure");
+ name.setContext(
+ tentative ? ImmutableMap.of("tentative", "") : ImmutableMap.<String, String>of());
+
+ MetricUpdate metric = new MetricUpdate();
+ metric.setName(name);
+ metric.setScalar(BigDecimal.ONE);
+
+ MockLowLevelHttpResponse response = new MockLowLevelHttpResponse();
+ response.setContentType(Json.MEDIA_TYPE);
+ JobMetrics jobMetrics = new JobMetrics();
+ jobMetrics.setMetrics(Lists.newArrayList(metric));
+ // N.B. Setting the factory is necessary in order to get valid JSON.
+ jobMetrics.setFactory(Transport.getJsonFactory());
+ response.setContent(jobMetrics.toPrettyString());
+ return response;
+ }
+
+ @Test
+ public void testStreamingPipelineFailsIfServiceFails() throws Exception {
+ DataflowPipelineJob job =
+ spy(new DataflowPipelineJob("test-project", "test-job", service, null));
+ Pipeline p = TestPipeline.create(options);
+ PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+ PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+ TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+ when(request.execute()).thenReturn(
+ generateMockMetricResponse(true /* success */, false /* tentative */));
+ doReturn(State.FAILED).when(job).getState();
+ assertEquals(Optional.of(false), runner.checkForSuccess(job));
+ }
+
+ @Test
+ public void testStreamingPipelineFailsIfException() throws Exception {
+ options.setStreaming(true);
+ Pipeline p = TestPipeline.create(options);
+ PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+ PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
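+    // Simulate a streaming job that surfaces a JOB_MESSAGE_ERROR ("FooException") through
+    // the messages handler and then finishes as CANCELLED.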
+ DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+ when(mockJob.getDataflowClient()).thenReturn(service);
+ when(mockJob.getState()).thenReturn(State.RUNNING);
+ when(mockJob.getProjectId()).thenReturn("test-project");
+ when(mockJob.getJobId()).thenReturn("test-job");
+ when(mockJob.waitToFinish(any(Long.class), any(TimeUnit.class), any(JobMessagesHandler.class)))
+ .thenAnswer(new Answer<State>() {
+ @Override
+ public State answer(InvocationOnMock invocation) {
+ JobMessage message = new JobMessage();
+ message.setMessageText("FooException");
+ message.setTime(TimeUtil.toCloudTime(Instant.now()));
+ message.setMessageImportance("JOB_MESSAGE_ERROR");
+ ((MonitoringUtil.JobMessagesHandler) invocation.getArguments()[2])
+ .process(Arrays.asList(message));
+ return State.CANCELLED;
+ }
+ });
+
+ DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
+ when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+ when(request.execute()).thenReturn(
+ generateMockMetricResponse(false /* success */, true /* tentative */));
+ TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+ try {
+ runner.run(p, mockRunner);
+ } catch (AssertionError expected) {
+ assertThat(expected.getMessage(), containsString("FooException"));
+ verify(mockJob, atLeastOnce()).cancel();
+ return;
+ }
+    // fail() itself throws an AssertionError, so it is placed out here rather than
+    // inside the try-catch block, where the catch clause would swallow it.
+ fail("AssertionError expected");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/transforms/DataflowGroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/transforms/DataflowGroupByKeyTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/transforms/DataflowGroupByKeyTest.java
new file mode 100644
index 0000000..914a484
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/transforms/DataflowGroupByKeyTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.transforms;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.BigEndianIntegerCoder;
+import com.google.cloud.dataflow.sdk.coders.KvCoder;
+import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
+import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Sessions;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.util.NoopPathValidator;
+import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PBegin;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
+
+import org.joda.time.Duration;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.Arrays;
+import java.util.List;
+
+/** Tests for {@link GroupByKey} for the {@link DataflowPipelineRunner}. */
+@RunWith(JUnit4.class)
+public class DataflowGroupByKeyTest {
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
+
+  /**
+   * Create a test pipeline that uses the {@link DataflowPipelineRunner} so that {@link GroupByKey}
+   * is not expanded. This is used to verify that the proper errors surface even without
+   * expansion.
+   */
+ private Pipeline createTestServiceRunner() {
+ DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+ options.setRunner(DataflowPipelineRunner.class);
+ options.setProject("someproject");
+ options.setStagingLocation("gs://staging");
+ options.setPathValidatorClass(NoopPathValidator.class);
+ options.setDataflowClient(null);
+ return Pipeline.create(options);
+ }
+
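+  /**
+   * Sessions windowing merges windows, so the output of the first {@link GroupByKey} no
+   * longer carries a valid window merge function; a second {@link GroupByKey} should
+   * therefore be rejected.
+   */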
+ @Test
+ public void testInvalidWindowsService() {
+ Pipeline p = createTestServiceRunner();
+
+ List<KV<String, Integer>> ungroupedPairs = Arrays.asList();
+
+ PCollection<KV<String, Integer>> input =
+ p.apply(Create.of(ungroupedPairs)
+ .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())))
+ .apply(Window.<KV<String, Integer>>into(
+ Sessions.withGapDuration(Duration.standardMinutes(1))));
+
+ thrown.expect(IllegalStateException.class);
+ thrown.expectMessage("GroupByKey must have a valid Window merge function");
+ input
+ .apply("GroupByKey", GroupByKey.<String, Integer>create())
+ .apply("GroupByKeyAgain", GroupByKey.<String, Iterable<Integer>>create());
+ }
+
+ @Test
+ public void testGroupByKeyServiceUnbounded() {
+ Pipeline p = createTestServiceRunner();
+
+ PCollection<KV<String, Integer>> input =
+ p.apply(
+ new PTransform<PBegin, PCollection<KV<String, Integer>>>() {
+ @Override
+ public PCollection<KV<String, Integer>> apply(PBegin input) {
+ return PCollection.<KV<String, Integer>>createPrimitiveOutputInternal(
+ input.getPipeline(),
+ WindowingStrategy.globalDefault(),
+ PCollection.IsBounded.UNBOUNDED)
+ .setTypeDescriptorInternal(new TypeDescriptor<KV<String, Integer>>() {});
+ }
+ });
+
+ thrown.expect(IllegalStateException.class);
+ thrown.expectMessage(
+ "GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without "
+ + "a trigger. Use a Window.into or Window.triggering transform prior to GroupByKey.");
+
+ input.apply("GroupByKey", GroupByKey.<String, Integer>create());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/transforms/DataflowViewTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/transforms/DataflowViewTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/transforms/DataflowViewTest.java
new file mode 100644
index 0000000..936b7c6
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/transforms/DataflowViewTest.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.transforms;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
+import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.InvalidWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.util.NoopPathValidator;
+import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PBegin;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.PCollectionView;
+import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
+
+import org.hamcrest.Matchers;
+import org.joda.time.Duration;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.internal.matchers.ThrowableMessageMatcher;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link View} for a {@link DataflowPipelineRunner}. */
+@RunWith(JUnit4.class)
+public class DataflowViewTest {
+ @Rule
+ public transient ExpectedException thrown = ExpectedException.none();
+
+ private Pipeline createTestBatchRunner() {
+ DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+ options.setRunner(DataflowPipelineRunner.class);
+ options.setProject("someproject");
+ options.setStagingLocation("gs://staging");
+ options.setPathValidatorClass(NoopPathValidator.class);
+ options.setDataflowClient(null);
+ return Pipeline.create(options);
+ }
+
+ private Pipeline createTestStreamingRunner() {
+ DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+ options.setRunner(DataflowPipelineRunner.class);
+ options.setStreaming(true);
+ options.setProject("someproject");
+ options.setStagingLocation("gs://staging");
+ options.setPathValidatorClass(NoopPathValidator.class);
+ options.setDataflowClient(null);
+ return Pipeline.create(options);
+ }
+
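+  /**
+   * Asserts that creating the given view fails: the input is an unbounded primitive
+   * {@link PCollection} in the global window, which is rejected as a side-input source.
+   */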
+ private void testViewUnbounded(
+ Pipeline pipeline,
+ PTransform<PCollection<KV<String, Integer>>, ? extends PCollectionView<?>> view) {
+ thrown.expect(IllegalStateException.class);
+ thrown.expectMessage("Unable to create a side-input view from input");
+ thrown.expectCause(
+ ThrowableMessageMatcher.hasMessage(Matchers.containsString("non-bounded PCollection")));
+ pipeline
+ .apply(
+ new PTransform<PBegin, PCollection<KV<String, Integer>>>() {
+ @Override
+ public PCollection<KV<String, Integer>> apply(PBegin input) {
+ return PCollection.<KV<String, Integer>>createPrimitiveOutputInternal(
+ input.getPipeline(),
+ WindowingStrategy.globalDefault(),
+ PCollection.IsBounded.UNBOUNDED)
+ .setTypeDescriptorInternal(new TypeDescriptor<KV<String, Integer>>() {});
+ }
+ })
+ .apply(view);
+ }
+
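+  /**
+   * Asserts that creating the given view fails: the input uses {@link InvalidWindows}
+   * (here labeled "Consumed by GroupByKey"), which cannot back a side input.
+   */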
+ private void testViewNonmerging(
+ Pipeline pipeline,
+ PTransform<PCollection<KV<String, Integer>>, ? extends PCollectionView<?>> view) {
+ thrown.expect(IllegalStateException.class);
+ thrown.expectMessage("Unable to create a side-input view from input");
+ thrown.expectCause(
+ ThrowableMessageMatcher.hasMessage(Matchers.containsString("Consumed by GroupByKey")));
+ pipeline.apply(Create.<KV<String, Integer>>of(KV.of("hello", 5)))
+ .apply(Window.<KV<String, Integer>>into(new InvalidWindows<>(
+ "Consumed by GroupByKey", FixedWindows.of(Duration.standardHours(1)))))
+ .apply(view);
+ }
+
+ @Test
+ public void testViewUnboundedAsSingletonBatch() {
+ testViewUnbounded(createTestBatchRunner(), View.<KV<String, Integer>>asSingleton());
+ }
+
+ @Test
+ public void testViewUnboundedAsSingletonStreaming() {
+ testViewUnbounded(createTestStreamingRunner(), View.<KV<String, Integer>>asSingleton());
+ }
+
+ @Test
+ public void testViewUnboundedAsIterableBatch() {
+ testViewUnbounded(createTestBatchRunner(), View.<KV<String, Integer>>asIterable());
+ }
+
+ @Test
+ public void testViewUnboundedAsIterableStreaming() {
+ testViewUnbounded(createTestStreamingRunner(), View.<KV<String, Integer>>asIterable());
+ }
+
+ @Test
+ public void testViewUnboundedAsListBatch() {
+ testViewUnbounded(createTestBatchRunner(), View.<KV<String, Integer>>asList());
+ }
+
+ @Test
+ public void testViewUnboundedAsListStreaming() {
+ testViewUnbounded(createTestStreamingRunner(), View.<KV<String, Integer>>asList());
+ }
+
+ @Test
+ public void testViewUnboundedAsMapBatch() {
+ testViewUnbounded(createTestBatchRunner(), View.<String, Integer>asMap());
+ }
+
+ @Test
+ public void testViewUnboundedAsMapStreaming() {
+ testViewUnbounded(createTestStreamingRunner(), View.<String, Integer>asMap());
+ }
+
+ @Test
+ public void testViewUnboundedAsMultimapBatch() {
+ testViewUnbounded(createTestBatchRunner(), View.<String, Integer>asMultimap());
+ }
+
+ @Test
+ public void testViewUnboundedAsMultimapStreaming() {
+ testViewUnbounded(createTestStreamingRunner(), View.<String, Integer>asMultimap());
+ }
+
+ @Test
+ public void testViewNonmergingAsSingletonBatch() {
+ testViewNonmerging(createTestBatchRunner(), View.<KV<String, Integer>>asSingleton());
+ }
+
+ @Test
+ public void testViewNonmergingAsSingletonStreaming() {
+ testViewNonmerging(createTestStreamingRunner(), View.<KV<String, Integer>>asSingleton());
+ }
+
+ @Test
+ public void testViewNonmergingAsIterableBatch() {
+ testViewNonmerging(createTestBatchRunner(), View.<KV<String, Integer>>asIterable());
+ }
+
+ @Test
+ public void testViewNonmergingAsIterableStreaming() {
+ testViewNonmerging(createTestStreamingRunner(), View.<KV<String, Integer>>asIterable());
+ }
+
+ @Test
+ public void testViewNonmergingAsListBatch() {
+ testViewNonmerging(createTestBatchRunner(), View.<KV<String, Integer>>asList());
+ }
+
+ @Test
+ public void testViewNonmergingAsListStreaming() {
+ testViewNonmerging(createTestStreamingRunner(), View.<KV<String, Integer>>asList());
+ }
+
+ @Test
+ public void testViewNonmergingAsMapBatch() {
+ testViewNonmerging(createTestBatchRunner(), View.<String, Integer>asMap());
+ }
+
+ @Test
+ public void testViewNonmergingAsMapStreaming() {
+ testViewNonmerging(createTestStreamingRunner(), View.<String, Integer>asMap());
+ }
+
+ @Test
+ public void testViewNonmergingAsMultimapBatch() {
+ testViewNonmerging(createTestBatchRunner(), View.<String, Integer>asMultimap());
+ }
+
+ @Test
+ public void testViewNonmergingAsMultimapStreaming() {
+ testViewNonmerging(createTestStreamingRunner(), View.<String, Integer>asMultimap());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/util/DataflowPathValidatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/util/DataflowPathValidatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/util/DataflowPathValidatorTest.java
new file mode 100644
index 0000000..7f35fd3
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/util/DataflowPathValidatorTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.util;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.when;
+
+import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
+import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/** Tests for {@link DataflowPathValidator}. */
+@RunWith(JUnit4.class)
+public class DataflowPathValidatorTest {
+ @Rule public ExpectedException expectedException = ExpectedException.none();
+
+ @Mock private GcsUtil mockGcsUtil;
+ private DataflowPathValidator validator;
+
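+  // Wire the validator to a mocked GcsUtil: bucket-existence checks succeed without
+  // touching GCS, while pattern validation still runs the real logic.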
+ @Before
+ public void setUp() throws Exception {
+ MockitoAnnotations.initMocks(this);
+ when(mockGcsUtil.bucketExists(any(GcsPath.class))).thenReturn(true);
+ when(mockGcsUtil.isGcsPatternSupported(anyString())).thenCallRealMethod();
+ DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+ options.setGcpCredential(new TestCredential());
+ options.setRunner(DataflowPipelineRunner.class);
+ options.setGcsUtil(mockGcsUtil);
+ validator = new DataflowPathValidator(options);
+ }
+
+ @Test
+ public void testValidFilePattern() {
+ validator.validateInputFilePatternSupported("gs://bucket/path");
+ }
+
+ @Test
+ public void testInvalidFilePattern() {
+ expectedException.expect(IllegalArgumentException.class);
+ expectedException.expectMessage(
+ "DataflowPipelineRunner expected a valid 'gs://' path but was given '/local/path'");
+ validator.validateInputFilePatternSupported("/local/path");
+ }
+
+ @Test
+ public void testWhenBucketDoesNotExist() throws Exception {
+ when(mockGcsUtil.bucketExists(any(GcsPath.class))).thenReturn(false);
+ expectedException.expect(IllegalArgumentException.class);
+ expectedException.expectMessage(
+ "Could not find file gs://non-existent-bucket/location");
+ validator.validateInputFilePatternSupported("gs://non-existent-bucket/location");
+ }
+
+ @Test
+ public void testValidOutputPrefix() {
+ validator.validateOutputFilePrefixSupported("gs://bucket/path");
+ }
+
+ @Test
+ public void testInvalidOutputPrefix() {
+ expectedException.expect(IllegalArgumentException.class);
+ expectedException.expectMessage(
+ "DataflowPipelineRunner expected a valid 'gs://' path but was given '/local/path'");
+ validator.validateOutputFilePrefixSupported("/local/path");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/util/MonitoringUtilTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/util/MonitoringUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/util/MonitoringUtilTest.java
new file mode 100644
index 0000000..23e12de
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/util/MonitoringUtilTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.api.services.dataflow.Dataflow;
+import com.google.api.services.dataflow.model.JobMessage;
+import com.google.api.services.dataflow.model.ListJobMessagesResponse;
+import com.google.cloud.dataflow.sdk.PipelineResult.State;
+import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/** Tests for {@link MonitoringUtil}. */
+@RunWith(JUnit4.class)
+public class MonitoringUtilTest {
+ private static final String PROJECT_ID = "someProject";
+ private static final String JOB_ID = "1234";
+
+ @Rule public ExpectedException thrown = ExpectedException.none();
+
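+  // getJobMessages should follow the nextPageToken across list requests; 100 messages on
+  // the first page plus 50 on the second should all be returned.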
+ @Test
+ public void testGetJobMessages() throws IOException {
+ Dataflow.Projects.Jobs.Messages mockMessages = mock(Dataflow.Projects.Jobs.Messages.class);
+
+ // Two requests are needed to get all the messages.
+ Dataflow.Projects.Jobs.Messages.List firstRequest =
+ mock(Dataflow.Projects.Jobs.Messages.List.class);
+ Dataflow.Projects.Jobs.Messages.List secondRequest =
+ mock(Dataflow.Projects.Jobs.Messages.List.class);
+
+ when(mockMessages.list(PROJECT_ID, JOB_ID)).thenReturn(firstRequest).thenReturn(secondRequest);
+
+ ListJobMessagesResponse firstResponse = new ListJobMessagesResponse();
+ firstResponse.setJobMessages(new ArrayList<JobMessage>());
+ for (int i = 0; i < 100; ++i) {
+ JobMessage message = new JobMessage();
+ message.setId("message_" + i);
+ message.setTime(TimeUtil.toCloudTime(new Instant(i)));
+ firstResponse.getJobMessages().add(message);
+ }
+ String pageToken = "page_token";
+ firstResponse.setNextPageToken(pageToken);
+
+ ListJobMessagesResponse secondResponse = new ListJobMessagesResponse();
+ secondResponse.setJobMessages(new ArrayList<JobMessage>());
+ for (int i = 100; i < 150; ++i) {
+ JobMessage message = new JobMessage();
+ message.setId("message_" + i);
+ message.setTime(TimeUtil.toCloudTime(new Instant(i)));
+ secondResponse.getJobMessages().add(message);
+ }
+
+ when(firstRequest.execute()).thenReturn(firstResponse);
+ when(secondRequest.execute()).thenReturn(secondResponse);
+
+ MonitoringUtil util = new MonitoringUtil(PROJECT_ID, mockMessages);
+
+ List<JobMessage> messages = util.getJobMessages(JOB_ID, -1);
+
+ verify(secondRequest).setPageToken(pageToken);
+
+ assertEquals(150, messages.size());
+ }
+
+ @Test
+ public void testToStateCreatesState() {
+ String stateName = "JOB_STATE_DONE";
+
+ State result = MonitoringUtil.toState(stateName);
+
+ assertEquals(State.DONE, result);
+ }
+
+ @Test
+ public void testToStateWithNullReturnsUnknown() {
+ String stateName = null;
+
+ State result = MonitoringUtil.toState(stateName);
+
+ assertEquals(State.UNKNOWN, result);
+ }
+
+ @Test
+ public void testToStateWithOtherValueReturnsUnknown() {
+ String stateName = "FOO_BAR_BAZ";
+
+ State result = MonitoringUtil.toState(stateName);
+
+ assertEquals(State.UNKNOWN, result);
+ }
+
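+  // With the default API endpoint, the gcloud cancel command should not be prefixed with a
+  // CLOUDSDK_API_ENDPOINT_OVERRIDES_DATAFLOW override.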
+ @Test
+ public void testDontOverrideEndpointWithDefaultApi() {
+ DataflowPipelineOptions options =
+ PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
+ options.setProject(PROJECT_ID);
+ options.setGcpCredential(new TestCredential());
+ String cancelCommand = MonitoringUtil.getGcloudCancelCommand(options, JOB_ID);
+ assertEquals("gcloud alpha dataflow jobs --project=someProject cancel 1234", cancelCommand);
+ }
+
+ @Test
+ public void testOverridesEndpointWithStagedDataflowEndpoint() {
+ DataflowPipelineOptions options =
+ PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
+ options.setProject(PROJECT_ID);
+ options.setGcpCredential(new TestCredential());
+ String stagingDataflowEndpoint = "v0neverExisted";
+ options.setDataflowEndpoint(stagingDataflowEndpoint);
+ String cancelCommand = MonitoringUtil.getGcloudCancelCommand(options, JOB_ID);
+ assertEquals(
+ "CLOUDSDK_API_ENDPOINT_OVERRIDES_DATAFLOW=https://dataflow.googleapis.com/v0neverExisted/ "
+ + "gcloud alpha dataflow jobs --project=someProject cancel 1234",
+ cancelCommand);
+ }
+}