You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/04/27 03:08:35 UTC

[03/21] incubator-beam git commit: Reorganize Java packages in the sources of the Google Cloud Dataflow runner

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTranslatorTest.java
deleted file mode 100644
index 27c0acc..0000000
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTranslatorTest.java
+++ /dev/null
@@ -1,965 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners;
-
-import static org.apache.beam.sdk.util.Structs.addObject;
-import static org.apache.beam.sdk.util.Structs.getDictionary;
-import static org.apache.beam.sdk.util.Structs.getString;
-
-import static org.hamcrest.Matchers.containsString;
-import static org.hamcrest.Matchers.hasKey;
-import static org.hamcrest.core.IsInstanceOf.instanceOf;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.argThat;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.options.DataflowPipelineOptions;
-import org.apache.beam.sdk.options.DataflowPipelineWorkerPoolOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.runners.DataflowPipelineTranslator.TranslationContext;
-import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.util.GcsUtil;
-import org.apache.beam.sdk.util.OutputReference;
-import org.apache.beam.sdk.util.PropertyNames;
-import org.apache.beam.sdk.util.Structs;
-import org.apache.beam.sdk.util.TestCredential;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.util.gcsfs.GcsPath;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.TupleTag;
-
-import com.google.api.services.dataflow.Dataflow;
-import com.google.api.services.dataflow.model.DataflowPackage;
-import com.google.api.services.dataflow.model.Job;
-import com.google.api.services.dataflow.model.Step;
-import com.google.api.services.dataflow.model.WorkerPool;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-
-import org.hamcrest.Matchers;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.internal.matchers.ThrowableMessageMatcher;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.ArgumentMatcher;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Tests for DataflowPipelineTranslator.
- */
-@RunWith(JUnit4.class)
-public class DataflowPipelineTranslatorTest implements Serializable {
-
-  @Rule public transient ExpectedException thrown = ExpectedException.none();
-
-  // A Custom Mockito matcher for an initial Job that checks that all
-  // expected fields are set.
-  private static class IsValidCreateRequest extends ArgumentMatcher<Job> {
-    @Override
-    public boolean matches(Object o) {
-      Job job = (Job) o;
-      return job.getId() == null
-          && job.getProjectId() == null
-          && job.getName() != null
-          && job.getType() != null
-          && job.getEnvironment() != null
-          && job.getSteps() != null
-          && job.getCurrentState() == null
-          && job.getCurrentStateTime() == null
-          && job.getExecutionInfo() == null
-          && job.getCreateTime() == null;
-    }
-  }
-
-  private Pipeline buildPipeline(DataflowPipelineOptions options) {
-    options.setRunner(DataflowPipelineRunner.class);
-    Pipeline p = Pipeline.create(options);
-
-    p.apply(TextIO.Read.named("ReadMyFile").from("gs://bucket/object"))
-        .apply(TextIO.Write.named("WriteMyFile").to("gs://bucket/object"));
-
-    return p;
-  }
-
-  private static Dataflow buildMockDataflow(
-      ArgumentMatcher<Job> jobMatcher) throws IOException {
-    Dataflow mockDataflowClient = mock(Dataflow.class);
-    Dataflow.Projects mockProjects = mock(Dataflow.Projects.class);
-    Dataflow.Projects.Jobs mockJobs = mock(Dataflow.Projects.Jobs.class);
-    Dataflow.Projects.Jobs.Create mockRequest = mock(
-        Dataflow.Projects.Jobs.Create.class);
-
-    when(mockDataflowClient.projects()).thenReturn(mockProjects);
-    when(mockProjects.jobs()).thenReturn(mockJobs);
-    when(mockJobs.create(eq("someProject"), argThat(jobMatcher)))
-        .thenReturn(mockRequest);
-
-    Job resultJob = new Job();
-    resultJob.setId("newid");
-    when(mockRequest.execute()).thenReturn(resultJob);
-    return mockDataflowClient;
-  }
-
-  private static DataflowPipelineOptions buildPipelineOptions() throws IOException {
-    GcsUtil mockGcsUtil = mock(GcsUtil.class);
-    when(mockGcsUtil.expand(any(GcsPath.class))).then(new Answer<List<GcsPath>>() {
-      @Override
-      public List<GcsPath> answer(InvocationOnMock invocation) throws Throwable {
-        return ImmutableList.of((GcsPath) invocation.getArguments()[0]);
-      }
-    });
-    when(mockGcsUtil.bucketExists(any(GcsPath.class))).thenReturn(true);
-    when(mockGcsUtil.isGcsPatternSupported(anyString())).thenCallRealMethod();
-
-    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
-    options.setRunner(DataflowPipelineRunner.class);
-    options.setGcpCredential(new TestCredential());
-    options.setJobName("some-job-name");
-    options.setProject("some-project");
-    options.setTempLocation(GcsPath.fromComponents("somebucket", "some/path").toString());
-    options.setFilesToStage(new LinkedList<String>());
-    options.setDataflowClient(buildMockDataflow(new IsValidCreateRequest()));
-    options.setGcsUtil(mockGcsUtil);
-    return options;
-  }
-
-  @Test
-  public void testSettingOfSdkPipelineOptions() throws IOException {
-    DataflowPipelineOptions options = buildPipelineOptions();
-    options.setRunner(DataflowPipelineRunner.class);
-
-    Pipeline p = buildPipeline(options);
-    p.traverseTopologically(new RecordingPipelineVisitor());
-    Job job =
-        DataflowPipelineTranslator.fromOptions(options)
-            .translate(
-                p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
-            .getJob();
-
-    // Note that the contents of this materialized map may be changed by the act of reading an
-    // option, which will cause the default to get materialized whereas it would otherwise be
-    // left absent. It is permissible to simply alter this test to reflect current behavior.
-    Map<String, Object> settings = new HashMap<>();
-    settings.put("appName", "DataflowPipelineTranslatorTest");
-    settings.put("project", "some-project");
-    settings.put("pathValidatorClass", "org.apache.beam.sdk.util.DataflowPathValidator");
-    settings.put("runner", "org.apache.beam.sdk.runners.DataflowPipelineRunner");
-    settings.put("jobName", "some-job-name");
-    settings.put("tempLocation", "gs://somebucket/some/path");
-    settings.put("stagingLocation", "gs://somebucket/some/path/staging");
-    settings.put("stableUniqueNames", "WARNING");
-    settings.put("streaming", false);
-    settings.put("numberOfWorkerHarnessThreads", 0);
-    settings.put("experiments", null);
-
-    Map<String, Object> sdkPipelineOptions = job.getEnvironment().getSdkPipelineOptions();
-    assertThat(sdkPipelineOptions, hasKey("options"));
-    assertEquals(settings, sdkPipelineOptions.get("options"));
-  }
-
-  @Test
-  public void testNetworkConfig() throws IOException {
-    final String testNetwork = "test-network";
-
-    DataflowPipelineOptions options = buildPipelineOptions();
-    options.setNetwork(testNetwork);
-
-    Pipeline p = buildPipeline(options);
-    p.traverseTopologically(new RecordingPipelineVisitor());
-    Job job =
-        DataflowPipelineTranslator.fromOptions(options)
-            .translate(
-                p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
-            .getJob();
-
-    assertEquals(1, job.getEnvironment().getWorkerPools().size());
-    assertEquals(testNetwork,
-        job.getEnvironment().getWorkerPools().get(0).getNetwork());
-  }
-
-  @Test
-  public void testNetworkConfigMissing() throws IOException {
-    DataflowPipelineOptions options = buildPipelineOptions();
-
-    Pipeline p = buildPipeline(options);
-    p.traverseTopologically(new RecordingPipelineVisitor());
-    Job job =
-        DataflowPipelineTranslator.fromOptions(options)
-            .translate(
-                p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
-            .getJob();
-
-    assertEquals(1, job.getEnvironment().getWorkerPools().size());
-    assertNull(job.getEnvironment().getWorkerPools().get(0).getNetwork());
-  }
-
-  @Test
-  public void testSubnetworkConfig() throws IOException {
-    final String testSubnetwork = "regions/REGION/subnetworks/SUBNETWORK";
-
-    DataflowPipelineOptions options = buildPipelineOptions();
-    options.setSubnetwork(testSubnetwork);
-
-    Pipeline p = buildPipeline(options);
-    p.traverseTopologically(new RecordingPipelineVisitor());
-    Job job =
-        DataflowPipelineTranslator.fromOptions(options)
-            .translate(
-                p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
-            .getJob();
-
-    assertEquals(1, job.getEnvironment().getWorkerPools().size());
-    assertEquals(testSubnetwork,
-        job.getEnvironment().getWorkerPools().get(0).getSubnetwork());
-  }
-
-  @Test
-  public void testSubnetworkConfigMissing() throws IOException {
-    DataflowPipelineOptions options = buildPipelineOptions();
-
-    Pipeline p = buildPipeline(options);
-    p.traverseTopologically(new RecordingPipelineVisitor());
-    Job job =
-        DataflowPipelineTranslator.fromOptions(options)
-            .translate(
-                p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
-            .getJob();
-
-    assertEquals(1, job.getEnvironment().getWorkerPools().size());
-    assertNull(job.getEnvironment().getWorkerPools().get(0).getSubnetwork());
-  }
-
-  @Test
-  public void testScalingAlgorithmMissing() throws IOException {
-    DataflowPipelineOptions options = buildPipelineOptions();
-
-    Pipeline p = buildPipeline(options);
-    p.traverseTopologically(new RecordingPipelineVisitor());
-    Job job =
-        DataflowPipelineTranslator.fromOptions(options)
-            .translate(
-                p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
-            .getJob();
-
-    assertEquals(1, job.getEnvironment().getWorkerPools().size());
-    // Autoscaling settings are always set.
-    assertNull(
-        job
-            .getEnvironment()
-            .getWorkerPools()
-            .get(0)
-            .getAutoscalingSettings()
-            .getAlgorithm());
-    assertEquals(
-        0,
-        job
-            .getEnvironment()
-            .getWorkerPools()
-            .get(0)
-            .getAutoscalingSettings()
-            .getMaxNumWorkers()
-            .intValue());
-  }
-
-  @Test
-  public void testScalingAlgorithmNone() throws IOException {
-    final DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType noScaling =
-        DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType.NONE;
-
-    DataflowPipelineOptions options = buildPipelineOptions();
-    options.setAutoscalingAlgorithm(noScaling);
-
-    Pipeline p = buildPipeline(options);
-    p.traverseTopologically(new RecordingPipelineVisitor());
-    Job job =
-        DataflowPipelineTranslator.fromOptions(options)
-            .translate(
-                p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
-            .getJob();
-
-    assertEquals(1, job.getEnvironment().getWorkerPools().size());
-    assertEquals(
-        "AUTOSCALING_ALGORITHM_NONE",
-        job
-            .getEnvironment()
-            .getWorkerPools()
-            .get(0)
-            .getAutoscalingSettings()
-            .getAlgorithm());
-    assertEquals(
-        0,
-        job
-            .getEnvironment()
-            .getWorkerPools()
-            .get(0)
-            .getAutoscalingSettings()
-            .getMaxNumWorkers()
-            .intValue());
-  }
-
-  @Test
-  public void testMaxNumWorkersIsPassedWhenNoAlgorithmIsSet() throws IOException {
-    final DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType noScaling = null;
-    DataflowPipelineOptions options = buildPipelineOptions();
-    options.setMaxNumWorkers(42);
-    options.setAutoscalingAlgorithm(noScaling);
-
-    Pipeline p = buildPipeline(options);
-    p.traverseTopologically(new RecordingPipelineVisitor());
-    Job job =
-        DataflowPipelineTranslator.fromOptions(options)
-            .translate(
-                p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
-            .getJob();
-
-    assertEquals(1, job.getEnvironment().getWorkerPools().size());
-    assertNull(
-        job
-            .getEnvironment()
-            .getWorkerPools()
-            .get(0)
-            .getAutoscalingSettings()
-            .getAlgorithm());
-    assertEquals(
-        42,
-        job
-            .getEnvironment()
-            .getWorkerPools()
-            .get(0)
-            .getAutoscalingSettings()
-            .getMaxNumWorkers()
-            .intValue());
-  }
-
-  @Test
-  public void testZoneConfig() throws IOException {
-    final String testZone = "test-zone-1";
-
-    DataflowPipelineOptions options = buildPipelineOptions();
-    options.setZone(testZone);
-
-    Pipeline p = buildPipeline(options);
-    p.traverseTopologically(new RecordingPipelineVisitor());
-    Job job =
-        DataflowPipelineTranslator.fromOptions(options)
-            .translate(
-                p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
-            .getJob();
-
-    assertEquals(1, job.getEnvironment().getWorkerPools().size());
-    assertEquals(testZone,
-        job.getEnvironment().getWorkerPools().get(0).getZone());
-  }
-
-  @Test
-  public void testWorkerMachineTypeConfig() throws IOException {
-    final String testMachineType = "test-machine-type";
-
-    DataflowPipelineOptions options = buildPipelineOptions();
-    options.setWorkerMachineType(testMachineType);
-
-    Pipeline p = buildPipeline(options);
-    p.traverseTopologically(new RecordingPipelineVisitor());
-    Job job =
-        DataflowPipelineTranslator.fromOptions(options)
-            .translate(
-                p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
-            .getJob();
-
-    assertEquals(1, job.getEnvironment().getWorkerPools().size());
-
-    WorkerPool workerPool = job.getEnvironment().getWorkerPools().get(0);
-    assertEquals(testMachineType, workerPool.getMachineType());
-  }
-
-  @Test
-  public void testDiskSizeGbConfig() throws IOException {
-    final Integer diskSizeGb = 1234;
-
-    DataflowPipelineOptions options = buildPipelineOptions();
-    options.setDiskSizeGb(diskSizeGb);
-
-    Pipeline p = buildPipeline(options);
-    p.traverseTopologically(new RecordingPipelineVisitor());
-    Job job =
-        DataflowPipelineTranslator.fromOptions(options)
-            .translate(
-                p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
-            .getJob();
-
-    assertEquals(1, job.getEnvironment().getWorkerPools().size());
-    assertEquals(diskSizeGb,
-        job.getEnvironment().getWorkerPools().get(0).getDiskSizeGb());
-  }
-
-  @Test
-  public void testPredefinedAddStep() throws Exception {
-    DataflowPipelineOptions options = buildPipelineOptions();
-
-    DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
-    DataflowPipelineTranslator.registerTransformTranslator(
-        EmbeddedTransform.class, new EmbeddedTranslator());
-
-    // Create a predefined step using another pipeline
-    Step predefinedStep = createPredefinedStep();
-
-    // Create a pipeline that the predefined step will be embedded into
-    Pipeline pipeline = Pipeline.create(options);
-    pipeline.apply(TextIO.Read.named("ReadMyFile").from("gs://bucket/in"))
-        .apply(ParDo.of(new NoOpFn()))
-        .apply(new EmbeddedTransform(predefinedStep.clone()))
-        .apply(ParDo.of(new NoOpFn()));
-    Job job =
-        translator
-            .translate(
-                pipeline,
-                (DataflowPipelineRunner) pipeline.getRunner(),
-                Collections.<DataflowPackage>emptyList())
-            .getJob();
-
-    List<Step> steps = job.getSteps();
-    assertEquals(4, steps.size());
-
-    // The input to the embedded step should match the output of the step before
-    Map<String, Object> step1Out = getOutputPortReference(steps.get(1));
-    Map<String, Object> step2In = getDictionary(
-        steps.get(2).getProperties(), PropertyNames.PARALLEL_INPUT);
-    assertEquals(step1Out, step2In);
-
-    // The output from the embedded step should match the input of the step after
-    Map<String, Object> step2Out = getOutputPortReference(steps.get(2));
-    Map<String, Object> step3In = getDictionary(
-        steps.get(3).getProperties(), PropertyNames.PARALLEL_INPUT);
-    assertEquals(step2Out, step3In);
-
-    // The step should not have been modified other than remapping the input
-    Step predefinedStepClone = predefinedStep.clone();
-    Step embeddedStepClone = steps.get(2).clone();
-    predefinedStepClone.getProperties().remove(PropertyNames.PARALLEL_INPUT);
-    embeddedStepClone.getProperties().remove(PropertyNames.PARALLEL_INPUT);
-    assertEquals(predefinedStepClone, embeddedStepClone);
-  }
-
-  /**
-   * Construct a OutputReference for the output of the step.
-   */
-  private static OutputReference getOutputPortReference(Step step) throws Exception {
-    // TODO: This should be done via a Structs accessor.
-    @SuppressWarnings("unchecked")
-    List<Map<String, Object>> output =
-        (List<Map<String, Object>>) step.getProperties().get(PropertyNames.OUTPUT_INFO);
-    String outputTagId = getString(Iterables.getOnlyElement(output), PropertyNames.OUTPUT_NAME);
-    return new OutputReference(step.getName(), outputTagId);
-  }
-
-  /**
-   * Returns a Step for a DoFn by creating and translating a pipeline.
-   */
-  private static Step createPredefinedStep() throws Exception {
-    DataflowPipelineOptions options = buildPipelineOptions();
-    DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
-    Pipeline pipeline = Pipeline.create(options);
-    String stepName = "DoFn1";
-    pipeline.apply(TextIO.Read.named("ReadMyFile").from("gs://bucket/in"))
-        .apply(ParDo.of(new NoOpFn()).named(stepName))
-        .apply(TextIO.Write.named("WriteMyFile").to("gs://bucket/out"));
-    Job job =
-        translator
-            .translate(
-                pipeline,
-                (DataflowPipelineRunner) pipeline.getRunner(),
-                Collections.<DataflowPackage>emptyList())
-            .getJob();
-
-    assertEquals(13, job.getSteps().size());
-    Step step = job.getSteps().get(1);
-    assertEquals(stepName, getString(step.getProperties(), PropertyNames.USER_NAME));
-    return step;
-  }
-
-  private static class NoOpFn extends DoFn<String, String> {
-    @Override public void processElement(ProcessContext c) throws Exception {
-      c.output(c.element());
-    }
-  }
-
-  /**
-   * A placeholder transform that will be used to substitute a predefined Step.
-   */
-  private static class EmbeddedTransform
-      extends PTransform<PCollection<String>, PCollection<String>> {
-    private final Step step;
-
-    public EmbeddedTransform(Step step) {
-      this.step = step;
-    }
-
-    @Override
-    public PCollection<String> apply(PCollection<String> input) {
-      return PCollection.createPrimitiveOutputInternal(
-          input.getPipeline(),
-          WindowingStrategy.globalDefault(),
-          input.isBounded());
-    }
-
-    @Override
-    protected Coder<?> getDefaultOutputCoder() {
-      return StringUtf8Coder.of();
-    }
-  }
-
-  /**
-   * A TransformTranslator that adds the predefined Step using
-   * {@link TranslationContext#addStep} and remaps the input port reference.
-   */
-  private static class EmbeddedTranslator
-      implements DataflowPipelineTranslator.TransformTranslator<EmbeddedTransform> {
-    @Override public void translate(EmbeddedTransform transform, TranslationContext context) {
-      addObject(transform.step.getProperties(), PropertyNames.PARALLEL_INPUT,
-          context.asOutputReference(context.getInput(transform)));
-      context.addStep(transform, transform.step);
-    }
-  }
-
-  /**
-   * A composite transform that returns an output that is unrelated to
-   * the input.
-   */
-  private static class UnrelatedOutputCreator
-      extends PTransform<PCollection<Integer>, PCollection<Integer>> {
-
-    @Override
-    public PCollection<Integer> apply(PCollection<Integer> input) {
-      // Apply an operation so that this is a composite transform.
-      input.apply(Count.<Integer>perElement());
-
-      // Return a value unrelated to the input.
-      return input.getPipeline().apply(Create.of(1, 2, 3, 4));
-    }
-
-    @Override
-    protected Coder<?> getDefaultOutputCoder() {
-      return VarIntCoder.of();
-    }
-  }
-
-  /**
-   * A composite transform that returns an output that is unbound.
-   */
-  private static class UnboundOutputCreator
-      extends PTransform<PCollection<Integer>, PDone> {
-
-    @Override
-    public PDone apply(PCollection<Integer> input) {
-      // Apply an operation so that this is a composite transform.
-      input.apply(Count.<Integer>perElement());
-
-      return PDone.in(input.getPipeline());
-    }
-
-    @Override
-    protected Coder<?> getDefaultOutputCoder() {
-      return VoidCoder.of();
-    }
-  }
-
-  /**
-   * A composite transform that returns a partially bound output.
-   *
-   * <p>This is not allowed and will result in a failure.
-   */
-  private static class PartiallyBoundOutputCreator
-      extends PTransform<PCollection<Integer>, PCollectionTuple> {
-
-    public final TupleTag<Integer> sumTag = new TupleTag<>("sum");
-    public final TupleTag<Void> doneTag = new TupleTag<>("done");
-
-    @Override
-    public PCollectionTuple apply(PCollection<Integer> input) {
-      PCollection<Integer> sum = input.apply(Sum.integersGlobally());
-
-      // Fails here when attempting to construct a tuple with an unbound object.
-      return PCollectionTuple.of(sumTag, sum)
-          .and(doneTag, PCollection.<Void>createPrimitiveOutputInternal(
-              input.getPipeline(),
-              WindowingStrategy.globalDefault(),
-              input.isBounded()));
-    }
-  }
-
-  @Test
-  public void testMultiGraphPipelineSerialization() throws IOException {
-    Pipeline p = Pipeline.create(buildPipelineOptions());
-
-    PCollection<Integer> input = p.begin()
-        .apply(Create.of(1, 2, 3));
-
-    input.apply(new UnrelatedOutputCreator());
-    input.apply(new UnboundOutputCreator());
-
-    DataflowPipelineTranslator t = DataflowPipelineTranslator.fromOptions(
-        PipelineOptionsFactory.as(DataflowPipelineOptions.class));
-
-    // Check that translation doesn't fail.
-    t.translate(
-        p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList());
-  }
-
-  @Test
-  public void testPartiallyBoundFailure() throws IOException {
-    Pipeline p = Pipeline.create(buildPipelineOptions());
-
-    PCollection<Integer> input = p.begin()
-        .apply(Create.of(1, 2, 3));
-
-    thrown.expect(IllegalStateException.class);
-    input.apply(new PartiallyBoundOutputCreator());
-
-    Assert.fail("Failure expected from use of partially bound output");
-  }
-
-  /**
-   * This tests a few corner cases that should not crash.
-   */
-  @Test
-  public void testGoodWildcards() throws Exception {
-    DataflowPipelineOptions options = buildPipelineOptions();
-    Pipeline pipeline = Pipeline.create(options);
-    DataflowPipelineTranslator t = DataflowPipelineTranslator.fromOptions(options);
-
-    applyRead(pipeline, "gs://bucket/foo");
-    applyRead(pipeline, "gs://bucket/foo/");
-    applyRead(pipeline, "gs://bucket/foo/*");
-    applyRead(pipeline, "gs://bucket/foo/?");
-    applyRead(pipeline, "gs://bucket/foo/[0-9]");
-    applyRead(pipeline, "gs://bucket/foo/*baz*");
-    applyRead(pipeline, "gs://bucket/foo/*baz?");
-    applyRead(pipeline, "gs://bucket/foo/[0-9]baz?");
-    applyRead(pipeline, "gs://bucket/foo/baz/*");
-    applyRead(pipeline, "gs://bucket/foo/baz/*wonka*");
-    applyRead(pipeline, "gs://bucket/foo/*baz/wonka*");
-    applyRead(pipeline, "gs://bucket/foo*/baz");
-    applyRead(pipeline, "gs://bucket/foo?/baz");
-    applyRead(pipeline, "gs://bucket/foo[0-9]/baz");
-
-    // Check that translation doesn't fail.
-    t.translate(
-        pipeline,
-        (DataflowPipelineRunner) pipeline.getRunner(),
-        Collections.<DataflowPackage>emptyList());
-  }
-
-  private void applyRead(Pipeline pipeline, String path) {
-    pipeline.apply("Read(" + path + ")", TextIO.Read.from(path));
-  }
-
-  /**
-   * Recursive wildcards are not supported.
-   * This tests "**".
-   */
-  @Test
-  public void testBadWildcardRecursive() throws Exception {
-    DataflowPipelineOptions options = buildPipelineOptions();
-    Pipeline pipeline = Pipeline.create(options);
-    DataflowPipelineTranslator t = DataflowPipelineTranslator.fromOptions(options);
-
-    pipeline.apply(TextIO.Read.from("gs://bucket/foo**/baz"));
-
-    // Check that translation does fail.
-    thrown.expectCause(Matchers.allOf(
-        instanceOf(IllegalArgumentException.class),
-        ThrowableMessageMatcher.hasMessage(containsString("Unsupported wildcard usage"))));
-    t.translate(
-        pipeline,
-        (DataflowPipelineRunner) pipeline.getRunner(),
-        Collections.<DataflowPackage>emptyList());
-  }
-
-  @Test
-  public void testToSingletonTranslation() throws Exception {
-    // A "change detector" test that makes sure the translation
-    // of getting a PCollectionView<T> does not change
-    // in bad ways during refactor
-
-    DataflowPipelineOptions options = buildPipelineOptions();
-    options.setExperiments(ImmutableList.of("disable_ism_side_input"));
-    DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
-
-    Pipeline pipeline = Pipeline.create(options);
-    pipeline.apply(Create.of(1))
-        .apply(View.<Integer>asSingleton());
-    Job job =
-        translator
-            .translate(
-                pipeline,
-                (DataflowPipelineRunner) pipeline.getRunner(),
-                Collections.<DataflowPackage>emptyList())
-            .getJob();
-
-    List<Step> steps = job.getSteps();
-    assertEquals(2, steps.size());
-
-    Step createStep = steps.get(0);
-    assertEquals("ParallelRead", createStep.getKind());
-
-    Step collectionToSingletonStep = steps.get(1);
-    assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
-
-  }
-
-  @Test
-  public void testToIterableTranslation() throws Exception {
-    // A "change detector" test that makes sure the translation
-    // of getting a PCollectionView<Iterable<T>> does not change
-    // in bad ways during refactor
-
-    DataflowPipelineOptions options = buildPipelineOptions();
-    options.setExperiments(ImmutableList.of("disable_ism_side_input"));
-    DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
-
-    Pipeline pipeline = Pipeline.create(options);
-    pipeline.apply(Create.of(1, 2, 3))
-        .apply(View.<Integer>asIterable());
-    Job job =
-        translator
-            .translate(
-                pipeline,
-                (DataflowPipelineRunner) pipeline.getRunner(),
-                Collections.<DataflowPackage>emptyList())
-            .getJob();
-
-    List<Step> steps = job.getSteps();
-    assertEquals(2, steps.size());
-
-    Step createStep = steps.get(0);
-    assertEquals("ParallelRead", createStep.getKind());
-
-    Step collectionToSingletonStep = steps.get(1);
-    assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
-  }
-
-  @Test
-  public void testToSingletonTranslationWithIsmSideInput() throws Exception {
-    // A "change detector" test that makes sure the translation
-    // of getting a PCollectionView<T> does not change
-    // in bad ways during refactor
-
-    DataflowPipelineOptions options = buildPipelineOptions();
-    DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
-
-    Pipeline pipeline = Pipeline.create(options);
-    pipeline.apply(Create.of(1))
-        .apply(View.<Integer>asSingleton());
-    Job job =
-        translator
-            .translate(
-                pipeline,
-                (DataflowPipelineRunner) pipeline.getRunner(),
-                Collections.<DataflowPackage>emptyList())
-            .getJob();
-
-    List<Step> steps = job.getSteps();
-    assertEquals(5, steps.size());
-
-    @SuppressWarnings("unchecked")
-    List<Map<String, Object>> toIsmRecordOutputs =
-        (List<Map<String, Object>>) steps.get(3).getProperties().get(PropertyNames.OUTPUT_INFO);
-    assertTrue(
-        Structs.getBoolean(Iterables.getOnlyElement(toIsmRecordOutputs), "use_indexed_format"));
-
-    Step collectionToSingletonStep = steps.get(4);
-    assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
-  }
-
-  @Test
-  public void testToIterableTranslationWithIsmSideInput() throws Exception {
-    // A "change detector" test that makes sure the translation
-    // of getting a PCollectionView<Iterable<T>> does not change
-    // in bad ways during refactor
-
-    DataflowPipelineOptions options = buildPipelineOptions();
-    DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
-
-    Pipeline pipeline = Pipeline.create(options);
-    pipeline.apply(Create.of(1, 2, 3))
-        .apply(View.<Integer>asIterable());
-    Job job =
-        translator
-            .translate(
-                pipeline,
-                (DataflowPipelineRunner) pipeline.getRunner(),
-                Collections.<DataflowPackage>emptyList())
-            .getJob();
-
-    List<Step> steps = job.getSteps();
-    assertEquals(3, steps.size());
-
-    @SuppressWarnings("unchecked")
-    List<Map<String, Object>> toIsmRecordOutputs =
-        (List<Map<String, Object>>) steps.get(1).getProperties().get(PropertyNames.OUTPUT_INFO);
-    assertTrue(
-        Structs.getBoolean(Iterables.getOnlyElement(toIsmRecordOutputs), "use_indexed_format"));
-
-
-    Step collectionToSingletonStep = steps.get(2);
-    assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
-  }
-
-  @Test
-  public void testStepDisplayData() throws Exception {
-    DataflowPipelineOptions options = buildPipelineOptions();
-    DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
-    Pipeline pipeline = Pipeline.create(options);
-
-    DoFn<Integer, Integer> fn1 = new DoFn<Integer, Integer>() {
-      @Override
-      public void processElement(ProcessContext c) throws Exception {
-        c.output(c.element());
-      }
-
-      @Override
-      public void populateDisplayData(DisplayData.Builder builder) {
-        builder
-            .add("foo", "bar")
-            .add("foo2", DataflowPipelineTranslatorTest.class)
-                .withLabel("Test Class")
-                .withLinkUrl("http://www.google.com");
-      }
-    };
-
-    DoFn<Integer, Integer> fn2 = new DoFn<Integer, Integer>() {
-      @Override
-      public void processElement(ProcessContext c) throws Exception {
-        c.output(c.element());
-      }
-
-      @Override
-      public void populateDisplayData(DisplayData.Builder builder) {
-        builder.add("foo3", 1234);
-      }
-    };
-
-    ParDo.Bound<Integer, Integer> parDo1 = ParDo.of(fn1);
-    ParDo.Bound<Integer, Integer> parDo2 = ParDo.of(fn2);
-    pipeline
-      .apply(Create.of(1, 2, 3))
-      .apply(parDo1)
-      .apply(parDo2);
-
-    Job job =
-        translator
-            .translate(
-                pipeline,
-                (DataflowPipelineRunner) pipeline.getRunner(),
-                Collections.<DataflowPackage>emptyList())
-            .getJob();
-
-    List<Step> steps = job.getSteps();
-    assertEquals(3, steps.size());
-
-    Map<String, Object> parDo1Properties = steps.get(1).getProperties();
-    Map<String, Object> parDo2Properties = steps.get(2).getProperties();
-    assertThat(parDo1Properties, hasKey("display_data"));
-
-    Collection<Map<String, String>> fn1displayData =
-            (Collection<Map<String, String>>) parDo1Properties.get("display_data");
-    Collection<Map<String, String>> fn2displayData =
-            (Collection<Map<String, String>>) parDo2Properties.get("display_data");
-
-    ImmutableSet<ImmutableMap<String, Object>> expectedFn1DisplayData = ImmutableSet.of(
-        ImmutableMap.<String, Object>builder()
-            .put("key", "foo")
-            .put("type", "STRING")
-            .put("value", "bar")
-            .put("namespace", fn1.getClass().getName())
-            .build(),
-        ImmutableMap.<String, Object>builder()
-            .put("key", "fn")
-            .put("type", "JAVA_CLASS")
-            .put("value", fn1.getClass().getName())
-            .put("shortValue", fn1.getClass().getSimpleName())
-            .put("namespace", parDo1.getClass().getName())
-            .build(),
-        ImmutableMap.<String, Object>builder()
-            .put("key", "foo2")
-            .put("type", "JAVA_CLASS")
-            .put("value", DataflowPipelineTranslatorTest.class.getName())
-            .put("shortValue", DataflowPipelineTranslatorTest.class.getSimpleName())
-            .put("namespace", fn1.getClass().getName())
-            .put("label", "Test Class")
-            .put("linkUrl", "http://www.google.com")
-            .build()
-    );
-
-    ImmutableSet<ImmutableMap<String, Object>> expectedFn2DisplayData = ImmutableSet.of(
-        ImmutableMap.<String, Object>builder()
-            .put("key", "fn")
-            .put("type", "JAVA_CLASS")
-            .put("value", fn2.getClass().getName())
-            .put("shortValue", fn2.getClass().getSimpleName())
-            .put("namespace", parDo2.getClass().getName())
-            .build(),
-        ImmutableMap.<String, Object>builder()
-            .put("key", "foo3")
-            .put("type", "INTEGER")
-            .put("value", 1234L)
-            .put("namespace", fn2.getClass().getName())
-            .build()
-    );
-
-    assertEquals(expectedFn1DisplayData, ImmutableSet.copyOf(fn1displayData));
-    assertEquals(expectedFn2DisplayData, ImmutableSet.copyOf(fn2displayData));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/dataflow/CustomSourcesTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/dataflow/CustomSourcesTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/dataflow/CustomSourcesTest.java
deleted file mode 100644
index 2acede3..0000000
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/dataflow/CustomSourcesTest.java
+++ /dev/null
@@ -1,276 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.dataflow;
-import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.contains;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.options.DataflowPipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.ExpectedLogs;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Sample;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.PCollection;
-
-import com.google.common.base.Preconditions;
-
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Tests for {@link CustomSources}.
- */
-@RunWith(JUnit4.class)
-public class CustomSourcesTest {
-  @Rule public ExpectedException expectedException = ExpectedException.none();
-  @Rule public ExpectedLogs logged = ExpectedLogs.none(CustomSources.class);
-
-  static class TestIO {
-    public static Read fromRange(int from, int to) {
-      return new Read(from, to, false);
-    }
-
-    static class Read extends BoundedSource<Integer> {
-      final int from;
-      final int to;
-      final boolean produceTimestamps;
-
-      Read(int from, int to, boolean produceTimestamps) {
-        this.from = from;
-        this.to = to;
-        this.produceTimestamps = produceTimestamps;
-      }
-
-      public Read withTimestampsMillis() {
-        return new Read(from, to, true);
-      }
-
-      @Override
-      public List<Read> splitIntoBundles(long desiredBundleSizeBytes, PipelineOptions options)
-          throws Exception {
-        List<Read> res = new ArrayList<>();
-        DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
-        float step = 1.0f * (to - from) / dataflowOptions.getNumWorkers();
-        for (int i = 0; i < dataflowOptions.getNumWorkers(); ++i) {
-          res.add(new Read(
-              Math.round(from + i * step), Math.round(from + (i + 1) * step),
-              produceTimestamps));
-        }
-        return res;
-      }
-
-      @Override
-      public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
-        return 8 * (to - from);
-      }
-
-      @Override
-      public boolean producesSortedKeys(PipelineOptions options) throws Exception {
-        return true;
-      }
-
-      @Override
-      public BoundedReader<Integer> createReader(PipelineOptions options) throws IOException {
-        return new RangeReader(this);
-      }
-
-      @Override
-      public void validate() {}
-
-      @Override
-      public String toString() {
-        return "[" + from + ", " + to + ")";
-      }
-
-      @Override
-      public Coder<Integer> getDefaultOutputCoder() {
-        return BigEndianIntegerCoder.of();
-      }
-
-      private static class RangeReader extends BoundedReader<Integer> {
-        // To verify that BasicSerializableSourceFormat calls our methods according to protocol.
-        enum State {
-          UNSTARTED,
-          STARTED,
-          FINISHED
-        }
-        private Read source;
-        private int current = -1;
-        private State state = State.UNSTARTED;
-
-        public RangeReader(Read source) {
-          this.source = source;
-        }
-
-        @Override
-        public boolean start() throws IOException {
-          Preconditions.checkState(state == State.UNSTARTED);
-          state = State.STARTED;
-          current = source.from;
-          return (current < source.to);
-        }
-
-        @Override
-        public boolean advance() throws IOException {
-          Preconditions.checkState(state == State.STARTED);
-          if (current == source.to - 1) {
-            state = State.FINISHED;
-            return false;
-          }
-          current++;
-          return true;
-        }
-
-        @Override
-        public Integer getCurrent() {
-          Preconditions.checkState(state == State.STARTED);
-          return current;
-        }
-
-        @Override
-        public Instant getCurrentTimestamp() {
-          return source.produceTimestamps
-              ? new Instant(current /* as millis */) : BoundedWindow.TIMESTAMP_MIN_VALUE;
-        }
-
-        @Override
-        public void close() throws IOException {
-          Preconditions.checkState(state == State.STARTED || state == State.FINISHED);
-          state = State.FINISHED;
-        }
-
-        @Override
-        public Read getCurrentSource() {
-          return source;
-        }
-
-        @Override
-        public Read splitAtFraction(double fraction) {
-          int proposedIndex = (int) (source.from + fraction * (source.to - source.from));
-          if (proposedIndex <= current) {
-            return null;
-          }
-          Read primary = new Read(source.from, proposedIndex, source.produceTimestamps);
-          Read residual = new Read(proposedIndex, source.to, source.produceTimestamps);
-          this.source = primary;
-          return residual;
-        }
-
-        @Override
-        public Double getFractionConsumed() {
-          return (current == -1)
-              ? 0.0
-              : (1.0 * (1 + current - source.from) / (source.to - source.from));
-        }
-      }
-    }
-  }
-
-  @Test
-  public void testDirectPipelineWithoutTimestamps() throws Exception {
-    Pipeline p = TestPipeline.create();
-    PCollection<Integer> sum = p
-        .apply(Read.from(TestIO.fromRange(10, 20)))
-        .apply(Sum.integersGlobally())
-        .apply(Sample.<Integer>any(1));
-
-    PAssert.thatSingleton(sum).isEqualTo(145);
-    p.run();
-  }
-
-  @Test
-  public void testDirectPipelineWithTimestamps() throws Exception {
-    Pipeline p = TestPipeline.create();
-    PCollection<Integer> sums =
-        p.apply(Read.from(TestIO.fromRange(10, 20).withTimestampsMillis()))
-         .apply(Window.<Integer>into(FixedWindows.of(Duration.millis(3))))
-         .apply(Sum.integersGlobally().withoutDefaults());
-    // Should group into [10 11] [12 13 14] [15 16 17] [18 19].
-    PAssert.that(sums).containsInAnyOrder(21, 37, 39, 48);
-    p.run();
-  }
-
-  @Test
-  public void testRangeProgressAndSplitAtFraction() throws Exception {
-    // Show basic usage of getFractionConsumed and splitAtFraction.
-    // This test only tests TestIO itself, not BasicSerializableSourceFormat.
-
-    DataflowPipelineOptions options =
-        PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
-    TestIO.Read source = TestIO.fromRange(10, 20);
-    try (BoundedSource.BoundedReader<Integer> reader = source.createReader(options)) {
-      assertEquals(0, reader.getFractionConsumed().intValue());
-      assertTrue(reader.start());
-      assertEquals(0.1, reader.getFractionConsumed(), 1e-6);
-      assertTrue(reader.advance());
-      assertEquals(0.2, reader.getFractionConsumed(), 1e-6);
-      // Already past 0.0 and 0.1.
-      assertNull(reader.splitAtFraction(0.0));
-      assertNull(reader.splitAtFraction(0.1));
-
-      {
-        TestIO.Read residual = (TestIO.Read) reader.splitAtFraction(0.5);
-        assertNotNull(residual);
-        TestIO.Read primary = (TestIO.Read) reader.getCurrentSource();
-        assertThat(readFromSource(primary, options), contains(10, 11, 12, 13, 14));
-        assertThat(readFromSource(residual, options), contains(15, 16, 17, 18, 19));
-      }
-
-      // Range is now [10, 15) and we are at 12.
-      {
-        TestIO.Read residual = (TestIO.Read) reader.splitAtFraction(0.8); // give up 14.
-        assertNotNull(residual);
-        TestIO.Read primary = (TestIO.Read) reader.getCurrentSource();
-        assertThat(readFromSource(primary, options), contains(10, 11, 12, 13));
-        assertThat(readFromSource(residual, options), contains(14));
-      }
-
-      assertTrue(reader.advance());
-      assertEquals(12, reader.getCurrent().intValue());
-      assertTrue(reader.advance());
-      assertEquals(13, reader.getCurrent().intValue());
-      assertFalse(reader.advance());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/testing/TestDataflowPipelineRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/testing/TestDataflowPipelineRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/testing/TestDataflowPipelineRunnerTest.java
deleted file mode 100644
index 185ab51..0000000
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/testing/TestDataflowPipelineRunnerTest.java
+++ /dev/null
@@ -1,379 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.testing;
-
-import static org.hamcrest.Matchers.containsString;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.atLeastOnce;
-import static org.mockito.Mockito.doCallRealMethod;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.PipelineResult.State;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.runners.DataflowPipelineJob;
-import org.apache.beam.sdk.runners.DataflowPipelineRunner;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.util.GcsUtil;
-import org.apache.beam.sdk.util.MonitoringUtil;
-import org.apache.beam.sdk.util.MonitoringUtil.JobMessagesHandler;
-import org.apache.beam.sdk.util.NoopPathValidator;
-import org.apache.beam.sdk.util.TestCredential;
-import org.apache.beam.sdk.util.TimeUtil;
-import org.apache.beam.sdk.util.Transport;
-import org.apache.beam.sdk.values.PCollection;
-
-import com.google.api.client.http.LowLevelHttpResponse;
-import com.google.api.client.json.Json;
-import com.google.api.client.testing.http.MockHttpTransport;
-import com.google.api.client.testing.http.MockLowLevelHttpRequest;
-import com.google.api.client.testing.http.MockLowLevelHttpResponse;
-import com.google.api.services.dataflow.Dataflow;
-import com.google.api.services.dataflow.model.JobMessage;
-import com.google.api.services.dataflow.model.JobMetrics;
-import com.google.api.services.dataflow.model.MetricStructuredName;
-import com.google.api.services.dataflow.model.MetricUpdate;
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-
-import org.joda.time.Instant;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.MockitoAnnotations;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.math.BigDecimal;
-import java.util.Arrays;
-import java.util.concurrent.TimeUnit;
-
-/** Tests for {@link TestDataflowPipelineRunner}. */
-@RunWith(JUnit4.class)
-public class TestDataflowPipelineRunnerTest {
-  @Rule public ExpectedException expectedException = ExpectedException.none();
-  @Mock private MockHttpTransport transport;
-  @Mock private MockLowLevelHttpRequest request;
-  @Mock private GcsUtil mockGcsUtil;
-
-  private TestDataflowPipelineOptions options;
-  private Dataflow service;
-
-  @Before
-  public void setUp() throws Exception {
-    MockitoAnnotations.initMocks(this);
-    when(transport.buildRequest(anyString(), anyString())).thenReturn(request);
-    doCallRealMethod().when(request).getContentAsString();
-    service = new Dataflow(transport, Transport.getJsonFactory(), null);
-
-    options = PipelineOptionsFactory.as(TestDataflowPipelineOptions.class);
-    options.setAppName("TestAppName");
-    options.setProject("test-project");
-    options.setTempLocation("gs://test/temp/location");
-    options.setTempRoot("gs://test");
-    options.setGcpCredential(new TestCredential());
-    options.setDataflowClient(service);
-    options.setRunner(TestDataflowPipelineRunner.class);
-    options.setPathValidatorClass(NoopPathValidator.class);
-  }
-
-  @Test
-  public void testToString() {
-    assertEquals("TestDataflowPipelineRunner#TestAppName",
-        new TestDataflowPipelineRunner(options).toString());
-  }
-
-  @Test
-  public void testRunBatchJobThatSucceeds() throws Exception {
-    Pipeline p = TestPipeline.create(options);
-    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
-    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
-    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
-    when(mockJob.getDataflowClient()).thenReturn(service);
-    when(mockJob.getState()).thenReturn(State.DONE);
-    when(mockJob.getProjectId()).thenReturn("test-project");
-    when(mockJob.getJobId()).thenReturn("test-job");
-
-    DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
-    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
-    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
-    when(request.execute()).thenReturn(
-        generateMockMetricResponse(true /* success */, true /* tentative */));
-    assertEquals(mockJob, runner.run(p, mockRunner));
-  }
-
-  @Test
-  public void testRunBatchJobThatFails() throws Exception {
-    Pipeline p = TestPipeline.create(options);
-    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
-    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
-    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
-    when(mockJob.getDataflowClient()).thenReturn(service);
-    when(mockJob.getState()).thenReturn(State.FAILED);
-    when(mockJob.getProjectId()).thenReturn("test-project");
-    when(mockJob.getJobId()).thenReturn("test-job");
-
-    DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
-    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
-    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
-    try {
-      runner.run(p, mockRunner);
-    } catch (AssertionError expected) {
-      return;
-    }
-    // Note that fail throws an AssertionError which is why it is placed out here
-    // instead of inside the try-catch block.
-    fail("AssertionError expected");
-  }
-
-  @Test
-  public void testBatchPipelineFailsIfException() throws Exception {
-    Pipeline p = TestPipeline.create(options);
-    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
-    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
-    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
-    when(mockJob.getDataflowClient()).thenReturn(service);
-    when(mockJob.getState()).thenReturn(State.RUNNING);
-    when(mockJob.getProjectId()).thenReturn("test-project");
-    when(mockJob.getJobId()).thenReturn("test-job");
-    when(mockJob.waitToFinish(any(Long.class), any(TimeUnit.class), any(JobMessagesHandler.class)))
-        .thenAnswer(new Answer<State>() {
-          @Override
-          public State answer(InvocationOnMock invocation) {
-            JobMessage message = new JobMessage();
-            message.setMessageText("FooException");
-            message.setTime(TimeUtil.toCloudTime(Instant.now()));
-            message.setMessageImportance("JOB_MESSAGE_ERROR");
-            ((MonitoringUtil.JobMessagesHandler) invocation.getArguments()[2])
-                .process(Arrays.asList(message));
-            return State.CANCELLED;
-          }
-        });
-
-    DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
-    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
-    when(request.execute()).thenReturn(
-        generateMockMetricResponse(false /* success */, true /* tentative */));
-    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
-    try {
-      runner.run(p, mockRunner);
-    } catch (AssertionError expected) {
-      assertThat(expected.getMessage(), containsString("FooException"));
-      verify(mockJob, atLeastOnce()).cancel();
-      return;
-    }
-    // Note that fail throws an AssertionError which is why it is placed out here
-    // instead of inside the try-catch block.
-    fail("AssertionError expected");
-  }
-
-  @Test
-  public void testRunStreamingJobThatSucceeds() throws Exception {
-    options.setStreaming(true);
-    Pipeline p = TestPipeline.create(options);
-    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
-    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
-    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
-    when(mockJob.getDataflowClient()).thenReturn(service);
-    when(mockJob.getState()).thenReturn(State.RUNNING);
-    when(mockJob.getProjectId()).thenReturn("test-project");
-    when(mockJob.getJobId()).thenReturn("test-job");
-
-    DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
-    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
-    when(request.execute()).thenReturn(
-        generateMockMetricResponse(true /* success */, true /* tentative */));
-    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
-    runner.run(p, mockRunner);
-  }
-
-  @Test
-  public void testRunStreamingJobThatFails() throws Exception {
-    options.setStreaming(true);
-    Pipeline p = TestPipeline.create(options);
-    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
-    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
-    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
-    when(mockJob.getDataflowClient()).thenReturn(service);
-    when(mockJob.getState()).thenReturn(State.RUNNING);
-    when(mockJob.getProjectId()).thenReturn("test-project");
-    when(mockJob.getJobId()).thenReturn("test-job");
-
-    DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
-    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
-    when(request.execute()).thenReturn(
-        generateMockMetricResponse(false /* success */, true /* tentative */));
-    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
-    try {
-      runner.run(p, mockRunner);
-    } catch (AssertionError expected) {
-      return;
-    }
-    // Note that fail throws an AssertionError which is why it is placed out here
-    // instead of inside the try-catch block.
-    fail("AssertionError expected");
-  }
-
-  @Test
-  public void testCheckingForSuccessWhenPAssertSucceeds() throws Exception {
-    DataflowPipelineJob job =
-        spy(new DataflowPipelineJob("test-project", "test-job", service, null));
-    Pipeline p = TestPipeline.create(options);
-    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
-    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
-    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
-    when(request.execute()).thenReturn(
-        generateMockMetricResponse(true /* success */, true /* tentative */));
-    doReturn(State.DONE).when(job).getState();
-    assertEquals(Optional.of(true), runner.checkForSuccess(job));
-  }
-
-  @Test
-  public void testCheckingForSuccessWhenPAssertFails() throws Exception {
-    DataflowPipelineJob job =
-        spy(new DataflowPipelineJob("test-project", "test-job", service, null));
-    Pipeline p = TestPipeline.create(options);
-    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
-    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
-    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
-    when(request.execute()).thenReturn(
-        generateMockMetricResponse(false /* success */, true /* tentative */));
-    doReturn(State.DONE).when(job).getState();
-    assertEquals(Optional.of(false), runner.checkForSuccess(job));
-  }
-
-  @Test
-  public void testCheckingForSuccessSkipsNonTentativeMetrics() throws Exception {
-    DataflowPipelineJob job =
-        spy(new DataflowPipelineJob("test-project", "test-job", service, null));
-    Pipeline p = TestPipeline.create(options);
-    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
-    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
-    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
-    when(request.execute()).thenReturn(
-        generateMockMetricResponse(true /* success */, false /* tentative */));
-    doReturn(State.RUNNING).when(job).getState();
-    assertEquals(Optional.absent(), runner.checkForSuccess(job));
-  }
-
-  private LowLevelHttpResponse generateMockMetricResponse(boolean success, boolean tentative)
-      throws Exception {
-    MetricStructuredName name = new MetricStructuredName();
-    name.setName(success ? "PAssertSuccess" : "PAssertFailure");
-    name.setContext(
-        tentative ? ImmutableMap.of("tentative", "") : ImmutableMap.<String, String>of());
-
-    MetricUpdate metric = new MetricUpdate();
-    metric.setName(name);
-    metric.setScalar(BigDecimal.ONE);
-
-    MockLowLevelHttpResponse response = new MockLowLevelHttpResponse();
-    response.setContentType(Json.MEDIA_TYPE);
-    JobMetrics jobMetrics = new JobMetrics();
-    jobMetrics.setMetrics(Lists.newArrayList(metric));
-    // N.B. Setting the factory is necessary in order to get valid JSON.
-    jobMetrics.setFactory(Transport.getJsonFactory());
-    response.setContent(jobMetrics.toPrettyString());
-    return response;
-  }
-
-  @Test
-  public void testStreamingPipelineFailsIfServiceFails() throws Exception {
-    DataflowPipelineJob job =
-        spy(new DataflowPipelineJob("test-project", "test-job", service, null));
-    Pipeline p = TestPipeline.create(options);
-    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
-    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
-    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
-    when(request.execute()).thenReturn(
-        generateMockMetricResponse(true /* success */, false /* tentative */));
-    doReturn(State.FAILED).when(job).getState();
-    assertEquals(Optional.of(false), runner.checkForSuccess(job));
-  }
-
-  @Test
-  public void testStreamingPipelineFailsIfException() throws Exception {
-    options.setStreaming(true);
-    Pipeline p = TestPipeline.create(options);
-    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
-    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
-    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
-    when(mockJob.getDataflowClient()).thenReturn(service);
-    when(mockJob.getState()).thenReturn(State.RUNNING);
-    when(mockJob.getProjectId()).thenReturn("test-project");
-    when(mockJob.getJobId()).thenReturn("test-job");
-    when(mockJob.waitToFinish(any(Long.class), any(TimeUnit.class), any(JobMessagesHandler.class)))
-        .thenAnswer(new Answer<State>() {
-          @Override
-          public State answer(InvocationOnMock invocation) {
-            JobMessage message = new JobMessage();
-            message.setMessageText("FooException");
-            message.setTime(TimeUtil.toCloudTime(Instant.now()));
-            message.setMessageImportance("JOB_MESSAGE_ERROR");
-            ((MonitoringUtil.JobMessagesHandler) invocation.getArguments()[2])
-                .process(Arrays.asList(message));
-            return State.CANCELLED;
-          }
-        });
-
-    DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
-    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
-    when(request.execute()).thenReturn(
-        generateMockMetricResponse(false /* success */, true /* tentative */));
-    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
-    try {
-      runner.run(p, mockRunner);
-    } catch (AssertionError expected) {
-      assertThat(expected.getMessage(), containsString("FooException"));
-      verify(mockJob, atLeastOnce()).cancel();
-      return;
-    }
-    // Note that fail throws an AssertionError which is why it is placed out here
-    // instead of inside the try-catch block.
-    fail("AssertionError expected");
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/transforms/DataflowGroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/transforms/DataflowGroupByKeyTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/transforms/DataflowGroupByKeyTest.java
deleted file mode 100644
index a381f68..0000000
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/transforms/DataflowGroupByKeyTest.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.options.DataflowPipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.runners.DataflowPipelineRunner;
-import org.apache.beam.sdk.transforms.windowing.Sessions;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.util.NoopPathValidator;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TypeDescriptor;
-
-import org.joda.time.Duration;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.util.Arrays;
-import java.util.List;
-
-/** Tests for {@link GroupByKey} for the {@link DataflowPipelineRunner}. */
-@RunWith(JUnit4.class)
-public class DataflowGroupByKeyTest {
-  @Rule
-  public ExpectedException thrown = ExpectedException.none();
-
-  /**
-   * Create a test pipeline that uses the {@link DataflowPipelineRunner} so that {@link GroupByKey}
-   * is not expanded. This is used for verifying that even without expansion the proper errors show
-   * up.
-   */
-  private Pipeline createTestServiceRunner() {
-    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
-    options.setRunner(DataflowPipelineRunner.class);
-    options.setProject("someproject");
-    options.setStagingLocation("gs://staging");
-    options.setPathValidatorClass(NoopPathValidator.class);
-    options.setDataflowClient(null);
-    return Pipeline.create(options);
-  }
-
-  @Test
-  public void testInvalidWindowsService() {
-    Pipeline p = createTestServiceRunner();
-
-    List<KV<String, Integer>> ungroupedPairs = Arrays.asList();
-
-    PCollection<KV<String, Integer>> input =
-        p.apply(Create.of(ungroupedPairs)
-            .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())))
-        .apply(Window.<KV<String, Integer>>into(
-            Sessions.withGapDuration(Duration.standardMinutes(1))));
-
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("GroupByKey must have a valid Window merge function");
-    input
-        .apply("GroupByKey", GroupByKey.<String, Integer>create())
-        .apply("GroupByKeyAgain", GroupByKey.<String, Iterable<Integer>>create());
-  }
-
-  @Test
-  public void testGroupByKeyServiceUnbounded() {
-    Pipeline p = createTestServiceRunner();
-
-    PCollection<KV<String, Integer>> input =
-        p.apply(
-            new PTransform<PBegin, PCollection<KV<String, Integer>>>() {
-              @Override
-              public PCollection<KV<String, Integer>> apply(PBegin input) {
-                return PCollection.<KV<String, Integer>>createPrimitiveOutputInternal(
-                        input.getPipeline(),
-                        WindowingStrategy.globalDefault(),
-                        PCollection.IsBounded.UNBOUNDED)
-                    .setTypeDescriptorInternal(new TypeDescriptor<KV<String, Integer>>() {});
-              }
-            });
-
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage(
-        "GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without "
-        + "a trigger. Use a Window.into or Window.triggering transform prior to GroupByKey.");
-
-    input.apply("GroupByKey", GroupByKey.<String, Integer>create());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/transforms/DataflowViewTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/transforms/DataflowViewTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/transforms/DataflowViewTest.java
deleted file mode 100644
index b86de7e..0000000
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/transforms/DataflowViewTest.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.options.DataflowPipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.runners.DataflowPipelineRunner;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.InvalidWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.util.NoopPathValidator;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TypeDescriptor;
-
-import org.hamcrest.Matchers;
-import org.joda.time.Duration;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.internal.matchers.ThrowableMessageMatcher;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for {@link View} for a {@link DataflowPipelineRunner}. */
-@RunWith(JUnit4.class)
-public class DataflowViewTest {
-  @Rule
-  public transient ExpectedException thrown = ExpectedException.none();
-
-  private Pipeline createTestBatchRunner() {
-    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
-    options.setRunner(DataflowPipelineRunner.class);
-    options.setProject("someproject");
-    options.setStagingLocation("gs://staging");
-    options.setPathValidatorClass(NoopPathValidator.class);
-    options.setDataflowClient(null);
-    return Pipeline.create(options);
-  }
-
-  private Pipeline createTestStreamingRunner() {
-    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
-    options.setRunner(DataflowPipelineRunner.class);
-    options.setStreaming(true);
-    options.setProject("someproject");
-    options.setStagingLocation("gs://staging");
-    options.setPathValidatorClass(NoopPathValidator.class);
-    options.setDataflowClient(null);
-    return Pipeline.create(options);
-  }
-
-  private void testViewUnbounded(
-      Pipeline pipeline,
-      PTransform<PCollection<KV<String, Integer>>, ? extends PCollectionView<?>> view) {
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("Unable to create a side-input view from input");
-    thrown.expectCause(
-        ThrowableMessageMatcher.hasMessage(Matchers.containsString("non-bounded PCollection")));
-    pipeline
-        .apply(
-            new PTransform<PBegin, PCollection<KV<String, Integer>>>() {
-              @Override
-              public PCollection<KV<String, Integer>> apply(PBegin input) {
-                return PCollection.<KV<String, Integer>>createPrimitiveOutputInternal(
-                        input.getPipeline(),
-                        WindowingStrategy.globalDefault(),
-                        PCollection.IsBounded.UNBOUNDED)
-                    .setTypeDescriptorInternal(new TypeDescriptor<KV<String, Integer>>() {});
-              }
-            })
-        .apply(view);
-  }
-
-  private void testViewNonmerging(
-      Pipeline pipeline,
-      PTransform<PCollection<KV<String, Integer>>, ? extends PCollectionView<?>> view) {
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("Unable to create a side-input view from input");
-    thrown.expectCause(
-        ThrowableMessageMatcher.hasMessage(Matchers.containsString("Consumed by GroupByKey")));
-    pipeline.apply(Create.<KV<String, Integer>>of(KV.of("hello", 5)))
-        .apply(Window.<KV<String, Integer>>into(new InvalidWindows<>(
-            "Consumed by GroupByKey", FixedWindows.of(Duration.standardHours(1)))))
-        .apply(view);
-  }
-
-  @Test
-  public void testViewUnboundedAsSingletonBatch() {
-    testViewUnbounded(createTestBatchRunner(), View.<KV<String, Integer>>asSingleton());
-  }
-
-  @Test
-  public void testViewUnboundedAsSingletonStreaming() {
-    testViewUnbounded(createTestStreamingRunner(), View.<KV<String, Integer>>asSingleton());
-  }
-
-  @Test
-  public void testViewUnboundedAsIterableBatch() {
-    testViewUnbounded(createTestBatchRunner(), View.<KV<String, Integer>>asIterable());
-  }
-
-  @Test
-  public void testViewUnboundedAsIterableStreaming() {
-    testViewUnbounded(createTestStreamingRunner(), View.<KV<String, Integer>>asIterable());
-  }
-
-  @Test
-  public void testViewUnboundedAsListBatch() {
-    testViewUnbounded(createTestBatchRunner(), View.<KV<String, Integer>>asList());
-  }
-
-  @Test
-  public void testViewUnboundedAsListStreaming() {
-    testViewUnbounded(createTestStreamingRunner(), View.<KV<String, Integer>>asList());
-  }
-
-  @Test
-  public void testViewUnboundedAsMapBatch() {
-    testViewUnbounded(createTestBatchRunner(), View.<String, Integer>asMap());
-  }
-
-  @Test
-  public void testViewUnboundedAsMapStreaming() {
-    testViewUnbounded(createTestStreamingRunner(), View.<String, Integer>asMap());
-  }
-
-  @Test
-  public void testViewUnboundedAsMultimapBatch() {
-    testViewUnbounded(createTestBatchRunner(), View.<String, Integer>asMultimap());
-  }
-
-  @Test
-  public void testViewUnboundedAsMultimapStreaming() {
-    testViewUnbounded(createTestStreamingRunner(), View.<String, Integer>asMultimap());
-  }
-
-  @Test
-  public void testViewNonmergingAsSingletonBatch() {
-    testViewNonmerging(createTestBatchRunner(), View.<KV<String, Integer>>asSingleton());
-  }
-
-  @Test
-  public void testViewNonmergingAsSingletonStreaming() {
-    testViewNonmerging(createTestStreamingRunner(), View.<KV<String, Integer>>asSingleton());
-  }
-
-  @Test
-  public void testViewNonmergingAsIterableBatch() {
-    testViewNonmerging(createTestBatchRunner(), View.<KV<String, Integer>>asIterable());
-  }
-
-  @Test
-  public void testViewNonmergingAsIterableStreaming() {
-    testViewNonmerging(createTestStreamingRunner(), View.<KV<String, Integer>>asIterable());
-  }
-
-  @Test
-  public void testViewNonmergingAsListBatch() {
-    testViewNonmerging(createTestBatchRunner(), View.<KV<String, Integer>>asList());
-  }
-
-  @Test
-  public void testViewNonmergingAsListStreaming() {
-    testViewNonmerging(createTestStreamingRunner(), View.<KV<String, Integer>>asList());
-  }
-
-  @Test
-  public void testViewNonmergingAsMapBatch() {
-    testViewNonmerging(createTestBatchRunner(), View.<String, Integer>asMap());
-  }
-
-  @Test
-  public void testViewNonmergingAsMapStreaming() {
-    testViewNonmerging(createTestStreamingRunner(), View.<String, Integer>asMap());
-  }
-
-  @Test
-  public void testViewNonmergingAsMultimapBatch() {
-    testViewNonmerging(createTestBatchRunner(), View.<String, Integer>asMultimap());
-  }
-
-  @Test
-  public void testViewNonmergingAsMultimapStreaming() {
-    testViewNonmerging(createTestStreamingRunner(), View.<String, Integer>asMultimap());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/util/DataflowPathValidatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/util/DataflowPathValidatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/util/DataflowPathValidatorTest.java
deleted file mode 100644
index b459c47..0000000
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/util/DataflowPathValidatorTest.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.when;
-
-import org.apache.beam.sdk.options.DataflowPipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.runners.DataflowPipelineRunner;
-import org.apache.beam.sdk.util.gcsfs.GcsPath;
-
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-/** Tests for {@link DataflowPathValidator}. */
-@RunWith(JUnit4.class)
-public class DataflowPathValidatorTest {
-  @Rule public ExpectedException expectedException = ExpectedException.none();
-
-  @Mock private GcsUtil mockGcsUtil;
-  private DataflowPathValidator validator;
-
-  @Before
-  public void setUp() throws Exception {
-    MockitoAnnotations.initMocks(this);
-    when(mockGcsUtil.bucketExists(any(GcsPath.class))).thenReturn(true);
-    when(mockGcsUtil.isGcsPatternSupported(anyString())).thenCallRealMethod();
-    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
-    options.setGcpCredential(new TestCredential());
-    options.setRunner(DataflowPipelineRunner.class);
-    options.setGcsUtil(mockGcsUtil);
-    validator = new DataflowPathValidator(options);
-  }
-
-  @Test
-  public void testValidFilePattern() {
-    validator.validateInputFilePatternSupported("gs://bucket/path");
-  }
-
-  @Test
-  public void testInvalidFilePattern() {
-    expectedException.expect(IllegalArgumentException.class);
-    expectedException.expectMessage(
-        "DataflowPipelineRunner expected a valid 'gs://' path but was given '/local/path'");
-    validator.validateInputFilePatternSupported("/local/path");
-  }
-
-  @Test
-  public void testWhenBucketDoesNotExist() throws Exception {
-    when(mockGcsUtil.bucketExists(any(GcsPath.class))).thenReturn(false);
-    expectedException.expect(IllegalArgumentException.class);
-    expectedException.expectMessage(
-        "Could not find file gs://non-existent-bucket/location");
-    validator.validateInputFilePatternSupported("gs://non-existent-bucket/location");
-  }
-
-  @Test
-  public void testValidOutputPrefix() {
-    validator.validateOutputFilePrefixSupported("gs://bucket/path");
-  }
-
-  @Test
-  public void testInvalidOutputPrefix() {
-    expectedException.expect(IllegalArgumentException.class);
-    expectedException.expectMessage(
-        "DataflowPipelineRunner expected a valid 'gs://' path but was given '/local/path'");
-    validator.validateOutputFilePrefixSupported("/local/path");
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/util/MonitoringUtilTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/util/MonitoringUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/util/MonitoringUtilTest.java
deleted file mode 100644
index bdc0bc3..0000000
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/util/MonitoringUtilTest.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import org.apache.beam.sdk.PipelineResult.State;
-import org.apache.beam.sdk.options.DataflowPipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-
-import com.google.api.services.dataflow.Dataflow;
-import com.google.api.services.dataflow.model.JobMessage;
-import com.google.api.services.dataflow.model.ListJobMessagesResponse;
-
-import org.joda.time.Instant;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Tests for MonitoringUtil.
- */
-@RunWith(JUnit4.class)
-public class MonitoringUtilTest {
-  private static final String PROJECT_ID = "someProject";
-  private static final String JOB_ID = "1234";
-
-  @Rule public ExpectedException thrown = ExpectedException.none();
-
-  @Test
-  public void testGetJobMessages() throws IOException {
-    Dataflow.Projects.Jobs.Messages mockMessages = mock(Dataflow.Projects.Jobs.Messages.class);
-
-    // Two requests are needed to get all the messages.
-    Dataflow.Projects.Jobs.Messages.List firstRequest =
-        mock(Dataflow.Projects.Jobs.Messages.List.class);
-    Dataflow.Projects.Jobs.Messages.List secondRequest =
-        mock(Dataflow.Projects.Jobs.Messages.List.class);
-
-    when(mockMessages.list(PROJECT_ID, JOB_ID)).thenReturn(firstRequest).thenReturn(secondRequest);
-
-    ListJobMessagesResponse firstResponse = new ListJobMessagesResponse();
-    firstResponse.setJobMessages(new ArrayList<JobMessage>());
-    for (int i = 0; i < 100; ++i) {
-      JobMessage message = new JobMessage();
-      message.setId("message_" + i);
-      message.setTime(TimeUtil.toCloudTime(new Instant(i)));
-      firstResponse.getJobMessages().add(message);
-    }
-    String pageToken = "page_token";
-    firstResponse.setNextPageToken(pageToken);
-
-    ListJobMessagesResponse secondResponse = new ListJobMessagesResponse();
-    secondResponse.setJobMessages(new ArrayList<JobMessage>());
-    for (int i = 100; i < 150; ++i) {
-      JobMessage message = new JobMessage();
-      message.setId("message_" + i);
-      message.setTime(TimeUtil.toCloudTime(new Instant(i)));
-      secondResponse.getJobMessages().add(message);
-    }
-
-    when(firstRequest.execute()).thenReturn(firstResponse);
-    when(secondRequest.execute()).thenReturn(secondResponse);
-
-    MonitoringUtil util = new MonitoringUtil(PROJECT_ID, mockMessages);
-
-    List<JobMessage> messages = util.getJobMessages(JOB_ID, -1);
-
-    verify(secondRequest).setPageToken(pageToken);
-
-    assertEquals(150, messages.size());
-  }
-
-  @Test
-  public void testToStateCreatesState() {
-    String stateName = "JOB_STATE_DONE";
-
-    State result = MonitoringUtil.toState(stateName);
-
-    assertEquals(State.DONE, result);
-  }
-
-  @Test
-  public void testToStateWithNullReturnsUnknown() {
-    String stateName = null;
-
-    State result = MonitoringUtil.toState(stateName);
-
-    assertEquals(State.UNKNOWN, result);
-  }
-
-  @Test
-  public void testToStateWithOtherValueReturnsUnknown() {
-    String stateName = "FOO_BAR_BAZ";
-
-    State result = MonitoringUtil.toState(stateName);
-
-    assertEquals(State.UNKNOWN, result);
-  }
-
-  @Test
-  public void testDontOverrideEndpointWithDefaultApi() {
-    DataflowPipelineOptions options =
-        PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
-    options.setProject(PROJECT_ID);
-    options.setGcpCredential(new TestCredential());
-    String cancelCommand = MonitoringUtil.getGcloudCancelCommand(options, JOB_ID);
-    assertEquals("gcloud alpha dataflow jobs --project=someProject cancel 1234", cancelCommand);
-  }
-
-  @Test
-  public void testOverridesEndpointWithStagedDataflowEndpoint() {
-    DataflowPipelineOptions options =
-        PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
-    options.setProject(PROJECT_ID);
-    options.setGcpCredential(new TestCredential());
-    String stagingDataflowEndpoint = "v0neverExisted";
-    options.setDataflowEndpoint(stagingDataflowEndpoint);
-    String cancelCommand = MonitoringUtil.getGcloudCancelCommand(options, JOB_ID);
-    assertEquals(
-        "CLOUDSDK_API_ENDPOINT_OVERRIDES_DATAFLOW=https://dataflow.googleapis.com/v0neverExisted/ "
-        + "gcloud alpha dataflow jobs --project=someProject cancel 1234",
-        cancelCommand);
-  }
-}