You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/06/20 22:16:32 UTC

[39/50] [abbrv] incubator-beam git commit: Rename DataflowPipelineRunner to DataflowRunner

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6d028ac6/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRunnerTest.java
deleted file mode 100644
index 38d4c96..0000000
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRunnerTest.java
+++ /dev/null
@@ -1,1417 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow;
-
-import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.containsString;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.hasItem;
-import static org.hamcrest.Matchers.instanceOf;
-import static org.hamcrest.Matchers.startsWith;
-import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import org.apache.beam.runners.dataflow.DataflowPipelineRunner.BatchViewAsList;
-import org.apache.beam.runners.dataflow.DataflowPipelineRunner.BatchViewAsMap;
-import org.apache.beam.runners.dataflow.DataflowPipelineRunner.BatchViewAsMultimap;
-import org.apache.beam.runners.dataflow.DataflowPipelineRunner.BatchViewAsSingleton;
-import org.apache.beam.runners.dataflow.DataflowPipelineRunner.TransformedMap;
-import org.apache.beam.runners.dataflow.internal.IsmFormat;
-import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecord;
-import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecordCoder;
-import org.apache.beam.runners.dataflow.internal.IsmFormat.MetadataKeyCoder;
-import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
-import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.Pipeline.PipelineVisitor;
-import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
-import org.apache.beam.sdk.coders.BigEndianLongCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.VarLongCoder;
-import org.apache.beam.sdk.io.AvroIO;
-import org.apache.beam.sdk.io.AvroSource;
-import org.apache.beam.sdk.io.BigQueryIO;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.runners.TransformTreeNode;
-import org.apache.beam.sdk.runners.dataflow.TestCountingSource;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFnTester;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.GcsUtil;
-import org.apache.beam.sdk.util.NoopPathValidator;
-import org.apache.beam.sdk.util.ReleaseInfo;
-import org.apache.beam.sdk.util.TestCredential;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.util.gcsfs.GcsPath;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TupleTagList;
-
-import com.google.api.services.dataflow.Dataflow;
-import com.google.api.services.dataflow.model.DataflowPackage;
-import com.google.api.services.dataflow.model.Job;
-import com.google.api.services.dataflow.model.ListJobsResponse;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-
-import org.hamcrest.Description;
-import org.hamcrest.Matchers;
-import org.hamcrest.TypeSafeMatcher;
-import org.joda.time.Instant;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.internal.matchers.ThrowableMessageMatcher;
-import org.junit.rules.ExpectedException;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.nio.channels.FileChannel;
-import java.nio.channels.SeekableByteChannel;
-import java.nio.file.Files;
-import java.nio.file.StandardOpenOption;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.regex.Pattern;
-
-/**
- * Tests for the {@link DataflowPipelineRunner}.
- */
-@RunWith(JUnit4.class)
-public class DataflowPipelineRunnerTest {
-
-  private static final String PROJECT_ID = "some-project";
-
-  @Rule
-  public TemporaryFolder tmpFolder = new TemporaryFolder();
-  @Rule
-  public ExpectedException thrown = ExpectedException.none();
-
-  // Asserts that the given Job has all expected fields set.
-  private static void assertValidJob(Job job) {
-    assertNull(job.getId());
-    assertNull(job.getCurrentState());
-    assertTrue(Pattern.matches("[a-z]([-a-z0-9]*[a-z0-9])?", job.getName()));
-  }
-
-  private Pipeline buildDataflowPipeline(DataflowPipelineOptions options) {
-    options.setStableUniqueNames(CheckEnabled.ERROR);
-    options.setRunner(DataflowPipelineRunner.class);
-    Pipeline p = Pipeline.create(options);
-
-    p.apply(TextIO.Read.named("ReadMyFile").from("gs://bucket/object"))
-        .apply(TextIO.Write.named("WriteMyFile").to("gs://bucket/object"));
-
-    return p;
-  }
-
-  private static Dataflow buildMockDataflow(
-      final ArgumentCaptor<Job> jobCaptor) throws IOException {
-    Dataflow mockDataflowClient = mock(Dataflow.class);
-    Dataflow.Projects mockProjects = mock(Dataflow.Projects.class);
-    Dataflow.Projects.Jobs mockJobs = mock(Dataflow.Projects.Jobs.class);
-    Dataflow.Projects.Jobs.Create mockRequest =
-        mock(Dataflow.Projects.Jobs.Create.class);
-    Dataflow.Projects.Jobs.List mockList = mock(Dataflow.Projects.Jobs.List.class);
-
-    when(mockDataflowClient.projects()).thenReturn(mockProjects);
-    when(mockProjects.jobs()).thenReturn(mockJobs);
-    when(mockJobs.create(eq(PROJECT_ID), jobCaptor.capture()))
-        .thenReturn(mockRequest);
-    when(mockJobs.list(eq(PROJECT_ID))).thenReturn(mockList);
-    when(mockList.setPageToken(anyString())).thenReturn(mockList);
-    when(mockList.execute())
-        .thenReturn(
-            new ListJobsResponse()
-                .setJobs(
-                    Arrays.asList(
-                        new Job()
-                            .setName("oldjobname")
-                            .setId("oldJobId")
-                            .setCurrentState("JOB_STATE_RUNNING"))));
-
-    Job resultJob = new Job();
-    resultJob.setId("newid");
-    when(mockRequest.execute()).thenReturn(resultJob);
-    return mockDataflowClient;
-  }
-
-  private GcsUtil buildMockGcsUtil(boolean bucketExists) throws IOException {
-    GcsUtil mockGcsUtil = mock(GcsUtil.class);
-    when(mockGcsUtil.create(any(GcsPath.class), anyString()))
-        .then(new Answer<SeekableByteChannel>() {
-              @Override
-              public SeekableByteChannel answer(InvocationOnMock invocation) throws Throwable {
-                return FileChannel.open(
-                    Files.createTempFile("channel-", ".tmp"),
-                    StandardOpenOption.CREATE, StandardOpenOption.DELETE_ON_CLOSE);
-              }
-            });
-
-    when(mockGcsUtil.isGcsPatternSupported(anyString())).thenReturn(true);
-    when(mockGcsUtil.expand(any(GcsPath.class))).then(new Answer<List<GcsPath>>() {
-      @Override
-      public List<GcsPath> answer(InvocationOnMock invocation) throws Throwable {
-        return ImmutableList.of((GcsPath) invocation.getArguments()[0]);
-      }
-    });
-    when(mockGcsUtil.bucketExists(any(GcsPath.class))).thenReturn(bucketExists);
-    return mockGcsUtil;
-  }
-
-  private DataflowPipelineOptions buildPipelineOptions() throws IOException {
-    ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
-    return buildPipelineOptions(jobCaptor);
-  }
-
-  private DataflowPipelineOptions buildPipelineOptions(
-      ArgumentCaptor<Job> jobCaptor) throws IOException {
-    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
-    options.setRunner(DataflowPipelineRunner.class);
-    options.setProject(PROJECT_ID);
-    options.setTempLocation("gs://somebucket/some/path");
-    // Set FILES_PROPERTY to empty to prevent a default value calculated from classpath.
-    options.setFilesToStage(new LinkedList<String>());
-    options.setDataflowClient(buildMockDataflow(jobCaptor));
-    options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */));
-    options.setGcpCredential(new TestCredential());
-    return options;
-  }
-
-  @Test
-  public void testFromOptionsWithUppercaseConvertsToLowercase() throws Exception {
-    String mixedCase = "ThisJobNameHasMixedCase";
-    ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
-    DataflowPipelineOptions options = buildPipelineOptions(jobCaptor);
-    options.setJobName(mixedCase);
-
-    DataflowPipelineRunner runner = DataflowPipelineRunner.fromOptions(options);
-    assertThat(options.getJobName(), equalTo(mixedCase.toLowerCase()));
-  }
-
-  @Test
-  public void testRun() throws IOException {
-    ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
-
-    DataflowPipelineOptions options = buildPipelineOptions(jobCaptor);
-    Pipeline p = buildDataflowPipeline(options);
-    DataflowPipelineJob job = (DataflowPipelineJob) p.run();
-    assertEquals("newid", job.getJobId());
-    assertValidJob(jobCaptor.getValue());
-  }
-
-  @Test
-  public void testRunReturnDifferentRequestId() throws IOException {
-    DataflowPipelineOptions options = buildPipelineOptions();
-    Dataflow mockDataflowClient = options.getDataflowClient();
-    Dataflow.Projects.Jobs.Create mockRequest = mock(Dataflow.Projects.Jobs.Create.class);
-    when(mockDataflowClient.projects().jobs().create(eq(PROJECT_ID), any(Job.class)))
-        .thenReturn(mockRequest);
-    Job resultJob = new Job();
-    resultJob.setId("newid");
-    // Return a different request id.
-    resultJob.setClientRequestId("different_request_id");
-    when(mockRequest.execute()).thenReturn(resultJob);
-
-    Pipeline p = buildDataflowPipeline(options);
-    try {
-      p.run();
-      fail("Expected DataflowJobAlreadyExistsException");
-    } catch (DataflowJobAlreadyExistsException expected) {
-      assertThat(expected.getMessage(),
-          containsString("If you want to submit a second job, try again by setting a "
-            + "different name using --jobName."));
-      assertEquals(expected.getJob().getJobId(), resultJob.getId());
-    }
-  }
-
-  @Test
-  public void testUpdate() throws IOException {
-    ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
-
-    DataflowPipelineOptions options = buildPipelineOptions(jobCaptor);
-    options.setUpdate(true);
-    options.setJobName("oldJobName");
-    Pipeline p = buildDataflowPipeline(options);
-    DataflowPipelineJob job = (DataflowPipelineJob) p.run();
-    assertEquals("newid", job.getJobId());
-    assertValidJob(jobCaptor.getValue());
-  }
-
-  @Test
-  public void testUpdateNonExistentPipeline() throws IOException {
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("Could not find running job named badjobname");
-
-    DataflowPipelineOptions options = buildPipelineOptions();
-    options.setUpdate(true);
-    options.setJobName("badJobName");
-    Pipeline p = buildDataflowPipeline(options);
-    p.run();
-  }
-
-  @Test
-  public void testUpdateAlreadyUpdatedPipeline() throws IOException {
-    DataflowPipelineOptions options = buildPipelineOptions();
-    options.setUpdate(true);
-    options.setJobName("oldJobName");
-    Dataflow mockDataflowClient = options.getDataflowClient();
-    Dataflow.Projects.Jobs.Create mockRequest = mock(Dataflow.Projects.Jobs.Create.class);
-    when(mockDataflowClient.projects().jobs().create(eq(PROJECT_ID), any(Job.class)))
-        .thenReturn(mockRequest);
-    final Job resultJob = new Job();
-    resultJob.setId("newid");
-    // Return a different request id.
-    resultJob.setClientRequestId("different_request_id");
-    when(mockRequest.execute()).thenReturn(resultJob);
-
-    Pipeline p = buildDataflowPipeline(options);
-
-    thrown.expect(DataflowJobAlreadyUpdatedException.class);
-    thrown.expect(new TypeSafeMatcher<DataflowJobAlreadyUpdatedException>() {
-      @Override
-      public void describeTo(Description description) {
-        description.appendText("Expected job ID: " + resultJob.getId());
-      }
-
-      @Override
-      protected boolean matchesSafely(DataflowJobAlreadyUpdatedException item) {
-        return resultJob.getId().equals(item.getJob().getJobId());
-      }
-    });
-    thrown.expectMessage("The job named oldjobname with id: oldJobId has already been updated "
-        + "into job id: newid and cannot be updated again.");
-    p.run();
-  }
-
-  @Test
-  public void testRunWithFiles() throws IOException {
-    // Test that the function DataflowPipelineRunner.stageFiles works as
-    // expected.
-    GcsUtil mockGcsUtil = buildMockGcsUtil(true /* bucket exists */);
-    final String gcsStaging = "gs://somebucket/some/path";
-    final String gcsTemp = "gs://somebucket/some/temp/path";
-    final String cloudDataflowDataset = "somedataset";
-
-    // Create some temporary files.
-    File temp1 = File.createTempFile("DataflowPipelineRunnerTest", "txt");
-    temp1.deleteOnExit();
-    File temp2 = File.createTempFile("DataflowPipelineRunnerTest2", "txt");
-    temp2.deleteOnExit();
-
-    String overridePackageName = "alias.txt";
-
-    ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
-    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
-    options.setFilesToStage(ImmutableList.of(
-        temp1.getAbsolutePath(),
-        overridePackageName + "=" + temp2.getAbsolutePath()));
-    options.setStagingLocation(gcsStaging);
-    options.setTempLocation(gcsTemp);
-    options.setTempDatasetId(cloudDataflowDataset);
-    options.setProject(PROJECT_ID);
-    options.setJobName("job");
-    options.setDataflowClient(buildMockDataflow(jobCaptor));
-    options.setGcsUtil(mockGcsUtil);
-    options.setGcpCredential(new TestCredential());
-
-    Pipeline p = buildDataflowPipeline(options);
-
-    DataflowPipelineJob job = (DataflowPipelineJob) p.run();
-    assertEquals("newid", job.getJobId());
-
-    Job workflowJob = jobCaptor.getValue();
-    assertValidJob(workflowJob);
-
-    assertEquals(
-        2,
-        workflowJob.getEnvironment().getWorkerPools().get(0).getPackages().size());
-    DataflowPackage workflowPackage1 =
-        workflowJob.getEnvironment().getWorkerPools().get(0).getPackages().get(0);
-    assertThat(workflowPackage1.getName(), startsWith(temp1.getName()));
-    DataflowPackage workflowPackage2 =
-        workflowJob.getEnvironment().getWorkerPools().get(0).getPackages().get(1);
-    assertEquals(overridePackageName, workflowPackage2.getName());
-
-    assertEquals(
-        "storage.googleapis.com/somebucket/some/temp/path",
-        workflowJob.getEnvironment().getTempStoragePrefix());
-    assertEquals(
-        cloudDataflowDataset,
-        workflowJob.getEnvironment().getDataset());
-    assertEquals(
-        ReleaseInfo.getReleaseInfo().getName(),
-        workflowJob.getEnvironment().getUserAgent().get("name"));
-    assertEquals(
-        ReleaseInfo.getReleaseInfo().getVersion(),
-        workflowJob.getEnvironment().getUserAgent().get("version"));
-  }
-
-  @Test
-  public void runWithDefaultFilesToStage() throws Exception {
-    DataflowPipelineOptions options = buildPipelineOptions();
-    options.setFilesToStage(null);
-    DataflowPipelineRunner.fromOptions(options);
-    assertTrue(!options.getFilesToStage().isEmpty());
-  }
-
-  @Test
-  public void detectClassPathResourceWithFileResources() throws Exception {
-    File file = tmpFolder.newFile("file");
-    File file2 = tmpFolder.newFile("file2");
-    URLClassLoader classLoader = new URLClassLoader(new URL[]{
-        file.toURI().toURL(),
-        file2.toURI().toURL()
-    });
-
-    assertEquals(ImmutableList.of(file.getAbsolutePath(), file2.getAbsolutePath()),
-        DataflowPipelineRunner.detectClassPathResourcesToStage(classLoader));
-  }
-
-  @Test
-  public void detectClassPathResourcesWithUnsupportedClassLoader() {
-    ClassLoader mockClassLoader = Mockito.mock(ClassLoader.class);
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("Unable to use ClassLoader to detect classpath elements.");
-
-    DataflowPipelineRunner.detectClassPathResourcesToStage(mockClassLoader);
-  }
-
-  @Test
-  public void detectClassPathResourceWithNonFileResources() throws Exception {
-    String url = "http://www.google.com/all-the-secrets.jar";
-    URLClassLoader classLoader = new URLClassLoader(new URL[]{
-        new URL(url)
-    });
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("Unable to convert url (" + url + ") to file.");
-
-    DataflowPipelineRunner.detectClassPathResourcesToStage(classLoader);
-  }
-
-  @Test
-  public void testGcsStagingLocationInitialization() throws Exception {
-    // Test that the staging location is initialized correctly.
-    String gcsTemp = "gs://somebucket/some/temp/path";
-
-    // Set temp location (required), and check that staging location is set.
-    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
-    options.setTempLocation(gcsTemp);
-    options.setProject(PROJECT_ID);
-    options.setGcpCredential(new TestCredential());
-    options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */));
-    options.setRunner(DataflowPipelineRunner.class);
-
-    DataflowPipelineRunner.fromOptions(options);
-
-    assertNotNull(options.getStagingLocation());
-  }
-
-  @Test
-  public void testNonGcsFilePathInReadFailure() throws IOException {
-    ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
-
-    Pipeline p = buildDataflowPipeline(buildPipelineOptions(jobCaptor));
-    p.apply(TextIO.Read.named("ReadMyNonGcsFile").from(tmpFolder.newFile().getPath()));
-
-    thrown.expectCause(Matchers.allOf(
-        instanceOf(IllegalArgumentException.class),
-        ThrowableMessageMatcher.hasMessage(
-            containsString("expected a valid 'gs://' path but was given"))));
-    p.run();
-    assertValidJob(jobCaptor.getValue());
-  }
-
-  @Test
-  public void testNonGcsFilePathInWriteFailure() throws IOException {
-    Pipeline p = buildDataflowPipeline(buildPipelineOptions());
-    PCollection<String> pc = p.apply(TextIO.Read.named("ReadMyGcsFile").from("gs://bucket/object"));
-
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage(containsString("expected a valid 'gs://' path but was given"));
-    pc.apply(TextIO.Write.named("WriteMyNonGcsFile").to("/tmp/file"));
-  }
-
-  @Test
-  public void testMultiSlashGcsFileReadPath() throws IOException {
-    ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
-
-    Pipeline p = buildDataflowPipeline(buildPipelineOptions(jobCaptor));
-    p.apply(TextIO.Read.named("ReadInvalidGcsFile")
-        .from("gs://bucket/tmp//file"));
-
-    thrown.expectCause(Matchers.allOf(
-        instanceOf(IllegalArgumentException.class),
-        ThrowableMessageMatcher.hasMessage(containsString("consecutive slashes"))));
-    p.run();
-    assertValidJob(jobCaptor.getValue());
-  }
-
-  @Test
-  public void testMultiSlashGcsFileWritePath() throws IOException {
-    Pipeline p = buildDataflowPipeline(buildPipelineOptions());
-    PCollection<String> pc = p.apply(TextIO.Read.named("ReadMyGcsFile").from("gs://bucket/object"));
-
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("consecutive slashes");
-    pc.apply(TextIO.Write.named("WriteInvalidGcsFile").to("gs://bucket/tmp//file"));
-  }
-
-  @Test
-  public void testInvalidTempLocation() throws IOException {
-    ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
-
-    DataflowPipelineOptions options = buildPipelineOptions(jobCaptor);
-    options.setTempLocation("file://temp/location");
-
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage(containsString("expected a valid 'gs://' path but was given"));
-    DataflowPipelineRunner.fromOptions(options);
-    assertValidJob(jobCaptor.getValue());
-  }
-
-  @Test
-  public void testInvalidStagingLocation() throws IOException {
-    DataflowPipelineOptions options = buildPipelineOptions();
-    options.setStagingLocation("file://my/staging/location");
-    try {
-      DataflowPipelineRunner.fromOptions(options);
-      fail("fromOptions should have failed");
-    } catch (IllegalArgumentException e) {
-      assertThat(e.getMessage(), containsString("expected a valid 'gs://' path but was given"));
-    }
-    options.setStagingLocation("my/staging/location");
-    try {
-      DataflowPipelineRunner.fromOptions(options);
-      fail("fromOptions should have failed");
-    } catch (IllegalArgumentException e) {
-      assertThat(e.getMessage(), containsString("expected a valid 'gs://' path but was given"));
-    }
-  }
-
-  @Test
-  public void testNonExistentTempLocation() throws IOException {
-    ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
-
-    GcsUtil mockGcsUtil = buildMockGcsUtil(false /* bucket exists */);
-    DataflowPipelineOptions options = buildPipelineOptions(jobCaptor);
-    options.setGcsUtil(mockGcsUtil);
-    options.setTempLocation("gs://non-existent-bucket/location");
-
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage(containsString(
-        "Output path does not exist or is not writeable: gs://non-existent-bucket/location"));
-    DataflowPipelineRunner.fromOptions(options);
-    assertValidJob(jobCaptor.getValue());
-  }
-
-  @Test
-  public void testNonExistentStagingLocation() throws IOException {
-    ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
-
-    GcsUtil mockGcsUtil = buildMockGcsUtil(false /* bucket exists */);
-    DataflowPipelineOptions options = buildPipelineOptions(jobCaptor);
-    options.setGcsUtil(mockGcsUtil);
-    options.setStagingLocation("gs://non-existent-bucket/location");
-
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage(containsString(
-        "Output path does not exist or is not writeable: gs://non-existent-bucket/location"));
-    DataflowPipelineRunner.fromOptions(options);
-    assertValidJob(jobCaptor.getValue());
-  }
-
-  @Test
-  public void testNoProjectFails() {
-    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
-
-    options.setRunner(DataflowPipelineRunner.class);
-    // Explicitly set to null to prevent the default instance factory from reading credentials
-    // from a user's environment, causing this test to fail.
-    options.setProject(null);
-
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("Project id");
-    thrown.expectMessage("when running a Dataflow in the cloud");
-
-    DataflowPipelineRunner.fromOptions(options);
-  }
-
-  @Test
-  public void testProjectId() throws IOException {
-    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
-    options.setRunner(DataflowPipelineRunner.class);
-    options.setProject("foo-12345");
-
-    options.setStagingLocation("gs://spam/ham/eggs");
-    options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */));
-    options.setGcpCredential(new TestCredential());
-
-    DataflowPipelineRunner.fromOptions(options);
-  }
-
-  @Test
-  public void testProjectPrefix() throws IOException {
-    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
-    options.setRunner(DataflowPipelineRunner.class);
-    options.setProject("google.com:some-project-12345");
-
-    options.setStagingLocation("gs://spam/ham/eggs");
-    options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */));
-    options.setGcpCredential(new TestCredential());
-
-    DataflowPipelineRunner.fromOptions(options);
-  }
-
-  @Test
-  public void testProjectNumber() throws IOException {
-    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
-    options.setRunner(DataflowPipelineRunner.class);
-    options.setProject("12345");
-
-    options.setStagingLocation("gs://spam/ham/eggs");
-    options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */));
-
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("Project ID");
-    thrown.expectMessage("project number");
-
-    DataflowPipelineRunner.fromOptions(options);
-  }
-
-  @Test
-  public void testProjectDescription() throws IOException {
-    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
-    options.setRunner(DataflowPipelineRunner.class);
-    options.setProject("some project");
-
-    options.setStagingLocation("gs://spam/ham/eggs");
-    options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */));
-
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("Project ID");
-    thrown.expectMessage("project description");
-
-    DataflowPipelineRunner.fromOptions(options);
-  }
-
-  @Test
-  public void testInvalidNumberOfWorkerHarnessThreads() throws IOException {
-    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
-    options.setRunner(DataflowPipelineRunner.class);
-    options.setProject("foo-12345");
-
-    options.setStagingLocation("gs://spam/ham/eggs");
-    options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */));
-
-    options.as(DataflowPipelineDebugOptions.class).setNumberOfWorkerHarnessThreads(-1);
-
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("Number of worker harness threads");
-    thrown.expectMessage("Please make sure the value is non-negative.");
-
-    DataflowPipelineRunner.fromOptions(options);
-  }
-
-  @Test
-  public void testNoStagingLocationAndNoTempLocationFails() {
-    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
-    options.setRunner(DataflowPipelineRunner.class);
-    options.setProject("foo-project");
-
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage(
-        "Missing required value: at least one of tempLocation or stagingLocation must be set.");
-
-    DataflowPipelineRunner.fromOptions(options);
-  }
-
-  @Test
-  public void testStagingLocationAndNoTempLocationSucceeds() throws Exception {
-    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
-    options.setRunner(DataflowPipelineRunner.class);
-    options.setGcpCredential(new TestCredential());
-    options.setProject("foo-project");
-    options.setStagingLocation("gs://spam/ham/eggs");
-    options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */));
-
-    DataflowPipelineRunner.fromOptions(options);
-  }
-
-  @Test
-  public void testTempLocationAndNoStagingLocationSucceeds() throws Exception {
-    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
-    options.setRunner(DataflowPipelineRunner.class);
-    options.setGcpCredential(new TestCredential());
-    options.setProject("foo-project");
-    options.setTempLocation("gs://spam/ham/eggs");
-    options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */));
-
-    DataflowPipelineRunner.fromOptions(options);
-  }
-
-  @Test
-  public void testInvalidJobName() throws IOException {
-    List<String> invalidNames = Arrays.asList(
-        "invalid_name",
-        "0invalid",
-        "invalid-");
-    List<String> expectedReason = Arrays.asList(
-        "JobName invalid",
-        "JobName invalid",
-        "JobName invalid");
-
-    for (int i = 0; i < invalidNames.size(); ++i) {
-      DataflowPipelineOptions options = buildPipelineOptions();
-      options.setJobName(invalidNames.get(i));
-
-      try {
-        DataflowPipelineRunner.fromOptions(options);
-        fail("Expected IllegalArgumentException for jobName "
-            + options.getJobName());
-      } catch (IllegalArgumentException e) {
-        assertThat(e.getMessage(),
-            containsString(expectedReason.get(i)));
-      }
-    }
-  }
-
-  @Test
-  public void testValidJobName() throws IOException {
-    List<String> names = Arrays.asList("ok", "Ok", "A-Ok", "ok-123",
-        "this-one-is-fairly-long-01234567890123456789");
-
-    for (String name : names) {
-      DataflowPipelineOptions options = buildPipelineOptions();
-      options.setJobName(name);
-
-      DataflowPipelineRunner runner = DataflowPipelineRunner
-          .fromOptions(options);
-      assertNotNull(runner);
-    }
-  }
-
-  /**
-   * A fake PTransform for testing.
-   */
-  public static class TestTransform
-      extends PTransform<PCollection<Integer>, PCollection<Integer>> {
-    public boolean translated = false;
-
-    @Override
-    public PCollection<Integer> apply(PCollection<Integer> input) {
-      return PCollection.<Integer>createPrimitiveOutputInternal(
-          input.getPipeline(),
-          WindowingStrategy.globalDefault(),
-          input.isBounded());
-    }
-
-    @Override
-    protected Coder<?> getDefaultOutputCoder(PCollection<Integer> input) {
-      return input.getCoder();
-    }
-  }
-
-  @Test
-  public void testTransformTranslatorMissing() throws IOException {
-    // Test that we throw if we don't provide a translation.
-    ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
-
-    DataflowPipelineOptions options = buildPipelineOptions(jobCaptor);
-    Pipeline p = Pipeline.create(options);
-
-    p.apply(Create.of(Arrays.asList(1, 2, 3)))
-     .apply(new TestTransform());
-
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage(Matchers.containsString("no translator registered"));
-    DataflowPipelineTranslator.fromOptions(options)
-        .translate(
-            p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList());
-    assertValidJob(jobCaptor.getValue());
-  }
-
-  @Test
-  public void testTransformTranslator() throws IOException {
-    // Test that we can provide a custom translation
-    DataflowPipelineOptions options = buildPipelineOptions();
-    Pipeline p = Pipeline.create(options);
-    TestTransform transform = new TestTransform();
-
-    p.apply(Create.of(Arrays.asList(1, 2, 3)).withCoder(BigEndianIntegerCoder.of()))
-        .apply(transform);
-
-    DataflowPipelineTranslator translator = DataflowPipelineRunner
-        .fromOptions(options).getTranslator();
-
-    DataflowPipelineTranslator.registerTransformTranslator(
-        TestTransform.class,
-        new DataflowPipelineTranslator.TransformTranslator<TestTransform>() {
-          @SuppressWarnings("unchecked")
-          @Override
-          public void translate(
-              TestTransform transform,
-              DataflowPipelineTranslator.TranslationContext context) {
-            transform.translated = true;
-
-            // Note: This is about the minimum needed to fake out a
-            // translation. This obviously isn't a real translation.
-            context.addStep(transform, "TestTranslate");
-            context.addOutput("output", context.getOutput(transform));
-          }
-        });
-
-    translator.translate(
-        p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList());
-    assertTrue(transform.translated);
-  }
-
-  /** Records all the composite transforms visited within the Pipeline. */
-  private static class CompositeTransformRecorder extends PipelineVisitor.Defaults {
-    private List<PTransform<?, ?>> transforms = new ArrayList<>();
-
-    @Override
-    public CompositeBehavior enterCompositeTransform(TransformTreeNode node) {
-      if (node.getTransform() != null) {
-        transforms.add(node.getTransform());
-      }
-      return CompositeBehavior.ENTER_TRANSFORM;
-    }
-
-    public List<PTransform<?, ?>> getCompositeTransforms() {
-      return transforms;
-    }
-  }
-
-  @Test
-  public void testApplyIsScopedToExactClass() throws IOException {
-    DataflowPipelineOptions options = buildPipelineOptions();
-    Pipeline p = Pipeline.create(options);
-
-    Create.TimestampedValues<String> transform =
-        Create.timestamped(Arrays.asList(TimestampedValue.of("TestString", Instant.now())));
-    p.apply(transform);
-
-    CompositeTransformRecorder recorder = new CompositeTransformRecorder();
-    p.traverseTopologically(recorder);
-
-    // The recorder will also have seen a Create.Values composite as well, but we can't obtain that
-    // transform.
-    assertThat(
-        "Expected to have seen CreateTimestamped composite transform.",
-        recorder.getCompositeTransforms(),
-        hasItem(transform));
-    assertThat(
-        "Expected to have two composites, CreateTimestamped and Create.Values",
-        recorder.getCompositeTransforms(),
-        hasItem(Matchers.<PTransform<?, ?>>isA((Class) Create.Values.class)));
-  }
-
-  @Test
-  public void testToString() {
-    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
-    options.setJobName("TestJobName");
-    options.setProject("test-project");
-    options.setTempLocation("gs://test/temp/location");
-    options.setGcpCredential(new TestCredential());
-    options.setPathValidatorClass(NoopPathValidator.class);
-    options.setRunner(DataflowPipelineRunner.class);
-    assertEquals(
-        "DataflowPipelineRunner#testjobname",
-        DataflowPipelineRunner.fromOptions(options).toString());
-  }
-
-  private static PipelineOptions makeOptions(boolean streaming) {
-    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
-    options.setRunner(DataflowPipelineRunner.class);
-    options.setStreaming(streaming);
-    options.setJobName("TestJobName");
-    options.setProject("test-project");
-    options.setTempLocation("gs://test/temp/location");
-    options.setGcpCredential(new TestCredential());
-    options.setPathValidatorClass(NoopPathValidator.class);
-    return options;
-  }
-
-  private void testUnsupportedSource(PTransform<PInput, ?> source, String name, boolean streaming)
-      throws Exception {
-    String mode = streaming ? "streaming" : "batch";
-    thrown.expect(UnsupportedOperationException.class);
-    thrown.expectMessage(
-        "The DataflowPipelineRunner in " + mode + " mode does not support " + name);
-
-    Pipeline p = Pipeline.create(makeOptions(streaming));
-    p.apply(source);
-    p.run();
-  }
-
-  @Test
-  public void testBoundedSourceUnsupportedInStreaming() throws Exception {
-    testUnsupportedSource(
-        AvroSource.readFromFileWithClass("foo", String.class), "Read.Bounded", true);
-  }
-
-  @Test
-  public void testBigQueryIOSourceUnsupportedInStreaming() throws Exception {
-    testUnsupportedSource(
-        BigQueryIO.Read.from("project:bar.baz").withoutValidation(), "BigQueryIO.Read", true);
-  }
-
-  @Test
-  public void testAvroIOSourceUnsupportedInStreaming() throws Exception {
-    testUnsupportedSource(
-        AvroIO.Read.from("foo"), "AvroIO.Read", true);
-  }
-
-  @Test
-  public void testTextIOSourceUnsupportedInStreaming() throws Exception {
-    testUnsupportedSource(TextIO.Read.from("foo"), "TextIO.Read", true);
-  }
-
-  @Test
-  public void testReadBoundedSourceUnsupportedInStreaming() throws Exception {
-    testUnsupportedSource(Read.from(AvroSource.from("/tmp/test")), "Read.Bounded", true);
-  }
-
-  @Test
-  public void testReadUnboundedUnsupportedInBatch() throws Exception {
-    testUnsupportedSource(Read.from(new TestCountingSource(1)), "Read.Unbounded", false);
-  }
-
-  private void testUnsupportedSink(
-      PTransform<PCollection<String>, PDone> sink, String name, boolean streaming)
-          throws Exception {
-    thrown.expect(UnsupportedOperationException.class);
-    thrown.expectMessage(
-        "The DataflowPipelineRunner in streaming mode does not support " + name);
-
-    Pipeline p = Pipeline.create(makeOptions(streaming));
-    p.apply(Create.of("foo")).apply(sink);
-    p.run();
-  }
-
-  @Test
-  public void testAvroIOSinkUnsupportedInStreaming() throws Exception {
-    testUnsupportedSink(AvroIO.Write.to("foo").withSchema(String.class), "AvroIO.Write", true);
-  }
-
-  @Test
-  public void testTextIOSinkUnsupportedInStreaming() throws Exception {
-    testUnsupportedSink(TextIO.Write.to("foo"), "TextIO.Write", true);
-  }
-
-  @Test
-  public void testBatchViewAsSingletonToIsmRecord() throws Exception {
-    DoFnTester<KV<Integer, Iterable<KV<GlobalWindow, WindowedValue<String>>>>,
-               IsmRecord<WindowedValue<String>>> doFnTester =
-               DoFnTester.of(
-                   new BatchViewAsSingleton.IsmRecordForSingularValuePerWindowDoFn
-                   <String, GlobalWindow>(GlobalWindow.Coder.INSTANCE));
-
-    assertThat(
-        doFnTester.processBundle(
-            ImmutableList.of(KV.<Integer, Iterable<KV<GlobalWindow, WindowedValue<String>>>>of(
-                0, ImmutableList.of(KV.of(GlobalWindow.INSTANCE, valueInGlobalWindow("a")))))),
-        contains(IsmRecord.of(ImmutableList.of(GlobalWindow.INSTANCE), valueInGlobalWindow("a"))));
-  }
-
-  @Test
-  public void testBatchViewAsSingletonToIsmRecordWithMultipleValuesThrowsException()
-      throws Exception {
-    DoFnTester<KV<Integer, Iterable<KV<GlobalWindow, WindowedValue<String>>>>,
-    IsmRecord<WindowedValue<String>>> doFnTester =
-    DoFnTester.of(
-        new BatchViewAsSingleton.IsmRecordForSingularValuePerWindowDoFn
-        <String, GlobalWindow>(GlobalWindow.Coder.INSTANCE));
-
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("found for singleton within window");
-    doFnTester.processBundle(ImmutableList.of(
-        KV.<Integer, Iterable<KV<GlobalWindow, WindowedValue<String>>>>of(0,
-            ImmutableList.of(KV.of(GlobalWindow.INSTANCE, valueInGlobalWindow("a")),
-                KV.of(GlobalWindow.INSTANCE, valueInGlobalWindow("b"))))));
-  }
-
-  @Test
-  public void testBatchViewAsListToIsmRecordForGlobalWindow() throws Exception {
-    DoFnTester<String, IsmRecord<WindowedValue<String>>> doFnTester =
-        DoFnTester.of(new BatchViewAsList.ToIsmRecordForGlobalWindowDoFn<String>());
-
-    // The order of the output elements is important relative to processing order
-    assertThat(doFnTester.processBundle(ImmutableList.of("a", "b", "c")), contains(
-        IsmRecord.of(ImmutableList.of(GlobalWindow.INSTANCE, 0L), valueInGlobalWindow("a")),
-        IsmRecord.of(ImmutableList.of(GlobalWindow.INSTANCE, 1L), valueInGlobalWindow("b")),
-        IsmRecord.of(ImmutableList.of(GlobalWindow.INSTANCE, 2L), valueInGlobalWindow("c"))));
-  }
-
-  @Test
-  public void testBatchViewAsListToIsmRecordForNonGlobalWindow() throws Exception {
-    DoFnTester<KV<Integer, Iterable<KV<IntervalWindow, WindowedValue<Long>>>>,
-               IsmRecord<WindowedValue<Long>>> doFnTester =
-        DoFnTester.of(
-            new BatchViewAsList.ToIsmRecordForNonGlobalWindowDoFn<Long, IntervalWindow>(
-                IntervalWindow.getCoder()));
-
-    IntervalWindow windowA = new IntervalWindow(new Instant(0), new Instant(10));
-    IntervalWindow windowB = new IntervalWindow(new Instant(10), new Instant(20));
-    IntervalWindow windowC = new IntervalWindow(new Instant(20), new Instant(30));
-
-    Iterable<KV<Integer, Iterable<KV<IntervalWindow, WindowedValue<Long>>>>> inputElements =
-        ImmutableList.of(
-            KV.of(1, (Iterable<KV<IntervalWindow, WindowedValue<Long>>>) ImmutableList.of(
-                KV.of(
-                    windowA, WindowedValue.of(110L, new Instant(1), windowA, PaneInfo.NO_FIRING)),
-                KV.of(
-                    windowA, WindowedValue.of(111L, new Instant(3), windowA, PaneInfo.NO_FIRING)),
-                KV.of(
-                    windowA, WindowedValue.of(112L, new Instant(4), windowA, PaneInfo.NO_FIRING)),
-                KV.of(
-                    windowB, WindowedValue.of(120L, new Instant(12), windowB, PaneInfo.NO_FIRING)),
-                KV.of(
-                    windowB, WindowedValue.of(121L, new Instant(14), windowB, PaneInfo.NO_FIRING))
-                )),
-            KV.of(2, (Iterable<KV<IntervalWindow, WindowedValue<Long>>>) ImmutableList.of(
-                KV.of(
-                    windowC, WindowedValue.of(210L, new Instant(25), windowC, PaneInfo.NO_FIRING))
-                )));
-
-    // The order of the output elements is important relative to processing order
-    assertThat(doFnTester.processBundle(inputElements), contains(
-        IsmRecord.of(ImmutableList.of(windowA, 0L),
-            WindowedValue.of(110L, new Instant(1), windowA, PaneInfo.NO_FIRING)),
-        IsmRecord.of(ImmutableList.of(windowA, 1L),
-            WindowedValue.of(111L, new Instant(3), windowA, PaneInfo.NO_FIRING)),
-        IsmRecord.of(ImmutableList.of(windowA, 2L),
-            WindowedValue.of(112L, new Instant(4), windowA, PaneInfo.NO_FIRING)),
-        IsmRecord.of(ImmutableList.of(windowB, 0L),
-            WindowedValue.of(120L, new Instant(12), windowB, PaneInfo.NO_FIRING)),
-        IsmRecord.of(ImmutableList.of(windowB, 1L),
-            WindowedValue.of(121L, new Instant(14), windowB, PaneInfo.NO_FIRING)),
-        IsmRecord.of(ImmutableList.of(windowC, 0L),
-            WindowedValue.of(210L, new Instant(25), windowC, PaneInfo.NO_FIRING))));
-  }
-
-  @Test
-  public void testToIsmRecordForMapLikeDoFn() throws Exception {
-    TupleTag<KV<Integer, KV<IntervalWindow, Long>>> outputForSizeTag = new TupleTag<>();
-    TupleTag<KV<Integer, KV<IntervalWindow, Long>>> outputForEntrySetTag = new TupleTag<>();
-
-    Coder<Long> keyCoder = VarLongCoder.of();
-    Coder<IntervalWindow> windowCoder = IntervalWindow.getCoder();
-
-    IsmRecordCoder<WindowedValue<Long>> ismCoder = IsmRecordCoder.of(
-        1,
-        2,
-        ImmutableList.<Coder<?>>of(
-            MetadataKeyCoder.of(keyCoder),
-            IntervalWindow.getCoder(),
-            BigEndianLongCoder.of()),
-        FullWindowedValueCoder.of(VarLongCoder.of(), windowCoder));
-
-    DoFnTester<KV<Integer, Iterable<KV<KV<Long, IntervalWindow>, WindowedValue<Long>>>>,
-               IsmRecord<WindowedValue<Long>>> doFnTester =
-        DoFnTester.of(new BatchViewAsMultimap.ToIsmRecordForMapLikeDoFn<Long, Long, IntervalWindow>(
-            outputForSizeTag,
-            outputForEntrySetTag,
-            windowCoder,
-            keyCoder,
-            ismCoder,
-            false /* unique keys */));
-    doFnTester.setSideOutputTags(TupleTagList.of(
-        ImmutableList.<TupleTag<?>>of(outputForSizeTag, outputForEntrySetTag)));
-
-    IntervalWindow windowA = new IntervalWindow(new Instant(0), new Instant(10));
-    IntervalWindow windowB = new IntervalWindow(new Instant(10), new Instant(20));
-    IntervalWindow windowC = new IntervalWindow(new Instant(20), new Instant(30));
-
-    Iterable<KV<Integer,
-                Iterable<KV<KV<Long, IntervalWindow>, WindowedValue<Long>>>>> inputElements =
-        ImmutableList.of(
-            KV.of(1, (Iterable<KV<KV<Long, IntervalWindow>, WindowedValue<Long>>>) ImmutableList.of(
-                KV.of(KV.of(1L, windowA),
-                    WindowedValue.of(110L, new Instant(1), windowA, PaneInfo.NO_FIRING)),
-                // same window same key as to previous
-                KV.of(KV.of(1L, windowA),
-                    WindowedValue.of(111L, new Instant(2), windowA, PaneInfo.NO_FIRING)),
-                // same window different key as to previous
-                KV.of(KV.of(2L, windowA),
-                    WindowedValue.of(120L, new Instant(3), windowA, PaneInfo.NO_FIRING)),
-                // different window same key as to previous
-                KV.of(KV.of(2L, windowB),
-                    WindowedValue.of(210L, new Instant(11), windowB, PaneInfo.NO_FIRING)),
-                // different window and different key as to previous
-                KV.of(KV.of(3L, windowB),
-                    WindowedValue.of(220L, new Instant(12), windowB, PaneInfo.NO_FIRING)))),
-            KV.of(2, (Iterable<KV<KV<Long, IntervalWindow>, WindowedValue<Long>>>) ImmutableList.of(
-                // different shard
-                KV.of(KV.of(4L, windowC),
-                    WindowedValue.of(330L, new Instant(21), windowC, PaneInfo.NO_FIRING)))));
-
-    // The order of the output elements is important relative to processing order
-    assertThat(doFnTester.processBundle(inputElements), contains(
-        IsmRecord.of(
-            ImmutableList.of(1L, windowA, 0L),
-            WindowedValue.of(110L, new Instant(1), windowA, PaneInfo.NO_FIRING)),
-        IsmRecord.of(
-            ImmutableList.of(1L, windowA, 1L),
-            WindowedValue.of(111L, new Instant(2), windowA, PaneInfo.NO_FIRING)),
-        IsmRecord.of(
-            ImmutableList.of(2L, windowA, 0L),
-            WindowedValue.of(120L, new Instant(3), windowA, PaneInfo.NO_FIRING)),
-        IsmRecord.of(
-            ImmutableList.of(2L, windowB, 0L),
-            WindowedValue.of(210L, new Instant(11), windowB, PaneInfo.NO_FIRING)),
-        IsmRecord.of(
-            ImmutableList.of(3L, windowB, 0L),
-            WindowedValue.of(220L, new Instant(12), windowB, PaneInfo.NO_FIRING)),
-        IsmRecord.of(
-            ImmutableList.of(4L, windowC, 0L),
-            WindowedValue.of(330L, new Instant(21), windowC, PaneInfo.NO_FIRING))));
-
-    // Verify the number of unique keys per window.
-    assertThat(doFnTester.takeSideOutputElements(outputForSizeTag), contains(
-        KV.of(ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowA)),
-            KV.of(windowA, 2L)),
-        KV.of(ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowB)),
-            KV.of(windowB, 2L)),
-        KV.of(ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowC)),
-            KV.of(windowC, 1L))
-        ));
-
-    // Verify the output for the unique keys.
-    assertThat(doFnTester.takeSideOutputElements(outputForEntrySetTag), contains(
-        KV.of(ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowA)),
-            KV.of(windowA, 1L)),
-        KV.of(ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowA)),
-            KV.of(windowA, 2L)),
-        KV.of(ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowB)),
-            KV.of(windowB, 2L)),
-        KV.of(ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowB)),
-            KV.of(windowB, 3L)),
-        KV.of(ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowC)),
-            KV.of(windowC, 4L))
-        ));
-  }
-
-  @Test
-  public void testToIsmRecordForMapLikeDoFnWithoutUniqueKeysThrowsException() throws Exception {
-    TupleTag<KV<Integer, KV<IntervalWindow, Long>>> outputForSizeTag = new TupleTag<>();
-    TupleTag<KV<Integer, KV<IntervalWindow, Long>>> outputForEntrySetTag = new TupleTag<>();
-
-    Coder<Long> keyCoder = VarLongCoder.of();
-    Coder<IntervalWindow> windowCoder = IntervalWindow.getCoder();
-
-    IsmRecordCoder<WindowedValue<Long>> ismCoder = IsmRecordCoder.of(
-        1,
-        2,
-        ImmutableList.<Coder<?>>of(
-            MetadataKeyCoder.of(keyCoder),
-            IntervalWindow.getCoder(),
-            BigEndianLongCoder.of()),
-        FullWindowedValueCoder.of(VarLongCoder.of(), windowCoder));
-
-    DoFnTester<KV<Integer, Iterable<KV<KV<Long, IntervalWindow>, WindowedValue<Long>>>>,
-               IsmRecord<WindowedValue<Long>>> doFnTester =
-        DoFnTester.of(new BatchViewAsMultimap.ToIsmRecordForMapLikeDoFn<Long, Long, IntervalWindow>(
-            outputForSizeTag,
-            outputForEntrySetTag,
-            windowCoder,
-            keyCoder,
-            ismCoder,
-            true /* unique keys */));
-    doFnTester.setSideOutputTags(TupleTagList.of(
-        ImmutableList.<TupleTag<?>>of(outputForSizeTag, outputForEntrySetTag)));
-
-    IntervalWindow windowA = new IntervalWindow(new Instant(0), new Instant(10));
-
-    Iterable<KV<Integer,
-                Iterable<KV<KV<Long, IntervalWindow>, WindowedValue<Long>>>>> inputElements =
-        ImmutableList.of(
-            KV.of(1, (Iterable<KV<KV<Long, IntervalWindow>, WindowedValue<Long>>>) ImmutableList.of(
-                KV.of(KV.of(1L, windowA),
-                    WindowedValue.of(110L, new Instant(1), windowA, PaneInfo.NO_FIRING)),
-                // same window same key as to previous
-                KV.of(KV.of(1L, windowA),
-                    WindowedValue.of(111L, new Instant(2), windowA, PaneInfo.NO_FIRING)))));
-
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("Unique keys are expected but found key");
-    doFnTester.processBundle(inputElements);
-  }
-
-  @Test
-  public void testToIsmMetadataRecordForSizeDoFn() throws Exception {
-    TupleTag<KV<Integer, KV<IntervalWindow, Long>>> outputForSizeTag = new TupleTag<>();
-    TupleTag<KV<Integer, KV<IntervalWindow, Long>>> outputForEntrySetTag = new TupleTag<>();
-
-    Coder<Long> keyCoder = VarLongCoder.of();
-    Coder<IntervalWindow> windowCoder = IntervalWindow.getCoder();
-
-    IsmRecordCoder<WindowedValue<Long>> ismCoder = IsmRecordCoder.of(
-        1,
-        2,
-        ImmutableList.<Coder<?>>of(
-            MetadataKeyCoder.of(keyCoder),
-            IntervalWindow.getCoder(),
-            BigEndianLongCoder.of()),
-        FullWindowedValueCoder.of(VarLongCoder.of(), windowCoder));
-
-    DoFnTester<KV<Integer, Iterable<KV<IntervalWindow, Long>>>,
-               IsmRecord<WindowedValue<Long>>> doFnTester = DoFnTester.of(
-        new BatchViewAsMultimap.ToIsmMetadataRecordForSizeDoFn<Long, Long, IntervalWindow>(
-            windowCoder));
-    doFnTester.setSideOutputTags(TupleTagList.of(
-        ImmutableList.<TupleTag<?>>of(outputForSizeTag, outputForEntrySetTag)));
-
-    IntervalWindow windowA = new IntervalWindow(new Instant(0), new Instant(10));
-    IntervalWindow windowB = new IntervalWindow(new Instant(10), new Instant(20));
-    IntervalWindow windowC = new IntervalWindow(new Instant(20), new Instant(30));
-
-    Iterable<KV<Integer, Iterable<KV<IntervalWindow, Long>>>> inputElements =
-        ImmutableList.of(
-            KV.of(1,
-                (Iterable<KV<IntervalWindow, Long>>) ImmutableList.of(
-                    KV.of(windowA, 2L),
-                    KV.of(windowA, 3L),
-                    KV.of(windowB, 7L))),
-            KV.of(ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowB)),
-                (Iterable<KV<IntervalWindow, Long>>) ImmutableList.of(
-                    KV.of(windowC, 9L))));
-
-    // The order of the output elements is important relative to processing order
-    assertThat(doFnTester.processBundle(inputElements), contains(
-        IsmRecord.<WindowedValue<Long>>meta(
-            ImmutableList.of(IsmFormat.getMetadataKey(), windowA, 0L),
-            CoderUtils.encodeToByteArray(VarLongCoder.of(), 5L)),
-        IsmRecord.<WindowedValue<Long>>meta(
-            ImmutableList.of(IsmFormat.getMetadataKey(), windowB, 0L),
-            CoderUtils.encodeToByteArray(VarLongCoder.of(), 7L)),
-        IsmRecord.<WindowedValue<Long>>meta(
-            ImmutableList.of(IsmFormat.getMetadataKey(), windowC, 0L),
-            CoderUtils.encodeToByteArray(VarLongCoder.of(), 9L))
-        ));
-  }
-
-  @Test
-  public void testToIsmMetadataRecordForKeyDoFn() throws Exception {
-    TupleTag<KV<Integer, KV<IntervalWindow, Long>>> outputForSizeTag = new TupleTag<>();
-    TupleTag<KV<Integer, KV<IntervalWindow, Long>>> outputForEntrySetTag = new TupleTag<>();
-
-    Coder<Long> keyCoder = VarLongCoder.of();
-    Coder<IntervalWindow> windowCoder = IntervalWindow.getCoder();
-
-    IsmRecordCoder<WindowedValue<Long>> ismCoder = IsmRecordCoder.of(
-        1,
-        2,
-        ImmutableList.<Coder<?>>of(
-            MetadataKeyCoder.of(keyCoder),
-            IntervalWindow.getCoder(),
-            BigEndianLongCoder.of()),
-        FullWindowedValueCoder.of(VarLongCoder.of(), windowCoder));
-
-    DoFnTester<KV<Integer, Iterable<KV<IntervalWindow, Long>>>,
-               IsmRecord<WindowedValue<Long>>> doFnTester = DoFnTester.of(
-        new BatchViewAsMultimap.ToIsmMetadataRecordForKeyDoFn<Long, Long, IntervalWindow>(
-            keyCoder, windowCoder));
-    doFnTester.setSideOutputTags(TupleTagList.of(
-        ImmutableList.<TupleTag<?>>of(outputForSizeTag, outputForEntrySetTag)));
-
-    IntervalWindow windowA = new IntervalWindow(new Instant(0), new Instant(10));
-    IntervalWindow windowB = new IntervalWindow(new Instant(10), new Instant(20));
-    IntervalWindow windowC = new IntervalWindow(new Instant(20), new Instant(30));
-
-    Iterable<KV<Integer, Iterable<KV<IntervalWindow, Long>>>> inputElements =
-        ImmutableList.of(
-            KV.of(1,
-                (Iterable<KV<IntervalWindow, Long>>) ImmutableList.of(
-                    KV.of(windowA, 2L),
-                    // same window as previous
-                    KV.of(windowA, 3L),
-                    // different window as previous
-                    KV.of(windowB, 3L))),
-            KV.of(ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowB)),
-                (Iterable<KV<IntervalWindow, Long>>) ImmutableList.of(
-                    KV.of(windowC, 3L))));
-
-    // The order of the output elements is important relative to processing order
-    assertThat(doFnTester.processBundle(inputElements), contains(
-        IsmRecord.<WindowedValue<Long>>meta(
-            ImmutableList.of(IsmFormat.getMetadataKey(), windowA, 1L),
-            CoderUtils.encodeToByteArray(VarLongCoder.of(), 2L)),
-        IsmRecord.<WindowedValue<Long>>meta(
-            ImmutableList.of(IsmFormat.getMetadataKey(), windowA, 2L),
-            CoderUtils.encodeToByteArray(VarLongCoder.of(), 3L)),
-        IsmRecord.<WindowedValue<Long>>meta(
-            ImmutableList.of(IsmFormat.getMetadataKey(), windowB, 1L),
-            CoderUtils.encodeToByteArray(VarLongCoder.of(), 3L)),
-        IsmRecord.<WindowedValue<Long>>meta(
-            ImmutableList.of(IsmFormat.getMetadataKey(), windowC, 1L),
-            CoderUtils.encodeToByteArray(VarLongCoder.of(), 3L))
-        ));
-  }
-
-  @Test
-  public void testToMapDoFn() throws Exception {
-    Coder<IntervalWindow> windowCoder = IntervalWindow.getCoder();
-
-    DoFnTester<KV<Integer, Iterable<KV<IntervalWindow, WindowedValue<KV<Long, Long>>>>>,
-                  IsmRecord<WindowedValue<TransformedMap<Long,
-                                                         WindowedValue<Long>,
-                                                         Long>>>> doFnTester =
-        DoFnTester.of(new BatchViewAsMap.ToMapDoFn<Long, Long, IntervalWindow>(windowCoder));
-
-
-    IntervalWindow windowA = new IntervalWindow(new Instant(0), new Instant(10));
-    IntervalWindow windowB = new IntervalWindow(new Instant(10), new Instant(20));
-    IntervalWindow windowC = new IntervalWindow(new Instant(20), new Instant(30));
-
-    Iterable<KV<Integer,
-             Iterable<KV<IntervalWindow, WindowedValue<KV<Long, Long>>>>>> inputElements =
-        ImmutableList.of(
-            KV.of(1,
-                (Iterable<KV<IntervalWindow, WindowedValue<KV<Long, Long>>>>) ImmutableList.of(
-                    KV.of(windowA, WindowedValue.of(
-                        KV.of(1L, 11L), new Instant(3), windowA, PaneInfo.NO_FIRING)),
-                    KV.of(windowA, WindowedValue.of(
-                        KV.of(2L, 21L), new Instant(7), windowA, PaneInfo.NO_FIRING)),
-                    KV.of(windowB, WindowedValue.of(
-                        KV.of(2L, 21L), new Instant(13), windowB, PaneInfo.NO_FIRING)),
-                    KV.of(windowB, WindowedValue.of(
-                        KV.of(3L, 31L), new Instant(15), windowB, PaneInfo.NO_FIRING)))),
-            KV.of(2,
-                (Iterable<KV<IntervalWindow, WindowedValue<KV<Long, Long>>>>) ImmutableList.of(
-                    KV.of(windowC, WindowedValue.of(
-                        KV.of(4L, 41L), new Instant(25), windowC, PaneInfo.NO_FIRING)))));
-
-    // The order of the output elements is important relative to processing order
-    List<IsmRecord<WindowedValue<TransformedMap<Long,
-                                                WindowedValue<Long>,
-                                                Long>>>> output =
-                                                doFnTester.processBundle(inputElements);
-    assertEquals(3, output.size());
-    Map<Long, Long> outputMap;
-
-    outputMap = output.get(0).getValue().getValue();
-    assertEquals(2, outputMap.size());
-    assertEquals(ImmutableMap.of(1L, 11L, 2L, 21L), outputMap);
-
-    outputMap = output.get(1).getValue().getValue();
-    assertEquals(2, outputMap.size());
-    assertEquals(ImmutableMap.of(2L, 21L, 3L, 31L), outputMap);
-
-    outputMap = output.get(2).getValue().getValue();
-    assertEquals(1, outputMap.size());
-    assertEquals(ImmutableMap.of(4L, 41L), outputMap);
-  }
-
-  @Test
-  public void testToMultimapDoFn() throws Exception {
-    Coder<IntervalWindow> windowCoder = IntervalWindow.getCoder();
-
-    DoFnTester<KV<Integer, Iterable<KV<IntervalWindow, WindowedValue<KV<Long, Long>>>>>,
-                  IsmRecord<WindowedValue<TransformedMap<Long,
-                                                         Iterable<WindowedValue<Long>>,
-                                                         Iterable<Long>>>>> doFnTester =
-        DoFnTester.of(
-            new BatchViewAsMultimap.ToMultimapDoFn<Long, Long, IntervalWindow>(windowCoder));
-
-
-    IntervalWindow windowA = new IntervalWindow(new Instant(0), new Instant(10));
-    IntervalWindow windowB = new IntervalWindow(new Instant(10), new Instant(20));
-    IntervalWindow windowC = new IntervalWindow(new Instant(20), new Instant(30));
-
-    Iterable<KV<Integer,
-             Iterable<KV<IntervalWindow, WindowedValue<KV<Long, Long>>>>>> inputElements =
-        ImmutableList.of(
-            KV.of(1,
-                (Iterable<KV<IntervalWindow, WindowedValue<KV<Long, Long>>>>) ImmutableList.of(
-                    KV.of(windowA, WindowedValue.of(
-                        KV.of(1L, 11L), new Instant(3), windowA, PaneInfo.NO_FIRING)),
-                    KV.of(windowA, WindowedValue.of(
-                        KV.of(1L, 12L), new Instant(5), windowA, PaneInfo.NO_FIRING)),
-                    KV.of(windowA, WindowedValue.of(
-                        KV.of(2L, 21L), new Instant(7), windowA, PaneInfo.NO_FIRING)),
-                    KV.of(windowB, WindowedValue.of(
-                        KV.of(2L, 21L), new Instant(13), windowB, PaneInfo.NO_FIRING)),
-                    KV.of(windowB, WindowedValue.of(
-                        KV.of(3L, 31L), new Instant(15), windowB, PaneInfo.NO_FIRING)))),
-            KV.of(2,
-                (Iterable<KV<IntervalWindow, WindowedValue<KV<Long, Long>>>>) ImmutableList.of(
-                    KV.of(windowC, WindowedValue.of(
-                        KV.of(4L, 41L), new Instant(25), windowC, PaneInfo.NO_FIRING)))));
-
-    // The order of the output elements is important relative to processing order
-    List<IsmRecord<WindowedValue<TransformedMap<Long,
-                                                Iterable<WindowedValue<Long>>,
-                                                Iterable<Long>>>>> output =
-                                                doFnTester.processBundle(inputElements);
-    assertEquals(3, output.size());
-    Map<Long, Iterable<Long>> outputMap;
-
-    outputMap = output.get(0).getValue().getValue();
-    assertEquals(2, outputMap.size());
-    assertThat(outputMap.get(1L), containsInAnyOrder(11L, 12L));
-    assertThat(outputMap.get(2L), containsInAnyOrder(21L));
-
-    outputMap = output.get(1).getValue().getValue();
-    assertEquals(2, outputMap.size());
-    assertThat(outputMap.get(2L), containsInAnyOrder(21L));
-    assertThat(outputMap.get(3L), containsInAnyOrder(31L));
-
-    outputMap = output.get(2).getValue().getValue();
-    assertEquals(1, outputMap.size());
-    assertThat(outputMap.get(4L), containsInAnyOrder(41L));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6d028ac6/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 165d2b5..261ba99 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -124,7 +124,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
   }
 
   private Pipeline buildPipeline(DataflowPipelineOptions options) {
-    options.setRunner(DataflowPipelineRunner.class);
+    options.setRunner(DataflowRunner.class);
     Pipeline p = Pipeline.create(options);
 
     p.apply(TextIO.Read.named("ReadMyFile").from("gs://bucket/object"))
@@ -164,7 +164,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     when(mockGcsUtil.isGcsPatternSupported(anyString())).thenCallRealMethod();
 
     DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
-    options.setRunner(DataflowPipelineRunner.class);
+    options.setRunner(DataflowRunner.class);
     options.setGcpCredential(new TestCredential());
     options.setJobName("some-job-name");
     options.setProject("some-project");
@@ -178,14 +178,14 @@ public class DataflowPipelineTranslatorTest implements Serializable {
   @Test
   public void testSettingOfSdkPipelineOptions() throws IOException {
     DataflowPipelineOptions options = buildPipelineOptions();
-    options.setRunner(DataflowPipelineRunner.class);
+    options.setRunner(DataflowRunner.class);
 
     Pipeline p = buildPipeline(options);
     p.traverseTopologically(new RecordingPipelineVisitor());
     Job job =
         DataflowPipelineTranslator.fromOptions(options)
             .translate(
-                p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
+                p, (DataflowRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
             .getJob();
 
     // Note that the contents of this materialized map may be changed by the act of reading an
@@ -196,7 +196,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     settings.put("project", "some-project");
     settings.put("pathValidatorClass",
         "org.apache.beam.runners.dataflow.util.DataflowPathValidator");
-    settings.put("runner", "org.apache.beam.runners.dataflow.DataflowPipelineRunner");
+    settings.put("runner", "org.apache.beam.runners.dataflow.DataflowRunner");
     settings.put("jobName", "some-job-name");
     settings.put("tempLocation", "gs://somebucket/some/path");
     settings.put("stagingLocation", "gs://somebucket/some/path/staging");
@@ -222,7 +222,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     Job job =
         DataflowPipelineTranslator.fromOptions(options)
             .translate(
-                p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
+                p, (DataflowRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
             .getJob();
 
     assertEquals(1, job.getEnvironment().getWorkerPools().size());
@@ -239,7 +239,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     Job job =
         DataflowPipelineTranslator.fromOptions(options)
             .translate(
-                p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
+                p, (DataflowRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
             .getJob();
 
     assertEquals(1, job.getEnvironment().getWorkerPools().size());
@@ -258,7 +258,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     Job job =
         DataflowPipelineTranslator.fromOptions(options)
             .translate(
-                p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
+                p, (DataflowRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
             .getJob();
 
     assertEquals(1, job.getEnvironment().getWorkerPools().size());
@@ -275,7 +275,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     Job job =
         DataflowPipelineTranslator.fromOptions(options)
             .translate(
-                p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
+                p, (DataflowRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
             .getJob();
 
     assertEquals(1, job.getEnvironment().getWorkerPools().size());
@@ -291,7 +291,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     Job job =
         DataflowPipelineTranslator.fromOptions(options)
             .translate(
-                p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
+                p, (DataflowRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
             .getJob();
 
     assertEquals(1, job.getEnvironment().getWorkerPools().size());
@@ -327,7 +327,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     Job job =
         DataflowPipelineTranslator.fromOptions(options)
             .translate(
-                p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
+                p, (DataflowRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
             .getJob();
 
     assertEquals(1, job.getEnvironment().getWorkerPools().size());
@@ -362,7 +362,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     Job job =
         DataflowPipelineTranslator.fromOptions(options)
             .translate(
-                p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
+                p, (DataflowRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
             .getJob();
 
     assertEquals(1, job.getEnvironment().getWorkerPools().size());
@@ -396,7 +396,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     Job job =
         DataflowPipelineTranslator.fromOptions(options)
             .translate(
-                p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
+                p, (DataflowRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
             .getJob();
 
     assertEquals(1, job.getEnvironment().getWorkerPools().size());
@@ -416,7 +416,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     Job job =
         DataflowPipelineTranslator.fromOptions(options)
             .translate(
-                p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
+                p, (DataflowRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
             .getJob();
 
     assertEquals(1, job.getEnvironment().getWorkerPools().size());
@@ -437,7 +437,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     Job job =
         DataflowPipelineTranslator.fromOptions(options)
             .translate(
-                p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
+                p, (DataflowRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
             .getJob();
 
     assertEquals(1, job.getEnvironment().getWorkerPools().size());
@@ -466,7 +466,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
         translator
             .translate(
                 pipeline,
-                (DataflowPipelineRunner) pipeline.getRunner(),
+                (DataflowRunner) pipeline.getRunner(),
                 Collections.<DataflowPackage>emptyList())
             .getJob();
 
@@ -520,7 +520,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
         translator
             .translate(
                 pipeline,
-                (DataflowPipelineRunner) pipeline.getRunner(),
+                (DataflowRunner) pipeline.getRunner(),
                 Collections.<DataflowPackage>emptyList())
             .getJob();
 
@@ -655,7 +655,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
 
     // Check that translation doesn't fail.
     t.translate(
-        p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList());
+        p, (DataflowRunner) p.getRunner(), Collections.<DataflowPackage>emptyList());
   }
 
   @Test
@@ -698,7 +698,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     // Check that translation doesn't fail.
     t.translate(
         pipeline,
-        (DataflowPipelineRunner) pipeline.getRunner(),
+        (DataflowRunner) pipeline.getRunner(),
         Collections.<DataflowPackage>emptyList());
   }
 
@@ -724,7 +724,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
         ThrowableMessageMatcher.hasMessage(containsString("Unsupported wildcard usage"))));
     t.translate(
         pipeline,
-        (DataflowPipelineRunner) pipeline.getRunner(),
+        (DataflowRunner) pipeline.getRunner(),
         Collections.<DataflowPackage>emptyList());
   }
 
@@ -745,7 +745,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
         translator
             .translate(
                 pipeline,
-                (DataflowPipelineRunner) pipeline.getRunner(),
+                (DataflowRunner) pipeline.getRunner(),
                 Collections.<DataflowPackage>emptyList())
             .getJob();
 
@@ -777,7 +777,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
         translator
             .translate(
                 pipeline,
-                (DataflowPipelineRunner) pipeline.getRunner(),
+                (DataflowRunner) pipeline.getRunner(),
                 Collections.<DataflowPackage>emptyList())
             .getJob();
 
@@ -807,7 +807,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
         translator
             .translate(
                 pipeline,
-                (DataflowPipelineRunner) pipeline.getRunner(),
+                (DataflowRunner) pipeline.getRunner(),
                 Collections.<DataflowPackage>emptyList())
             .getJob();
 
@@ -840,7 +840,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
         translator
             .translate(
                 pipeline,
-                (DataflowPipelineRunner) pipeline.getRunner(),
+                (DataflowRunner) pipeline.getRunner(),
                 Collections.<DataflowPackage>emptyList())
             .getJob();
 
@@ -903,7 +903,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
         translator
             .translate(
                 pipeline,
-                (DataflowPipelineRunner) pipeline.getRunner(),
+                (DataflowRunner) pipeline.getRunner(),
                 Collections.<DataflowPackage>emptyList())
             .getJob();