You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/04/12 01:42:44 UTC

[10/18] incubator-beam git commit: [BEAM-151] Move a large portion of the Dataflow runner to separate maven module

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/runners/google-cloud-dataflow-java/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java
new file mode 100644
index 0000000..1b32b73
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java
@@ -0,0 +1,890 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.runners;
+
+import static com.google.cloud.dataflow.sdk.util.Structs.addObject;
+import static com.google.cloud.dataflow.sdk.util.Structs.getDictionary;
+import static com.google.cloud.dataflow.sdk.util.Structs.getString;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.hasKey;
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.api.services.dataflow.Dataflow;
+import com.google.api.services.dataflow.model.DataflowPackage;
+import com.google.api.services.dataflow.model.Job;
+import com.google.api.services.dataflow.model.Step;
+import com.google.api.services.dataflow.model.WorkerPool;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
+import com.google.cloud.dataflow.sdk.coders.VarIntCoder;
+import com.google.cloud.dataflow.sdk.coders.VoidCoder;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import com.google.cloud.dataflow.sdk.options.DataflowPipelineWorkerPoolOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.TranslationContext;
+import com.google.cloud.dataflow.sdk.transforms.Count;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.Sum;
+import com.google.cloud.dataflow.sdk.transforms.View;
+import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
+import com.google.cloud.dataflow.sdk.util.GcsUtil;
+import com.google.cloud.dataflow.sdk.util.OutputReference;
+import com.google.cloud.dataflow.sdk.util.PropertyNames;
+import com.google.cloud.dataflow.sdk.util.Structs;
+import com.google.cloud.dataflow.sdk.util.TestCredential;
+import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
+import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.PCollectionTuple;
+import com.google.cloud.dataflow.sdk.values.PDone;
+import com.google.cloud.dataflow.sdk.values.TupleTag;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+
+import org.hamcrest.Matchers;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.internal.matchers.ThrowableMessageMatcher;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.ArgumentMatcher;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Tests for DataflowPipelineTranslator.
+ */
+@RunWith(JUnit4.class)
+public class DataflowPipelineTranslatorTest implements Serializable {
+
+  @Rule public transient ExpectedException thrown = ExpectedException.none();
+
+  // A Custom Mockito matcher for an initial Job that checks that all
+  // expected fields are set.
+  private static class IsValidCreateRequest extends ArgumentMatcher<Job> {
+    @Override
+    public boolean matches(Object o) {
+      Job job = (Job) o;
+      return job.getId() == null
+          && job.getProjectId() == null
+          && job.getName() != null
+          && job.getType() != null
+          && job.getEnvironment() != null
+          && job.getSteps() != null
+          && job.getCurrentState() == null
+          && job.getCurrentStateTime() == null
+          && job.getExecutionInfo() == null
+          && job.getCreateTime() == null;
+    }
+  }
+
+  private DataflowPipeline buildPipeline(DataflowPipelineOptions options) {
+    DataflowPipeline p = DataflowPipeline.create(options);
+
+    p.apply(TextIO.Read.named("ReadMyFile").from("gs://bucket/object"))
+        .apply(TextIO.Write.named("WriteMyFile").to("gs://bucket/object"));
+
+    return p;
+  }
+
+  private static Dataflow buildMockDataflow(
+      ArgumentMatcher<Job> jobMatcher) throws IOException {
+    Dataflow mockDataflowClient = mock(Dataflow.class);
+    Dataflow.Projects mockProjects = mock(Dataflow.Projects.class);
+    Dataflow.Projects.Jobs mockJobs = mock(Dataflow.Projects.Jobs.class);
+    Dataflow.Projects.Jobs.Create mockRequest = mock(
+        Dataflow.Projects.Jobs.Create.class);
+
+    when(mockDataflowClient.projects()).thenReturn(mockProjects);
+    when(mockProjects.jobs()).thenReturn(mockJobs);
+    when(mockJobs.create(eq("someProject"), argThat(jobMatcher)))
+        .thenReturn(mockRequest);
+
+    Job resultJob = new Job();
+    resultJob.setId("newid");
+    when(mockRequest.execute()).thenReturn(resultJob);
+    return mockDataflowClient;
+  }
+
+  private static DataflowPipelineOptions buildPipelineOptions() throws IOException {
+    GcsUtil mockGcsUtil = mock(GcsUtil.class);
+    when(mockGcsUtil.expand(any(GcsPath.class))).then(new Answer<List<GcsPath>>() {
+      @Override
+      public List<GcsPath> answer(InvocationOnMock invocation) throws Throwable {
+        return ImmutableList.of((GcsPath) invocation.getArguments()[0]);
+      }
+    });
+    when(mockGcsUtil.bucketExists(any(GcsPath.class))).thenReturn(true);
+    when(mockGcsUtil.isGcsPatternSupported(anyString())).thenCallRealMethod();
+
+    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    options.setGcpCredential(new TestCredential());
+    options.setJobName("some-job-name");
+    options.setProject("some-project");
+    options.setTempLocation(GcsPath.fromComponents("somebucket", "some/path").toString());
+    options.setFilesToStage(new LinkedList<String>());
+    options.setDataflowClient(buildMockDataflow(new IsValidCreateRequest()));
+    options.setGcsUtil(mockGcsUtil);
+    return options;
+  }
+
+  @Test
+  public void testSettingOfSdkPipelineOptions() throws IOException {
+    DataflowPipelineOptions options = buildPipelineOptions();
+    options.setRunner(DataflowPipelineRunner.class);
+
+    DataflowPipeline p = buildPipeline(options);
+    p.traverseTopologically(new RecordingPipelineVisitor());
+    Job job =
+        DataflowPipelineTranslator.fromOptions(options)
+            .translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList())
+            .getJob();
+
+    // Note that the contents of this materialized map may be changed by the act of reading an
+    // option, which will cause the default to get materialized whereas it would otherwise be
+    // left absent. It is permissible to simply alter this test to reflect current behavior.
+    Map<String, Object> settings = new HashMap<>();
+    settings.put("appName", "DataflowPipelineTranslatorTest");
+    settings.put("project", "some-project");
+    settings.put("pathValidatorClass", "com.google.cloud.dataflow.sdk.util.DataflowPathValidator");
+    settings.put("runner", "com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner");
+    settings.put("jobName", "some-job-name");
+    settings.put("tempLocation", "gs://somebucket/some/path");
+    settings.put("stagingLocation", "gs://somebucket/some/path/staging");
+    settings.put("stableUniqueNames", "WARNING");
+    settings.put("streaming", false);
+    settings.put("numberOfWorkerHarnessThreads", 0);
+    settings.put("experiments", null);
+
+    assertEquals(ImmutableMap.of("options", settings),
+        job.getEnvironment().getSdkPipelineOptions());
+  }
+
+  @Test
+  public void testNetworkConfig() throws IOException {
+    final String testNetwork = "test-network";
+
+    DataflowPipelineOptions options = buildPipelineOptions();
+    options.setNetwork(testNetwork);
+
+    DataflowPipeline p = buildPipeline(options);
+    p.traverseTopologically(new RecordingPipelineVisitor());
+    Job job =
+        DataflowPipelineTranslator.fromOptions(options)
+            .translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList())
+            .getJob();
+
+    assertEquals(1, job.getEnvironment().getWorkerPools().size());
+    assertEquals(testNetwork,
+        job.getEnvironment().getWorkerPools().get(0).getNetwork());
+  }
+
+  @Test
+  public void testNetworkConfigMissing() throws IOException {
+    DataflowPipelineOptions options = buildPipelineOptions();
+
+    DataflowPipeline p = buildPipeline(options);
+    p.traverseTopologically(new RecordingPipelineVisitor());
+    Job job =
+        DataflowPipelineTranslator.fromOptions(options)
+            .translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList())
+            .getJob();
+
+    assertEquals(1, job.getEnvironment().getWorkerPools().size());
+    assertNull(job.getEnvironment().getWorkerPools().get(0).getNetwork());
+  }
+
+  @Test
+  public void testSubnetworkConfig() throws IOException {
+    final String testSubnetwork = "regions/REGION/subnetworks/SUBNETWORK";
+
+    DataflowPipelineOptions options = buildPipelineOptions();
+    options.setSubnetwork(testSubnetwork);
+
+    DataflowPipeline p = buildPipeline(options);
+    p.traverseTopologically(new RecordingPipelineVisitor());
+    Job job =
+        DataflowPipelineTranslator.fromOptions(options)
+            .translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList())
+            .getJob();
+
+    assertEquals(1, job.getEnvironment().getWorkerPools().size());
+    assertEquals(testSubnetwork,
+        job.getEnvironment().getWorkerPools().get(0).getSubnetwork());
+  }
+
+  @Test
+  public void testSubnetworkConfigMissing() throws IOException {
+    DataflowPipelineOptions options = buildPipelineOptions();
+
+    DataflowPipeline p = buildPipeline(options);
+    p.traverseTopologically(new RecordingPipelineVisitor());
+    Job job =
+        DataflowPipelineTranslator.fromOptions(options)
+            .translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList())
+            .getJob();
+
+    assertEquals(1, job.getEnvironment().getWorkerPools().size());
+    assertNull(job.getEnvironment().getWorkerPools().get(0).getSubnetwork());
+  }
+
+  @Test
+  public void testScalingAlgorithmMissing() throws IOException {
+    DataflowPipelineOptions options = buildPipelineOptions();
+
+    DataflowPipeline p = buildPipeline(options);
+    p.traverseTopologically(new RecordingPipelineVisitor());
+    Job job =
+        DataflowPipelineTranslator.fromOptions(options)
+            .translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList())
+            .getJob();
+
+    assertEquals(1, job.getEnvironment().getWorkerPools().size());
+    // Autoscaling settings are always set.
+    assertNull(
+        job
+            .getEnvironment()
+            .getWorkerPools()
+            .get(0)
+            .getAutoscalingSettings()
+            .getAlgorithm());
+    assertEquals(
+        0,
+        job
+            .getEnvironment()
+            .getWorkerPools()
+            .get(0)
+            .getAutoscalingSettings()
+            .getMaxNumWorkers()
+            .intValue());
+  }
+
+  @Test
+  public void testScalingAlgorithmNone() throws IOException {
+    final DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType noScaling =
+        DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType.NONE;
+
+    DataflowPipelineOptions options = buildPipelineOptions();
+    options.setAutoscalingAlgorithm(noScaling);
+
+    DataflowPipeline p = buildPipeline(options);
+    p.traverseTopologically(new RecordingPipelineVisitor());
+    Job job =
+        DataflowPipelineTranslator.fromOptions(options)
+            .translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList())
+            .getJob();
+
+    assertEquals(1, job.getEnvironment().getWorkerPools().size());
+    assertEquals(
+        "AUTOSCALING_ALGORITHM_NONE",
+        job
+            .getEnvironment()
+            .getWorkerPools()
+            .get(0)
+            .getAutoscalingSettings()
+            .getAlgorithm());
+    assertEquals(
+        0,
+        job
+            .getEnvironment()
+            .getWorkerPools()
+            .get(0)
+            .getAutoscalingSettings()
+            .getMaxNumWorkers()
+            .intValue());
+  }
+
+  @Test
+  public void testMaxNumWorkersIsPassedWhenNoAlgorithmIsSet() throws IOException {
+    final DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType noScaling = null;
+    DataflowPipelineOptions options = buildPipelineOptions();
+    options.setMaxNumWorkers(42);
+    options.setAutoscalingAlgorithm(noScaling);
+
+    DataflowPipeline p = buildPipeline(options);
+    p.traverseTopologically(new RecordingPipelineVisitor());
+    Job job =
+        DataflowPipelineTranslator.fromOptions(options)
+            .translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList())
+            .getJob();
+
+    assertEquals(1, job.getEnvironment().getWorkerPools().size());
+    assertNull(
+        job
+            .getEnvironment()
+            .getWorkerPools()
+            .get(0)
+            .getAutoscalingSettings()
+            .getAlgorithm());
+    assertEquals(
+        42,
+        job
+            .getEnvironment()
+            .getWorkerPools()
+            .get(0)
+            .getAutoscalingSettings()
+            .getMaxNumWorkers()
+            .intValue());
+  }
+
+  @Test
+  public void testZoneConfig() throws IOException {
+    final String testZone = "test-zone-1";
+
+    DataflowPipelineOptions options = buildPipelineOptions();
+    options.setZone(testZone);
+
+    DataflowPipeline p = buildPipeline(options);
+    p.traverseTopologically(new RecordingPipelineVisitor());
+    Job job =
+        DataflowPipelineTranslator.fromOptions(options)
+            .translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList())
+            .getJob();
+
+    assertEquals(1, job.getEnvironment().getWorkerPools().size());
+    assertEquals(testZone,
+        job.getEnvironment().getWorkerPools().get(0).getZone());
+  }
+
+  @Test
+  public void testWorkerMachineTypeConfig() throws IOException {
+    final String testMachineType = "test-machine-type";
+
+    DataflowPipelineOptions options = buildPipelineOptions();
+    options.setWorkerMachineType(testMachineType);
+
+    DataflowPipeline p = buildPipeline(options);
+    p.traverseTopologically(new RecordingPipelineVisitor());
+    Job job =
+        DataflowPipelineTranslator.fromOptions(options)
+            .translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList())
+            .getJob();
+
+    assertEquals(1, job.getEnvironment().getWorkerPools().size());
+
+    WorkerPool workerPool = job.getEnvironment().getWorkerPools().get(0);
+    assertEquals(testMachineType, workerPool.getMachineType());
+  }
+
+  @Test
+  public void testDiskSizeGbConfig() throws IOException {
+    final Integer diskSizeGb = 1234;
+
+    DataflowPipelineOptions options = buildPipelineOptions();
+    options.setDiskSizeGb(diskSizeGb);
+
+    DataflowPipeline p = buildPipeline(options);
+    p.traverseTopologically(new RecordingPipelineVisitor());
+    Job job =
+        DataflowPipelineTranslator.fromOptions(options)
+            .translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList())
+            .getJob();
+
+    assertEquals(1, job.getEnvironment().getWorkerPools().size());
+    assertEquals(diskSizeGb,
+        job.getEnvironment().getWorkerPools().get(0).getDiskSizeGb());
+  }
+
+  @Test
+  public void testPredefinedAddStep() throws Exception {
+    DataflowPipelineOptions options = buildPipelineOptions();
+
+    DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
+    DataflowPipelineTranslator.registerTransformTranslator(
+        EmbeddedTransform.class, new EmbeddedTranslator());
+
+    // Create a predefined step using another pipeline
+    Step predefinedStep = createPredefinedStep();
+
+    // Create a pipeline that the predefined step will be embedded into
+    DataflowPipeline pipeline = DataflowPipeline.create(options);
+    pipeline.apply(TextIO.Read.named("ReadMyFile").from("gs://bucket/in"))
+        .apply(ParDo.of(new NoOpFn()))
+        .apply(new EmbeddedTransform(predefinedStep.clone()))
+        .apply(ParDo.of(new NoOpFn()));
+    Job job = translator.translate(
+        pipeline, pipeline.getRunner(), Collections.<DataflowPackage>emptyList()).getJob();
+
+    List<Step> steps = job.getSteps();
+    assertEquals(4, steps.size());
+
+    // The input to the embedded step should match the output of the step before
+    Map<String, Object> step1Out = getOutputPortReference(steps.get(1));
+    Map<String, Object> step2In = getDictionary(
+        steps.get(2).getProperties(), PropertyNames.PARALLEL_INPUT);
+    assertEquals(step1Out, step2In);
+
+    // The output from the embedded step should match the input of the step after
+    Map<String, Object> step2Out = getOutputPortReference(steps.get(2));
+    Map<String, Object> step3In = getDictionary(
+        steps.get(3).getProperties(), PropertyNames.PARALLEL_INPUT);
+    assertEquals(step2Out, step3In);
+
+    // The step should not have been modified other than remapping the input
+    Step predefinedStepClone = predefinedStep.clone();
+    Step embeddedStepClone = steps.get(2).clone();
+    predefinedStepClone.getProperties().remove(PropertyNames.PARALLEL_INPUT);
+    embeddedStepClone.getProperties().remove(PropertyNames.PARALLEL_INPUT);
+    assertEquals(predefinedStepClone, embeddedStepClone);
+  }
+
+  /**
+   * Construct a OutputReference for the output of the step.
+   */
+  private static OutputReference getOutputPortReference(Step step) throws Exception {
+    // TODO: This should be done via a Structs accessor.
+    @SuppressWarnings("unchecked")
+    List<Map<String, Object>> output =
+        (List<Map<String, Object>>) step.getProperties().get(PropertyNames.OUTPUT_INFO);
+    String outputTagId = getString(Iterables.getOnlyElement(output), PropertyNames.OUTPUT_NAME);
+    return new OutputReference(step.getName(), outputTagId);
+  }
+
+  /**
+   * Returns a Step for a DoFn by creating and translating a pipeline.
+   */
+  private static Step createPredefinedStep() throws Exception {
+    DataflowPipelineOptions options = buildPipelineOptions();
+    DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
+    DataflowPipeline pipeline = DataflowPipeline.create(options);
+    String stepName = "DoFn1";
+    pipeline.apply(TextIO.Read.named("ReadMyFile").from("gs://bucket/in"))
+        .apply(ParDo.of(new NoOpFn()).named(stepName))
+        .apply(TextIO.Write.named("WriteMyFile").to("gs://bucket/out"));
+    Job job = translator.translate(
+        pipeline, pipeline.getRunner(), Collections.<DataflowPackage>emptyList()).getJob();
+
+    assertEquals(13, job.getSteps().size());
+    Step step = job.getSteps().get(1);
+    assertEquals(stepName, getString(step.getProperties(), PropertyNames.USER_NAME));
+    return step;
+  }
+
+  private static class NoOpFn extends DoFn<String, String> {
+    @Override public void processElement(ProcessContext c) throws Exception {
+      c.output(c.element());
+    }
+  }
+
+  /**
+   * A placeholder transform that will be used to substitute a predefined Step.
+   */
+  private static class EmbeddedTransform
+      extends PTransform<PCollection<String>, PCollection<String>> {
+    private final Step step;
+
+    public EmbeddedTransform(Step step) {
+      this.step = step;
+    }
+
+    @Override
+    public PCollection<String> apply(PCollection<String> input) {
+      return PCollection.createPrimitiveOutputInternal(
+          input.getPipeline(),
+          WindowingStrategy.globalDefault(),
+          input.isBounded());
+    }
+
+    @Override
+    protected Coder<?> getDefaultOutputCoder() {
+      return StringUtf8Coder.of();
+    }
+  }
+
+  /**
+   * A TransformTranslator that adds the predefined Step using
+   * {@link TranslationContext#addStep} and remaps the input port reference.
+   */
+  private static class EmbeddedTranslator
+      implements DataflowPipelineTranslator.TransformTranslator<EmbeddedTransform> {
+    @Override public void translate(EmbeddedTransform transform, TranslationContext context) {
+      addObject(transform.step.getProperties(), PropertyNames.PARALLEL_INPUT,
+          context.asOutputReference(context.getInput(transform)));
+      context.addStep(transform, transform.step);
+    }
+  }
+
+  /**
+   * A composite transform that returns an output that is unrelated to
+   * the input.
+   */
+  private static class UnrelatedOutputCreator
+      extends PTransform<PCollection<Integer>, PCollection<Integer>> {
+
+    @Override
+    public PCollection<Integer> apply(PCollection<Integer> input) {
+      // Apply an operation so that this is a composite transform.
+      input.apply(Count.<Integer>perElement());
+
+      // Return a value unrelated to the input.
+      return input.getPipeline().apply(Create.of(1, 2, 3, 4));
+    }
+
+    @Override
+    protected Coder<?> getDefaultOutputCoder() {
+      return VarIntCoder.of();
+    }
+  }
+
+  /**
+   * A composite transform that returns an output that is unbound.
+   */
+  private static class UnboundOutputCreator
+      extends PTransform<PCollection<Integer>, PDone> {
+
+    @Override
+    public PDone apply(PCollection<Integer> input) {
+      // Apply an operation so that this is a composite transform.
+      input.apply(Count.<Integer>perElement());
+
+      return PDone.in(input.getPipeline());
+    }
+
+    @Override
+    protected Coder<?> getDefaultOutputCoder() {
+      return VoidCoder.of();
+    }
+  }
+
+  /**
+   * A composite transform that returns a partially bound output.
+   *
+   * <p>This is not allowed and will result in a failure.
+   */
+  private static class PartiallyBoundOutputCreator
+      extends PTransform<PCollection<Integer>, PCollectionTuple> {
+
+    public final TupleTag<Integer> sumTag = new TupleTag<>("sum");
+    public final TupleTag<Void> doneTag = new TupleTag<>("done");
+
+    @Override
+    public PCollectionTuple apply(PCollection<Integer> input) {
+      PCollection<Integer> sum = input.apply(Sum.integersGlobally());
+
+      // Fails here when attempting to construct a tuple with an unbound object.
+      return PCollectionTuple.of(sumTag, sum)
+          .and(doneTag, PCollection.<Void>createPrimitiveOutputInternal(
+              input.getPipeline(),
+              WindowingStrategy.globalDefault(),
+              input.isBounded()));
+    }
+  }
+
+  @Test
+  public void testMultiGraphPipelineSerialization() throws IOException {
+    DataflowPipeline p = DataflowPipeline.create(buildPipelineOptions());
+
+    PCollection<Integer> input = p.begin()
+        .apply(Create.of(1, 2, 3));
+
+    input.apply(new UnrelatedOutputCreator());
+    input.apply(new UnboundOutputCreator());
+
+    DataflowPipelineTranslator t = DataflowPipelineTranslator.fromOptions(
+        PipelineOptionsFactory.as(DataflowPipelineOptions.class));
+
+    // Check that translation doesn't fail.
+    t.translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList());
+  }
+
+  @Test
+  public void testPartiallyBoundFailure() throws IOException {
+    Pipeline p = DataflowPipeline.create(buildPipelineOptions());
+
+    PCollection<Integer> input = p.begin()
+        .apply(Create.of(1, 2, 3));
+
+    thrown.expect(IllegalStateException.class);
+    input.apply(new PartiallyBoundOutputCreator());
+
+    Assert.fail("Failure expected from use of partially bound output");
+  }
+
+  /**
+   * This tests a few corner cases that should not crash.
+   */
+  @Test
+  public void testGoodWildcards() throws Exception {
+    DataflowPipelineOptions options = buildPipelineOptions();
+    DataflowPipeline pipeline = DataflowPipeline.create(options);
+    DataflowPipelineTranslator t = DataflowPipelineTranslator.fromOptions(options);
+
+    applyRead(pipeline, "gs://bucket/foo");
+    applyRead(pipeline, "gs://bucket/foo/");
+    applyRead(pipeline, "gs://bucket/foo/*");
+    applyRead(pipeline, "gs://bucket/foo/?");
+    applyRead(pipeline, "gs://bucket/foo/[0-9]");
+    applyRead(pipeline, "gs://bucket/foo/*baz*");
+    applyRead(pipeline, "gs://bucket/foo/*baz?");
+    applyRead(pipeline, "gs://bucket/foo/[0-9]baz?");
+    applyRead(pipeline, "gs://bucket/foo/baz/*");
+    applyRead(pipeline, "gs://bucket/foo/baz/*wonka*");
+    applyRead(pipeline, "gs://bucket/foo/*baz/wonka*");
+    applyRead(pipeline, "gs://bucket/foo*/baz");
+    applyRead(pipeline, "gs://bucket/foo?/baz");
+    applyRead(pipeline, "gs://bucket/foo[0-9]/baz");
+
+    // Check that translation doesn't fail.
+    t.translate(pipeline, pipeline.getRunner(), Collections.<DataflowPackage>emptyList());
+  }
+
+  private void applyRead(Pipeline pipeline, String path) {
+    pipeline.apply("Read(" + path + ")", TextIO.Read.from(path));
+  }
+
+  /**
+   * Recursive wildcards are not supported.
+   * This tests "**".
+   */
+  @Test
+  public void testBadWildcardRecursive() throws Exception {
+    DataflowPipelineOptions options = buildPipelineOptions();
+    DataflowPipeline pipeline = DataflowPipeline.create(options);
+    DataflowPipelineTranslator t = DataflowPipelineTranslator.fromOptions(options);
+
+    pipeline.apply(TextIO.Read.from("gs://bucket/foo**/baz"));
+
+    // Check that translation does fail.
+    thrown.expectCause(Matchers.allOf(
+        instanceOf(IllegalArgumentException.class),
+        ThrowableMessageMatcher.hasMessage(containsString("Unsupported wildcard usage"))));
+    t.translate(pipeline, pipeline.getRunner(), Collections.<DataflowPackage>emptyList());
+  }
+
+  @Test
+  public void testToSingletonTranslation() throws Exception {
+    // A "change detector" test that makes sure the translation
+    // of getting a PCollectionView<T> does not change
+    // in bad ways during refactor
+
+    DataflowPipelineOptions options = buildPipelineOptions();
+    options.setExperiments(ImmutableList.of("disable_ism_side_input"));
+    DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
+
+    DataflowPipeline pipeline = DataflowPipeline.create(options);
+    pipeline.apply(Create.of(1))
+        .apply(View.<Integer>asSingleton());
+    Job job = translator.translate(
+        pipeline, pipeline.getRunner(), Collections.<DataflowPackage>emptyList()).getJob();
+
+    List<Step> steps = job.getSteps();
+    assertEquals(2, steps.size());
+
+    Step createStep = steps.get(0);
+    assertEquals("CreateCollection", createStep.getKind());
+
+    Step collectionToSingletonStep = steps.get(1);
+    assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
+
+  }
+
+  @Test
+  public void testToIterableTranslation() throws Exception {
+    // A "change detector" test that makes sure the translation
+    // of getting a PCollectionView<Iterable<T>> does not change
+    // in bad ways during refactor
+
+    DataflowPipelineOptions options = buildPipelineOptions();
+    options.setExperiments(ImmutableList.of("disable_ism_side_input"));
+    DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
+
+    DataflowPipeline pipeline = DataflowPipeline.create(options);
+    pipeline.apply(Create.of(1, 2, 3))
+        .apply(View.<Integer>asIterable());
+    Job job = translator.translate(
+        pipeline, pipeline.getRunner(), Collections.<DataflowPackage>emptyList()).getJob();
+
+    List<Step> steps = job.getSteps();
+    assertEquals(2, steps.size());
+
+    Step createStep = steps.get(0);
+    assertEquals("CreateCollection", createStep.getKind());
+
+    Step collectionToSingletonStep = steps.get(1);
+    assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
+  }
+
+  @Test
+  public void testToSingletonTranslationWithIsmSideInput() throws Exception {
+    // A "change detector" test that makes sure the translation
+    // of getting a PCollectionView<T> does not change
+    // in bad ways during refactor
+
+    DataflowPipelineOptions options = buildPipelineOptions();
+    DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
+
+    DataflowPipeline pipeline = DataflowPipeline.create(options);
+    pipeline.apply(Create.of(1))
+        .apply(View.<Integer>asSingleton());
+    Job job = translator.translate(
+        pipeline, pipeline.getRunner(), Collections.<DataflowPackage>emptyList()).getJob();
+
+    List<Step> steps = job.getSteps();
+    assertEquals(5, steps.size());
+
+    @SuppressWarnings("unchecked")
+    List<Map<String, Object>> toIsmRecordOutputs =
+        (List<Map<String, Object>>) steps.get(3).getProperties().get(PropertyNames.OUTPUT_INFO);
+    assertTrue(
+        Structs.getBoolean(Iterables.getOnlyElement(toIsmRecordOutputs), "use_indexed_format"));
+
+    Step collectionToSingletonStep = steps.get(4);
+    assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
+  }
+
+  @Test
+  public void testToIterableTranslationWithIsmSideInput() throws Exception {
+    // A "change detector" test that makes sure the translation
+    // of getting a PCollectionView<Iterable<T>> does not change
+    // in bad ways during refactor
+
+    DataflowPipelineOptions options = buildPipelineOptions();
+    DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
+
+    DataflowPipeline pipeline = DataflowPipeline.create(options);
+    pipeline.apply(Create.of(1, 2, 3))
+        .apply(View.<Integer>asIterable());
+    Job job = translator.translate(
+        pipeline, pipeline.getRunner(), Collections.<DataflowPackage>emptyList()).getJob();
+
+    List<Step> steps = job.getSteps();
+    assertEquals(3, steps.size());
+
+    @SuppressWarnings("unchecked")
+    List<Map<String, Object>> toIsmRecordOutputs =
+        (List<Map<String, Object>>) steps.get(1).getProperties().get(PropertyNames.OUTPUT_INFO);
+    assertTrue(
+        Structs.getBoolean(Iterables.getOnlyElement(toIsmRecordOutputs), "use_indexed_format"));
+
+
+    Step collectionToSingletonStep = steps.get(2);
+    assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
+  }
+
+  @Test
+  public void testStepDisplayData() throws Exception {
+    DataflowPipelineOptions options = buildPipelineOptions();
+    DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
+    DataflowPipeline pipeline = DataflowPipeline.create(options);
+
+    DoFn<Integer, Integer> fn1 = new DoFn<Integer, Integer>() {
+      @Override
+      public void processElement(ProcessContext c) throws Exception {
+        c.output(c.element());
+      }
+
+      @Override
+      public void populateDisplayData(DisplayData.Builder builder) {
+        builder
+                .add("foo", "bar")
+                .add("foo2", DataflowPipelineTranslatorTest.class)
+                .withLabel("Test Class")
+                .withLinkUrl("http://www.google.com");
+      }
+    };
+
+    DoFn<Integer, Integer> fn2 = new DoFn<Integer, Integer>() {
+      @Override
+      public void processElement(ProcessContext c) throws Exception {
+        c.output(c.element());
+      }
+
+      @Override
+      public void populateDisplayData(DisplayData.Builder builder) {
+        builder.add("foo3", "barge");
+      }
+    };
+
+    pipeline
+      .apply(Create.of(1, 2, 3))
+      .apply(ParDo.of(fn1))
+      .apply(ParDo.of(fn2));
+
+    Job job = translator.translate(
+        pipeline, pipeline.getRunner(), Collections.<DataflowPackage>emptyList()).getJob();
+
+    List<Step> steps = job.getSteps();
+    assertEquals(3, steps.size());
+
+    Map<String, Object> parDo1Properties = steps.get(1).getProperties();
+    Map<String, Object> parDo2Properties = steps.get(2).getProperties();
+    assertThat(parDo1Properties, hasKey("display_data"));
+
+    Collection<Map<String, String>> fn1displayData =
+            (Collection<Map<String, String>>) parDo1Properties.get("display_data");
+    Collection<Map<String, String>> fn2displayData =
+            (Collection<Map<String, String>>) parDo2Properties.get("display_data");
+
+    ImmutableList expectedFn1DisplayData = ImmutableList.of(
+            ImmutableMap.<String, String>builder()
+              .put("namespace", fn1.getClass().getName())
+              .put("key", "foo")
+              .put("type", "STRING")
+              .put("value", "bar")
+              .build(),
+            ImmutableMap.<String, String>builder()
+              .put("namespace", fn1.getClass().getName())
+              .put("key", "foo2")
+              .put("type", "JAVA_CLASS")
+              .put("value", DataflowPipelineTranslatorTest.class.getName())
+              .put("shortValue", DataflowPipelineTranslatorTest.class.getSimpleName())
+              .put("label", "Test Class")
+              .put("linkUrl", "http://www.google.com")
+              .build()
+    );
+
+    ImmutableList expectedFn2DisplayData = ImmutableList.of(
+            ImmutableMap.<String, String>builder()
+                    .put("namespace", fn2.getClass().getName())
+                    .put("key", "foo3")
+                    .put("type", "STRING")
+                    .put("value", "barge")
+                    .build()
+    );
+
+    assertEquals(expectedFn1DisplayData, fn1displayData);
+    assertEquals(expectedFn2DisplayData, fn2displayData);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/runners/google-cloud-dataflow-java/src/test/java/com/google/cloud/dataflow/sdk/runners/dataflow/CustomSourcesTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/com/google/cloud/dataflow/sdk/runners/dataflow/CustomSourcesTest.java b/runners/google-cloud-dataflow-java/src/test/java/com/google/cloud/dataflow/sdk/runners/dataflow/CustomSourcesTest.java
new file mode 100644
index 0000000..20d2bc8
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/com/google/cloud/dataflow/sdk/runners/dataflow/CustomSourcesTest.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.dataflow;
+import static com.google.cloud.dataflow.sdk.testing.SourceTestUtils.readFromSource;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.BigEndianIntegerCoder;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.io.BoundedSource;
+import com.google.cloud.dataflow.sdk.io.Read;
+import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.testing.ExpectedLogs;
+import com.google.cloud.dataflow.sdk.testing.PAssert;
+import com.google.cloud.dataflow.sdk.testing.TestPipeline;
+import com.google.cloud.dataflow.sdk.transforms.Sample;
+import com.google.cloud.dataflow.sdk.transforms.Sum;
+import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.common.base.Preconditions;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests for {@link CustomSources}.
+ */
+@RunWith(JUnit4.class)
+public class CustomSourcesTest {
+  @Rule public ExpectedException expectedException = ExpectedException.none();
+  @Rule public ExpectedLogs logged = ExpectedLogs.none(CustomSources.class);
+
+  static class TestIO {
+    public static Read fromRange(int from, int to) {
+      return new Read(from, to, false);
+    }
+
+    static class Read extends BoundedSource<Integer> {
+      final int from;
+      final int to;
+      final boolean produceTimestamps;
+
+      Read(int from, int to, boolean produceTimestamps) {
+        this.from = from;
+        this.to = to;
+        this.produceTimestamps = produceTimestamps;
+      }
+
+      public Read withTimestampsMillis() {
+        return new Read(from, to, true);
+      }
+
+      @Override
+      public List<Read> splitIntoBundles(long desiredBundleSizeBytes, PipelineOptions options)
+          throws Exception {
+        List<Read> res = new ArrayList<>();
+        DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
+        float step = 1.0f * (to - from) / dataflowOptions.getNumWorkers();
+        for (int i = 0; i < dataflowOptions.getNumWorkers(); ++i) {
+          res.add(new Read(
+              Math.round(from + i * step), Math.round(from + (i + 1) * step),
+              produceTimestamps));
+        }
+        return res;
+      }
+
+      @Override
+      public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+        return 8 * (to - from);
+      }
+
+      @Override
+      public boolean producesSortedKeys(PipelineOptions options) throws Exception {
+        return true;
+      }
+
+      @Override
+      public BoundedReader<Integer> createReader(PipelineOptions options) throws IOException {
+        return new RangeReader(this);
+      }
+
+      @Override
+      public void validate() {}
+
+      @Override
+      public String toString() {
+        return "[" + from + ", " + to + ")";
+      }
+
+      @Override
+      public Coder<Integer> getDefaultOutputCoder() {
+        return BigEndianIntegerCoder.of();
+      }
+
+      private static class RangeReader extends BoundedReader<Integer> {
+        // To verify that BasicSerializableSourceFormat calls our methods according to protocol.
+        enum State {
+          UNSTARTED,
+          STARTED,
+          FINISHED
+        }
+        private Read source;
+        private int current = -1;
+        private State state = State.UNSTARTED;
+
+        public RangeReader(Read source) {
+          this.source = source;
+        }
+
+        @Override
+        public boolean start() throws IOException {
+          Preconditions.checkState(state == State.UNSTARTED);
+          state = State.STARTED;
+          current = source.from;
+          return (current < source.to);
+        }
+
+        @Override
+        public boolean advance() throws IOException {
+          Preconditions.checkState(state == State.STARTED);
+          if (current == source.to - 1) {
+            state = State.FINISHED;
+            return false;
+          }
+          current++;
+          return true;
+        }
+
+        @Override
+        public Integer getCurrent() {
+          Preconditions.checkState(state == State.STARTED);
+          return current;
+        }
+
+        @Override
+        public Instant getCurrentTimestamp() {
+          return source.produceTimestamps
+              ? new Instant(current /* as millis */) : BoundedWindow.TIMESTAMP_MIN_VALUE;
+        }
+
+        @Override
+        public void close() throws IOException {
+          Preconditions.checkState(state == State.STARTED || state == State.FINISHED);
+          state = State.FINISHED;
+        }
+
+        @Override
+        public Read getCurrentSource() {
+          return source;
+        }
+
+        @Override
+        public Read splitAtFraction(double fraction) {
+          int proposedIndex = (int) (source.from + fraction * (source.to - source.from));
+          if (proposedIndex <= current) {
+            return null;
+          }
+          Read primary = new Read(source.from, proposedIndex, source.produceTimestamps);
+          Read residual = new Read(proposedIndex, source.to, source.produceTimestamps);
+          this.source = primary;
+          return residual;
+        }
+
+        @Override
+        public Double getFractionConsumed() {
+          return (current == -1)
+              ? 0.0
+              : (1.0 * (1 + current - source.from) / (source.to - source.from));
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testDirectPipelineWithoutTimestamps() throws Exception {
+    Pipeline p = TestPipeline.create();
+    PCollection<Integer> sum = p
+        .apply(Read.from(TestIO.fromRange(10, 20)))
+        .apply(Sum.integersGlobally())
+        .apply(Sample.<Integer>any(1));
+
+    PAssert.thatSingleton(sum).isEqualTo(145);
+    p.run();
+  }
+
+  @Test
+  public void testDirectPipelineWithTimestamps() throws Exception {
+    Pipeline p = TestPipeline.create();
+    PCollection<Integer> sums =
+        p.apply(Read.from(TestIO.fromRange(10, 20).withTimestampsMillis()))
+         .apply(Window.<Integer>into(FixedWindows.of(Duration.millis(3))))
+         .apply(Sum.integersGlobally().withoutDefaults());
+    // Should group into [10 11] [12 13 14] [15 16 17] [18 19].
+    PAssert.that(sums).containsInAnyOrder(21, 37, 39, 48);
+    p.run();
+  }
+
+  @Test
+  public void testRangeProgressAndSplitAtFraction() throws Exception {
+    // Show basic usage of getFractionConsumed and splitAtFraction.
+    // This test only tests TestIO itself, not BasicSerializableSourceFormat.
+
+    DataflowPipelineOptions options =
+        PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
+    TestIO.Read source = TestIO.fromRange(10, 20);
+    try (BoundedSource.BoundedReader<Integer> reader = source.createReader(options)) {
+      assertEquals(0, reader.getFractionConsumed().intValue());
+      assertTrue(reader.start());
+      assertEquals(0.1, reader.getFractionConsumed(), 1e-6);
+      assertTrue(reader.advance());
+      assertEquals(0.2, reader.getFractionConsumed(), 1e-6);
+      // Already past 0.0 and 0.1.
+      assertNull(reader.splitAtFraction(0.0));
+      assertNull(reader.splitAtFraction(0.1));
+
+      {
+        TestIO.Read residual = (TestIO.Read) reader.splitAtFraction(0.5);
+        assertNotNull(residual);
+        TestIO.Read primary = (TestIO.Read) reader.getCurrentSource();
+        assertThat(readFromSource(primary, options), contains(10, 11, 12, 13, 14));
+        assertThat(readFromSource(residual, options), contains(15, 16, 17, 18, 19));
+      }
+
+      // Range is now [10, 15) and we are at 12.
+      {
+        TestIO.Read residual = (TestIO.Read) reader.splitAtFraction(0.8); // give up 14.
+        assertNotNull(residual);
+        TestIO.Read primary = (TestIO.Read) reader.getCurrentSource();
+        assertThat(readFromSource(primary, options), contains(10, 11, 12, 13));
+        assertThat(readFromSource(residual, options), contains(14));
+      }
+
+      assertTrue(reader.advance());
+      assertEquals(12, reader.getCurrent().intValue());
+      assertTrue(reader.advance());
+      assertEquals(13, reader.getCurrent().intValue());
+      assertFalse(reader.advance());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/runners/google-cloud-dataflow-java/src/test/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineRunnerTest.java
new file mode 100644
index 0000000..0a78a6d
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineRunnerTest.java
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.testing;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.api.client.http.LowLevelHttpResponse;
+import com.google.api.client.json.Json;
+import com.google.api.client.testing.http.MockHttpTransport;
+import com.google.api.client.testing.http.MockLowLevelHttpRequest;
+import com.google.api.client.testing.http.MockLowLevelHttpResponse;
+import com.google.api.services.dataflow.Dataflow;
+import com.google.api.services.dataflow.model.JobMessage;
+import com.google.api.services.dataflow.model.JobMetrics;
+import com.google.api.services.dataflow.model.MetricStructuredName;
+import com.google.api.services.dataflow.model.MetricUpdate;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.PipelineResult.State;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.runners.DataflowPipelineJob;
+import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.util.GcsUtil;
+import com.google.cloud.dataflow.sdk.util.MonitoringUtil;
+import com.google.cloud.dataflow.sdk.util.MonitoringUtil.JobMessagesHandler;
+import com.google.cloud.dataflow.sdk.util.NoopPathValidator;
+import com.google.cloud.dataflow.sdk.util.TestCredential;
+import com.google.cloud.dataflow.sdk.util.TimeUtil;
+import com.google.cloud.dataflow.sdk.util.Transport;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+
+/** Tests for {@link TestDataflowPipelineRunner}. */
+@RunWith(JUnit4.class)
+public class TestDataflowPipelineRunnerTest {
+  @Rule public ExpectedException expectedException = ExpectedException.none();
+  @Mock private MockHttpTransport transport;
+  @Mock private MockLowLevelHttpRequest request;
+  @Mock private GcsUtil mockGcsUtil;
+
+  private TestDataflowPipelineOptions options;
+  private Dataflow service;
+
+  @Before
+  public void setUp() throws Exception {
+    MockitoAnnotations.initMocks(this);
+    when(transport.buildRequest(anyString(), anyString())).thenReturn(request);
+    doCallRealMethod().when(request).getContentAsString();
+    service = new Dataflow(transport, Transport.getJsonFactory(), null);
+
+    options = PipelineOptionsFactory.as(TestDataflowPipelineOptions.class);
+    options.setAppName("TestAppName");
+    options.setProject("test-project");
+    options.setTempLocation("gs://test/temp/location");
+    options.setGcpCredential(new TestCredential());
+    options.setDataflowClient(service);
+    options.setRunner(TestDataflowPipelineRunner.class);
+    options.setPathValidatorClass(NoopPathValidator.class);
+  }
+
+  @Test
+  public void testToString() {
+    assertEquals("TestDataflowPipelineRunner#TestAppName",
+        new TestDataflowPipelineRunner(options).toString());
+  }
+
+  @Test
+  public void testRunBatchJobThatSucceeds() throws Exception {
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+    when(mockJob.getDataflowClient()).thenReturn(service);
+    when(mockJob.getState()).thenReturn(State.DONE);
+    when(mockJob.getProjectId()).thenReturn("test-project");
+    when(mockJob.getJobId()).thenReturn("test-job");
+
+    DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
+    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+    when(request.execute()).thenReturn(
+        generateMockMetricResponse(true /* success */, true /* tentative */));
+    assertEquals(mockJob, runner.run(p, mockRunner));
+  }
+
+  @Test
+  public void testRunBatchJobThatFails() throws Exception {
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+    when(mockJob.getDataflowClient()).thenReturn(service);
+    when(mockJob.getState()).thenReturn(State.FAILED);
+    when(mockJob.getProjectId()).thenReturn("test-project");
+    when(mockJob.getJobId()).thenReturn("test-job");
+
+    DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
+    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+    try {
+      runner.run(p, mockRunner);
+    } catch (AssertionError expected) {
+      return;
+    }
+    // Note that fail throws an AssertionError which is why it is placed out here
+    // instead of inside the try-catch block.
+    fail("AssertionError expected");
+  }
+
+  @Test
+  public void testBatchPipelineFailsIfException() throws Exception {
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+    when(mockJob.getDataflowClient()).thenReturn(service);
+    when(mockJob.getState()).thenReturn(State.RUNNING);
+    when(mockJob.getProjectId()).thenReturn("test-project");
+    when(mockJob.getJobId()).thenReturn("test-job");
+    when(mockJob.waitToFinish(any(Long.class), any(TimeUnit.class), any(JobMessagesHandler.class)))
+        .thenAnswer(new Answer<State>() {
+          @Override
+          public State answer(InvocationOnMock invocation) {
+            JobMessage message = new JobMessage();
+            message.setMessageText("FooException");
+            message.setTime(TimeUtil.toCloudTime(Instant.now()));
+            message.setMessageImportance("JOB_MESSAGE_ERROR");
+            ((MonitoringUtil.JobMessagesHandler) invocation.getArguments()[2])
+                .process(Arrays.asList(message));
+            return State.CANCELLED;
+          }
+        });
+
+    DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
+    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+    when(request.execute()).thenReturn(
+        generateMockMetricResponse(false /* success */, true /* tentative */));
+    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+    try {
+      runner.run(p, mockRunner);
+    } catch (AssertionError expected) {
+      assertThat(expected.getMessage(), containsString("FooException"));
+      verify(mockJob, atLeastOnce()).cancel();
+      return;
+    }
+    // Note that fail throws an AssertionError which is why it is placed out here
+    // instead of inside the try-catch block.
+    fail("AssertionError expected");
+  }
+
+  @Test
+  public void testRunStreamingJobThatSucceeds() throws Exception {
+    options.setStreaming(true);
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+    when(mockJob.getDataflowClient()).thenReturn(service);
+    when(mockJob.getState()).thenReturn(State.RUNNING);
+    when(mockJob.getProjectId()).thenReturn("test-project");
+    when(mockJob.getJobId()).thenReturn("test-job");
+
+    DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
+    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+    when(request.execute()).thenReturn(
+        generateMockMetricResponse(true /* success */, true /* tentative */));
+    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+    runner.run(p, mockRunner);
+  }
+
+  @Test
+  public void testRunStreamingJobThatFails() throws Exception {
+    options.setStreaming(true);
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+    when(mockJob.getDataflowClient()).thenReturn(service);
+    when(mockJob.getState()).thenReturn(State.RUNNING);
+    when(mockJob.getProjectId()).thenReturn("test-project");
+    when(mockJob.getJobId()).thenReturn("test-job");
+
+    DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
+    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+    when(request.execute()).thenReturn(
+        generateMockMetricResponse(false /* success */, true /* tentative */));
+    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+    try {
+      runner.run(p, mockRunner);
+    } catch (AssertionError expected) {
+      return;
+    }
+    // Note that fail throws an AssertionError which is why it is placed out here
+    // instead of inside the try-catch block.
+    fail("AssertionError expected");
+  }
+
+  @Test
+  public void testCheckingForSuccessWhenPAssertSucceeds() throws Exception {
+    DataflowPipelineJob job =
+        spy(new DataflowPipelineJob("test-project", "test-job", service, null));
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+    when(request.execute()).thenReturn(
+        generateMockMetricResponse(true /* success */, true /* tentative */));
+    doReturn(State.DONE).when(job).getState();
+    assertEquals(Optional.of(true), runner.checkForSuccess(job));
+  }
+
+  @Test
+  public void testCheckingForSuccessWhenPAssertFails() throws Exception {
+    DataflowPipelineJob job =
+        spy(new DataflowPipelineJob("test-project", "test-job", service, null));
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+    when(request.execute()).thenReturn(
+        generateMockMetricResponse(false /* success */, true /* tentative */));
+    doReturn(State.DONE).when(job).getState();
+    assertEquals(Optional.of(false), runner.checkForSuccess(job));
+  }
+
+  @Test
+  public void testCheckingForSuccessSkipsNonTentativeMetrics() throws Exception {
+    DataflowPipelineJob job =
+        spy(new DataflowPipelineJob("test-project", "test-job", service, null));
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+    when(request.execute()).thenReturn(
+        generateMockMetricResponse(true /* success */, false /* tentative */));
+    doReturn(State.RUNNING).when(job).getState();
+    assertEquals(Optional.absent(), runner.checkForSuccess(job));
+  }
+
+  private LowLevelHttpResponse generateMockMetricResponse(boolean success, boolean tentative)
+      throws Exception {
+    MetricStructuredName name = new MetricStructuredName();
+    name.setName(success ? "PAssertSuccess" : "PAssertFailure");
+    name.setContext(
+        tentative ? ImmutableMap.of("tentative", "") : ImmutableMap.<String, String>of());
+
+    MetricUpdate metric = new MetricUpdate();
+    metric.setName(name);
+    metric.setScalar(BigDecimal.ONE);
+
+    MockLowLevelHttpResponse response = new MockLowLevelHttpResponse();
+    response.setContentType(Json.MEDIA_TYPE);
+    JobMetrics jobMetrics = new JobMetrics();
+    jobMetrics.setMetrics(Lists.newArrayList(metric));
+    // N.B. Setting the factory is necessary in order to get valid JSON.
+    jobMetrics.setFactory(Transport.getJsonFactory());
+    response.setContent(jobMetrics.toPrettyString());
+    return response;
+  }
+
+  @Test
+  public void testStreamingPipelineFailsIfServiceFails() throws Exception {
+    DataflowPipelineJob job =
+        spy(new DataflowPipelineJob("test-project", "test-job", service, null));
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+    when(request.execute()).thenReturn(
+        generateMockMetricResponse(true /* success */, false /* tentative */));
+    doReturn(State.FAILED).when(job).getState();
+    assertEquals(Optional.of(false), runner.checkForSuccess(job));
+  }
+
+  @Test
+  public void testStreamingPipelineFailsIfException() throws Exception {
+    options.setStreaming(true);
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+    when(mockJob.getDataflowClient()).thenReturn(service);
+    when(mockJob.getState()).thenReturn(State.RUNNING);
+    when(mockJob.getProjectId()).thenReturn("test-project");
+    when(mockJob.getJobId()).thenReturn("test-job");
+    when(mockJob.waitToFinish(any(Long.class), any(TimeUnit.class), any(JobMessagesHandler.class)))
+        .thenAnswer(new Answer<State>() {
+          @Override
+          public State answer(InvocationOnMock invocation) {
+            JobMessage message = new JobMessage();
+            message.setMessageText("FooException");
+            message.setTime(TimeUtil.toCloudTime(Instant.now()));
+            message.setMessageImportance("JOB_MESSAGE_ERROR");
+            ((MonitoringUtil.JobMessagesHandler) invocation.getArguments()[2])
+                .process(Arrays.asList(message));
+            return State.CANCELLED;
+          }
+        });
+
+    DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
+    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+    when(request.execute()).thenReturn(
+        generateMockMetricResponse(false /* success */, true /* tentative */));
+    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+    try {
+      runner.run(p, mockRunner);
+    } catch (AssertionError expected) {
+      assertThat(expected.getMessage(), containsString("FooException"));
+      verify(mockJob, atLeastOnce()).cancel();
+      return;
+    }
+    // Note that fail throws an AssertionError which is why it is placed out here
+    // instead of inside the try-catch block.
+    fail("AssertionError expected");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/runners/google-cloud-dataflow-java/src/test/java/com/google/cloud/dataflow/sdk/transforms/DataflowGroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/com/google/cloud/dataflow/sdk/transforms/DataflowGroupByKeyTest.java b/runners/google-cloud-dataflow-java/src/test/java/com/google/cloud/dataflow/sdk/transforms/DataflowGroupByKeyTest.java
new file mode 100644
index 0000000..b0b011d
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/com/google/cloud/dataflow/sdk/transforms/DataflowGroupByKeyTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.transforms;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.BigEndianIntegerCoder;
+import com.google.cloud.dataflow.sdk.coders.KvCoder;
+import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
+import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Sessions;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.util.NoopPathValidator;
+import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PBegin;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
+
+import org.joda.time.Duration;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.Arrays;
+import java.util.List;
+
+/** Tests for {@link GroupByKey} for the {@link DataflowPipelineRunner}. */
+@RunWith(JUnit4.class)
+public class DataflowGroupByKeyTest {
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  /**
+   * Create a test pipeline that uses the {@link DataflowPipelineRunner} so that {@link GroupByKey}
+   * is not expanded. This is used for verifying that even without expansion the proper errors show
+   * up.
+   */
+  private Pipeline createTestServiceRunner() {
+    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    options.setRunner(DataflowPipelineRunner.class);
+    options.setProject("someproject");
+    options.setStagingLocation("gs://staging");
+    options.setPathValidatorClass(NoopPathValidator.class);
+    options.setDataflowClient(null);
+    return Pipeline.create(options);
+  }
+
+  @Test
+  public void testInvalidWindowsService() {
+    Pipeline p = createTestServiceRunner();
+
+    List<KV<String, Integer>> ungroupedPairs = Arrays.asList();
+
+    PCollection<KV<String, Integer>> input =
+        p.apply(Create.of(ungroupedPairs)
+            .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())))
+        .apply(Window.<KV<String, Integer>>into(
+            Sessions.withGapDuration(Duration.standardMinutes(1))));
+
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage("GroupByKey must have a valid Window merge function");
+    input
+        .apply("GroupByKey", GroupByKey.<String, Integer>create())
+        .apply("GroupByKeyAgain", GroupByKey.<String, Iterable<Integer>>create());
+  }
+
+  @Test
+  public void testGroupByKeyServiceUnbounded() {
+    Pipeline p = createTestServiceRunner();
+
+    PCollection<KV<String, Integer>> input =
+        p.apply(
+            new PTransform<PBegin, PCollection<KV<String, Integer>>>() {
+              @Override
+              public PCollection<KV<String, Integer>> apply(PBegin input) {
+                return PCollection.<KV<String, Integer>>createPrimitiveOutputInternal(
+                        input.getPipeline(),
+                        WindowingStrategy.globalDefault(),
+                        PCollection.IsBounded.UNBOUNDED)
+                    .setTypeDescriptorInternal(new TypeDescriptor<KV<String, Integer>>() {});
+              }
+            });
+
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage(
+        "GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without "
+        + "a trigger. Use a Window.into or Window.triggering transform prior to GroupByKey.");
+
+    input.apply("GroupByKey", GroupByKey.<String, Integer>create());
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/runners/google-cloud-dataflow-java/src/test/java/com/google/cloud/dataflow/sdk/transforms/DataflowViewTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/com/google/cloud/dataflow/sdk/transforms/DataflowViewTest.java b/runners/google-cloud-dataflow-java/src/test/java/com/google/cloud/dataflow/sdk/transforms/DataflowViewTest.java
new file mode 100644
index 0000000..c2a4273
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/com/google/cloud/dataflow/sdk/transforms/DataflowViewTest.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.transforms;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
+import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.InvalidWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.util.NoopPathValidator;
+import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PBegin;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.PCollectionView;
+import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
+
+import org.hamcrest.Matchers;
+import org.joda.time.Duration;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.internal.matchers.ThrowableMessageMatcher;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link View} for a {@link DataflowPipelineRunner}. */
+@RunWith(JUnit4.class)
+public class DataflowViewTest {
+  @Rule
+  public transient ExpectedException thrown = ExpectedException.none();
+
+  private Pipeline createTestBatchRunner() {
+    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    options.setRunner(DataflowPipelineRunner.class);
+    options.setProject("someproject");
+    options.setStagingLocation("gs://staging");
+    options.setPathValidatorClass(NoopPathValidator.class);
+    options.setDataflowClient(null);
+    return Pipeline.create(options);
+  }
+
+  private Pipeline createTestStreamingRunner() {
+    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    options.setRunner(DataflowPipelineRunner.class);
+    options.setStreaming(true);
+    options.setProject("someproject");
+    options.setStagingLocation("gs://staging");
+    options.setPathValidatorClass(NoopPathValidator.class);
+    options.setDataflowClient(null);
+    return Pipeline.create(options);
+  }
+
+  private void testViewUnbounded(
+      Pipeline pipeline,
+      PTransform<PCollection<KV<String, Integer>>, ? extends PCollectionView<?>> view) {
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage("Unable to create a side-input view from input");
+    thrown.expectCause(
+        ThrowableMessageMatcher.hasMessage(Matchers.containsString("non-bounded PCollection")));
+    pipeline
+        .apply(
+            new PTransform<PBegin, PCollection<KV<String, Integer>>>() {
+              @Override
+              public PCollection<KV<String, Integer>> apply(PBegin input) {
+                return PCollection.<KV<String, Integer>>createPrimitiveOutputInternal(
+                        input.getPipeline(),
+                        WindowingStrategy.globalDefault(),
+                        PCollection.IsBounded.UNBOUNDED)
+                    .setTypeDescriptorInternal(new TypeDescriptor<KV<String, Integer>>() {});
+              }
+            })
+        .apply(view);
+  }
+
+  private void testViewNonmerging(
+      Pipeline pipeline,
+      PTransform<PCollection<KV<String, Integer>>, ? extends PCollectionView<?>> view) {
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage("Unable to create a side-input view from input");
+    thrown.expectCause(
+        ThrowableMessageMatcher.hasMessage(Matchers.containsString("Consumed by GroupByKey")));
+    pipeline.apply(Create.<KV<String, Integer>>of(KV.of("hello", 5)))
+        .apply(Window.<KV<String, Integer>>into(new InvalidWindows<>(
+            "Consumed by GroupByKey", FixedWindows.of(Duration.standardHours(1)))))
+        .apply(view);
+  }
+
+  @Test
+  public void testViewUnboundedAsSingletonBatch() {
+    testViewUnbounded(createTestBatchRunner(), View.<KV<String, Integer>>asSingleton());
+  }
+
+  @Test
+  public void testViewUnboundedAsSingletonStreaming() {
+    testViewUnbounded(createTestStreamingRunner(), View.<KV<String, Integer>>asSingleton());
+  }
+
+  @Test
+  public void testViewUnboundedAsIterableBatch() {
+    testViewUnbounded(createTestBatchRunner(), View.<KV<String, Integer>>asIterable());
+  }
+
+  @Test
+  public void testViewUnboundedAsIterableStreaming() {
+    testViewUnbounded(createTestStreamingRunner(), View.<KV<String, Integer>>asIterable());
+  }
+
+  @Test
+  public void testViewUnboundedAsListBatch() {
+    testViewUnbounded(createTestBatchRunner(), View.<KV<String, Integer>>asList());
+  }
+
+  @Test
+  public void testViewUnboundedAsListStreaming() {
+    testViewUnbounded(createTestStreamingRunner(), View.<KV<String, Integer>>asList());
+  }
+
+  @Test
+  public void testViewUnboundedAsMapBatch() {
+    testViewUnbounded(createTestBatchRunner(), View.<String, Integer>asMap());
+  }
+
+  @Test
+  public void testViewUnboundedAsMapStreaming() {
+    testViewUnbounded(createTestStreamingRunner(), View.<String, Integer>asMap());
+  }
+
+  @Test
+  public void testViewUnboundedAsMultimapBatch() {
+    testViewUnbounded(createTestBatchRunner(), View.<String, Integer>asMultimap());
+  }
+
+  @Test
+  public void testViewUnboundedAsMultimapStreaming() {
+    testViewUnbounded(createTestStreamingRunner(), View.<String, Integer>asMultimap());
+  }
+
+  @Test
+  public void testViewNonmergingAsSingletonBatch() {
+    testViewNonmerging(createTestBatchRunner(), View.<KV<String, Integer>>asSingleton());
+  }
+
+  @Test
+  public void testViewNonmergingAsSingletonStreaming() {
+    testViewNonmerging(createTestStreamingRunner(), View.<KV<String, Integer>>asSingleton());
+  }
+
+  @Test
+  public void testViewNonmergingAsIterableBatch() {
+    testViewNonmerging(createTestBatchRunner(), View.<KV<String, Integer>>asIterable());
+  }
+
+  @Test
+  public void testViewNonmergingAsIterableStreaming() {
+    testViewNonmerging(createTestStreamingRunner(), View.<KV<String, Integer>>asIterable());
+  }
+
+  @Test
+  public void testViewNonmergingAsListBatch() {
+    testViewNonmerging(createTestBatchRunner(), View.<KV<String, Integer>>asList());
+  }
+
+  @Test
+  public void testViewNonmergingAsListStreaming() {
+    testViewNonmerging(createTestStreamingRunner(), View.<KV<String, Integer>>asList());
+  }
+
+  @Test
+  public void testViewNonmergingAsMapBatch() {
+    testViewNonmerging(createTestBatchRunner(), View.<String, Integer>asMap());
+  }
+
+  @Test
+  public void testViewNonmergingAsMapStreaming() {
+    testViewNonmerging(createTestStreamingRunner(), View.<String, Integer>asMap());
+  }
+
+  @Test
+  public void testViewNonmergingAsMultimapBatch() {
+    testViewNonmerging(createTestBatchRunner(), View.<String, Integer>asMultimap());
+  }
+
+  @Test
+  public void testViewNonmergingAsMultimapStreaming() {
+    testViewNonmerging(createTestStreamingRunner(), View.<String, Integer>asMultimap());
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/runners/google-cloud-dataflow-java/src/test/java/com/google/cloud/dataflow/sdk/util/DataflowPathValidatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/com/google/cloud/dataflow/sdk/util/DataflowPathValidatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/com/google/cloud/dataflow/sdk/util/DataflowPathValidatorTest.java
new file mode 100644
index 0000000..6a3cce7
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/com/google/cloud/dataflow/sdk/util/DataflowPathValidatorTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.util;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.when;
+
+import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
+import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/** Tests for {@link DataflowPathValidator}. */
+@RunWith(JUnit4.class)
+public class DataflowPathValidatorTest {
+  @Rule public ExpectedException expectedException = ExpectedException.none();
+
+  @Mock private GcsUtil mockGcsUtil;
+  private DataflowPathValidator validator;
+
+  @Before
+  public void setUp() throws Exception {
+    MockitoAnnotations.initMocks(this);
+    when(mockGcsUtil.bucketExists(any(GcsPath.class))).thenReturn(true);
+    when(mockGcsUtil.isGcsPatternSupported(anyString())).thenCallRealMethod();
+    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    options.setGcpCredential(new TestCredential());
+    options.setRunner(DataflowPipelineRunner.class);
+    options.setGcsUtil(mockGcsUtil);
+    validator = new DataflowPathValidator(options);
+  }
+
+  @Test
+  public void testValidFilePattern() {
+    validator.validateInputFilePatternSupported("gs://bucket/path");
+  }
+
+  @Test
+  public void testInvalidFilePattern() {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage(
+        "DataflowPipelineRunner expected a valid 'gs://' path but was given '/local/path'");
+    validator.validateInputFilePatternSupported("/local/path");
+  }
+
+  @Test
+  public void testWhenBucketDoesNotExist() throws Exception {
+    when(mockGcsUtil.bucketExists(any(GcsPath.class))).thenReturn(false);
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage(
+        "Could not find file gs://non-existent-bucket/location");
+    validator.validateInputFilePatternSupported("gs://non-existent-bucket/location");
+  }
+
+  @Test
+  public void testValidOutputPrefix() {
+    validator.validateOutputFilePrefixSupported("gs://bucket/path");
+  }
+
+  @Test
+  public void testInvalidOutputPrefix() {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage(
+        "DataflowPipelineRunner expected a valid 'gs://' path but was given '/local/path'");
+    validator.validateOutputFilePrefixSupported("/local/path");
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/runners/google-cloud-dataflow-java/src/test/java/com/google/cloud/dataflow/sdk/util/MonitoringUtilTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/com/google/cloud/dataflow/sdk/util/MonitoringUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/com/google/cloud/dataflow/sdk/util/MonitoringUtilTest.java
new file mode 100644
index 0000000..23e12de
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/com/google/cloud/dataflow/sdk/util/MonitoringUtilTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.api.services.dataflow.Dataflow;
+import com.google.api.services.dataflow.model.JobMessage;
+import com.google.api.services.dataflow.model.ListJobMessagesResponse;
+import com.google.cloud.dataflow.sdk.PipelineResult.State;
+import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests for MonitoringUtil.
+ */
+@RunWith(JUnit4.class)
+public class MonitoringUtilTest {
+  private static final String PROJECT_ID = "someProject";
+  private static final String JOB_ID = "1234";
+
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void testGetJobMessages() throws IOException {
+    Dataflow.Projects.Jobs.Messages mockMessages = mock(Dataflow.Projects.Jobs.Messages.class);
+
+    // Two requests are needed to get all the messages.
+    Dataflow.Projects.Jobs.Messages.List firstRequest =
+        mock(Dataflow.Projects.Jobs.Messages.List.class);
+    Dataflow.Projects.Jobs.Messages.List secondRequest =
+        mock(Dataflow.Projects.Jobs.Messages.List.class);
+
+    when(mockMessages.list(PROJECT_ID, JOB_ID)).thenReturn(firstRequest).thenReturn(secondRequest);
+
+    ListJobMessagesResponse firstResponse = new ListJobMessagesResponse();
+    firstResponse.setJobMessages(new ArrayList<JobMessage>());
+    for (int i = 0; i < 100; ++i) {
+      JobMessage message = new JobMessage();
+      message.setId("message_" + i);
+      message.setTime(TimeUtil.toCloudTime(new Instant(i)));
+      firstResponse.getJobMessages().add(message);
+    }
+    String pageToken = "page_token";
+    firstResponse.setNextPageToken(pageToken);
+
+    ListJobMessagesResponse secondResponse = new ListJobMessagesResponse();
+    secondResponse.setJobMessages(new ArrayList<JobMessage>());
+    for (int i = 100; i < 150; ++i) {
+      JobMessage message = new JobMessage();
+      message.setId("message_" + i);
+      message.setTime(TimeUtil.toCloudTime(new Instant(i)));
+      secondResponse.getJobMessages().add(message);
+    }
+
+    when(firstRequest.execute()).thenReturn(firstResponse);
+    when(secondRequest.execute()).thenReturn(secondResponse);
+
+    MonitoringUtil util = new MonitoringUtil(PROJECT_ID, mockMessages);
+
+    List<JobMessage> messages = util.getJobMessages(JOB_ID, -1);
+
+    verify(secondRequest).setPageToken(pageToken);
+
+    assertEquals(150, messages.size());
+  }
+
+  @Test
+  public void testToStateCreatesState() {
+    String stateName = "JOB_STATE_DONE";
+
+    State result = MonitoringUtil.toState(stateName);
+
+    assertEquals(State.DONE, result);
+  }
+
+  @Test
+  public void testToStateWithNullReturnsUnknown() {
+    String stateName = null;
+
+    State result = MonitoringUtil.toState(stateName);
+
+    assertEquals(State.UNKNOWN, result);
+  }
+
+  @Test
+  public void testToStateWithOtherValueReturnsUnknown() {
+    String stateName = "FOO_BAR_BAZ";
+
+    State result = MonitoringUtil.toState(stateName);
+
+    assertEquals(State.UNKNOWN, result);
+  }
+
+  @Test
+  public void testDontOverrideEndpointWithDefaultApi() {
+    DataflowPipelineOptions options =
+        PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
+    options.setProject(PROJECT_ID);
+    options.setGcpCredential(new TestCredential());
+    String cancelCommand = MonitoringUtil.getGcloudCancelCommand(options, JOB_ID);
+    assertEquals("gcloud alpha dataflow jobs --project=someProject cancel 1234", cancelCommand);
+  }
+
+  @Test
+  public void testOverridesEndpointWithStagedDataflowEndpoint() {
+    DataflowPipelineOptions options =
+        PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
+    options.setProject(PROJECT_ID);
+    options.setGcpCredential(new TestCredential());
+    String stagingDataflowEndpoint = "v0neverExisted";
+    options.setDataflowEndpoint(stagingDataflowEndpoint);
+    String cancelCommand = MonitoringUtil.getGcloudCancelCommand(options, JOB_ID);
+    assertEquals(
+        "CLOUDSDK_API_ENDPOINT_OVERRIDES_DATAFLOW=https://dataflow.googleapis.com/v0neverExisted/ "
+        + "gcloud alpha dataflow jobs --project=someProject cancel 1234",
+        cancelCommand);
+  }
+}