Posted to commits@beam.apache.org by dh...@apache.org on 2016/06/17 22:13:49 UTC
[2/9] incubator-beam git commit: Rename DataflowPipelineRunner to DataflowRunner
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3841f411/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
new file mode 100644
index 0000000..e094d0d
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -0,0 +1,1417 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow;
+
+import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.startsWith;
+import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.runners.dataflow.DataflowRunner.BatchViewAsList;
+import org.apache.beam.runners.dataflow.DataflowRunner.BatchViewAsMap;
+import org.apache.beam.runners.dataflow.DataflowRunner.BatchViewAsMultimap;
+import org.apache.beam.runners.dataflow.DataflowRunner.BatchViewAsSingleton;
+import org.apache.beam.runners.dataflow.DataflowRunner.TransformedMap;
+import org.apache.beam.runners.dataflow.internal.IsmFormat;
+import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecord;
+import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecordCoder;
+import org.apache.beam.runners.dataflow.internal.IsmFormat.MetadataKeyCoder;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor;
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.BigEndianLongCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.io.AvroIO;
+import org.apache.beam.sdk.io.AvroSource;
+import org.apache.beam.sdk.io.BigQueryIO;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.runners.TransformTreeNode;
+import org.apache.beam.sdk.runners.dataflow.TestCountingSource;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFnTester;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.GcsUtil;
+import org.apache.beam.sdk.util.NoopPathValidator;
+import org.apache.beam.sdk.util.ReleaseInfo;
+import org.apache.beam.sdk.util.TestCredential;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+
+import com.google.api.services.dataflow.Dataflow;
+import com.google.api.services.dataflow.model.DataflowPackage;
+import com.google.api.services.dataflow.model.Job;
+import com.google.api.services.dataflow.model.ListJobsResponse;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+import org.hamcrest.Description;
+import org.hamcrest.Matchers;
+import org.hamcrest.TypeSafeMatcher;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.internal.matchers.ThrowableMessageMatcher;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.channels.FileChannel;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+/**
+ * Tests for the {@link DataflowRunner}.
+ */
+@RunWith(JUnit4.class)
+public class DataflowRunnerTest {
+
+ private static final String PROJECT_ID = "some-project";
+
+ @Rule
+ public TemporaryFolder tmpFolder = new TemporaryFolder();
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
+
+  // Asserts that the given Job is a well-formed creation request: the ID and current state
+  // must be unset, and the name must match the required pattern.
+ private static void assertValidJob(Job job) {
+ assertNull(job.getId());
+ assertNull(job.getCurrentState());
+ assertTrue(Pattern.matches("[a-z]([-a-z0-9]*[a-z0-9])?", job.getName()));
+ }
+
+ private Pipeline buildDataflowPipeline(DataflowPipelineOptions options) {
+ options.setStableUniqueNames(CheckEnabled.ERROR);
+ options.setRunner(DataflowRunner.class);
+ Pipeline p = Pipeline.create(options);
+
+ p.apply(TextIO.Read.named("ReadMyFile").from("gs://bucket/object"))
+ .apply(TextIO.Write.named("WriteMyFile").to("gs://bucket/object"));
+
+ return p;
+ }
+
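+  /**
+   * Returns a mock Dataflow client whose job creation request captures the submitted
+   * {@link Job} into {@code jobCaptor} and returns a job with ID {@code "newid"}.
+   */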
+ private static Dataflow buildMockDataflow(
+ final ArgumentCaptor<Job> jobCaptor) throws IOException {
+ Dataflow mockDataflowClient = mock(Dataflow.class);
+ Dataflow.Projects mockProjects = mock(Dataflow.Projects.class);
+ Dataflow.Projects.Jobs mockJobs = mock(Dataflow.Projects.Jobs.class);
+ Dataflow.Projects.Jobs.Create mockRequest =
+ mock(Dataflow.Projects.Jobs.Create.class);
+ Dataflow.Projects.Jobs.List mockList = mock(Dataflow.Projects.Jobs.List.class);
+
+ when(mockDataflowClient.projects()).thenReturn(mockProjects);
+ when(mockProjects.jobs()).thenReturn(mockJobs);
+ when(mockJobs.create(eq(PROJECT_ID), jobCaptor.capture()))
+ .thenReturn(mockRequest);
+ when(mockJobs.list(eq(PROJECT_ID))).thenReturn(mockList);
+ when(mockList.setPageToken(anyString())).thenReturn(mockList);
+ when(mockList.execute())
+ .thenReturn(
+ new ListJobsResponse()
+ .setJobs(
+ Arrays.asList(
+ new Job()
+ .setName("oldjobname")
+ .setId("oldJobId")
+ .setCurrentState("JOB_STATE_RUNNING"))));
+
+ Job resultJob = new Job();
+ resultJob.setId("newid");
+ when(mockRequest.execute()).thenReturn(resultJob);
+ return mockDataflowClient;
+ }
+
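+  /**
+   * Returns a mock {@link GcsUtil} that backs newly created channels with temporary local
+   * files and reports bucket existence according to {@code bucketExists}.
+   */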
+ private GcsUtil buildMockGcsUtil(boolean bucketExists) throws IOException {
+ GcsUtil mockGcsUtil = mock(GcsUtil.class);
+ when(mockGcsUtil.create(any(GcsPath.class), anyString()))
+ .then(new Answer<SeekableByteChannel>() {
+ @Override
+ public SeekableByteChannel answer(InvocationOnMock invocation) throws Throwable {
+ return FileChannel.open(
+ Files.createTempFile("channel-", ".tmp"),
+ StandardOpenOption.CREATE, StandardOpenOption.DELETE_ON_CLOSE);
+ }
+ });
+
+ when(mockGcsUtil.isGcsPatternSupported(anyString())).thenReturn(true);
+ when(mockGcsUtil.expand(any(GcsPath.class))).then(new Answer<List<GcsPath>>() {
+ @Override
+ public List<GcsPath> answer(InvocationOnMock invocation) throws Throwable {
+ return ImmutableList.of((GcsPath) invocation.getArguments()[0]);
+ }
+ });
+ when(mockGcsUtil.bucketExists(any(GcsPath.class))).thenReturn(bucketExists);
+ return mockGcsUtil;
+ }
+
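+  /** Returns {@link DataflowPipelineOptions} wired to the mock Dataflow client and GcsUtil. */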
+ private DataflowPipelineOptions buildPipelineOptions() throws IOException {
+ ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
+ return buildPipelineOptions(jobCaptor);
+ }
+
+ private DataflowPipelineOptions buildPipelineOptions(
+ ArgumentCaptor<Job> jobCaptor) throws IOException {
+ DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+ options.setRunner(DataflowRunner.class);
+ options.setProject(PROJECT_ID);
+ options.setTempLocation("gs://somebucket/some/path");
+    // Set FILES_PROPERTY to empty to prevent a default value from being computed from the
+    // classpath.
+ options.setFilesToStage(new LinkedList<String>());
+ options.setDataflowClient(buildMockDataflow(jobCaptor));
+ options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */));
+ options.setGcpCredential(new TestCredential());
+ return options;
+ }
+
+ @Test
+ public void testFromOptionsWithUppercaseConvertsToLowercase() throws Exception {
+ String mixedCase = "ThisJobNameHasMixedCase";
+ ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
+ DataflowPipelineOptions options = buildPipelineOptions(jobCaptor);
+ options.setJobName(mixedCase);
+
+ DataflowRunner runner = DataflowRunner.fromOptions(options);
+ assertThat(options.getJobName(), equalTo(mixedCase.toLowerCase()));
+ }
+
+ @Test
+ public void testRun() throws IOException {
+ ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
+
+ DataflowPipelineOptions options = buildPipelineOptions(jobCaptor);
+ Pipeline p = buildDataflowPipeline(options);
+ DataflowPipelineJob job = (DataflowPipelineJob) p.run();
+ assertEquals("newid", job.getJobId());
+ assertValidJob(jobCaptor.getValue());
+ }
+
+ @Test
+ public void testRunReturnDifferentRequestId() throws IOException {
+ DataflowPipelineOptions options = buildPipelineOptions();
+ Dataflow mockDataflowClient = options.getDataflowClient();
+ Dataflow.Projects.Jobs.Create mockRequest = mock(Dataflow.Projects.Jobs.Create.class);
+ when(mockDataflowClient.projects().jobs().create(eq(PROJECT_ID), any(Job.class)))
+ .thenReturn(mockRequest);
+ Job resultJob = new Job();
+ resultJob.setId("newid");
+ // Return a different request id.
+ resultJob.setClientRequestId("different_request_id");
+ when(mockRequest.execute()).thenReturn(resultJob);
+
+ Pipeline p = buildDataflowPipeline(options);
+ try {
+ p.run();
+ fail("Expected DataflowJobAlreadyExistsException");
+ } catch (DataflowJobAlreadyExistsException expected) {
+ assertThat(expected.getMessage(),
+ containsString("If you want to submit a second job, try again by setting a "
+ + "different name using --jobName."));
+ assertEquals(expected.getJob().getJobId(), resultJob.getId());
+ }
+ }
+
+ @Test
+ public void testUpdate() throws IOException {
+ ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
+
+ DataflowPipelineOptions options = buildPipelineOptions(jobCaptor);
+ options.setUpdate(true);
+ options.setJobName("oldJobName");
+ Pipeline p = buildDataflowPipeline(options);
+ DataflowPipelineJob job = (DataflowPipelineJob) p.run();
+ assertEquals("newid", job.getJobId());
+ assertValidJob(jobCaptor.getValue());
+ }
+
+ @Test
+ public void testUpdateNonExistentPipeline() throws IOException {
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("Could not find running job named badjobname");
+
+ DataflowPipelineOptions options = buildPipelineOptions();
+ options.setUpdate(true);
+ options.setJobName("badJobName");
+ Pipeline p = buildDataflowPipeline(options);
+ p.run();
+ }
+
+ @Test
+ public void testUpdateAlreadyUpdatedPipeline() throws IOException {
+ DataflowPipelineOptions options = buildPipelineOptions();
+ options.setUpdate(true);
+ options.setJobName("oldJobName");
+ Dataflow mockDataflowClient = options.getDataflowClient();
+ Dataflow.Projects.Jobs.Create mockRequest = mock(Dataflow.Projects.Jobs.Create.class);
+ when(mockDataflowClient.projects().jobs().create(eq(PROJECT_ID), any(Job.class)))
+ .thenReturn(mockRequest);
+ final Job resultJob = new Job();
+ resultJob.setId("newid");
+ // Return a different request id.
+ resultJob.setClientRequestId("different_request_id");
+ when(mockRequest.execute()).thenReturn(resultJob);
+
+ Pipeline p = buildDataflowPipeline(options);
+
+ thrown.expect(DataflowJobAlreadyUpdatedException.class);
+ thrown.expect(new TypeSafeMatcher<DataflowJobAlreadyUpdatedException>() {
+ @Override
+ public void describeTo(Description description) {
+ description.appendText("Expected job ID: " + resultJob.getId());
+ }
+
+ @Override
+ protected boolean matchesSafely(DataflowJobAlreadyUpdatedException item) {
+ return resultJob.getId().equals(item.getJob().getJobId());
+ }
+ });
+ thrown.expectMessage("The job named oldjobname with id: oldJobId has already been updated "
+ + "into job id: newid and cannot be updated again.");
+ p.run();
+ }
+
+ @Test
+ public void testRunWithFiles() throws IOException {
+    // Test that DataflowRunner.stageFiles stages the files as expected.
+ GcsUtil mockGcsUtil = buildMockGcsUtil(true /* bucket exists */);
+ final String gcsStaging = "gs://somebucket/some/path";
+ final String gcsTemp = "gs://somebucket/some/temp/path";
+ final String cloudDataflowDataset = "somedataset";
+
+ // Create some temporary files.
+ File temp1 = File.createTempFile("DataflowRunnerTest", "txt");
+ temp1.deleteOnExit();
+ File temp2 = File.createTempFile("DataflowRunnerTest2", "txt");
+ temp2.deleteOnExit();
+
+ String overridePackageName = "alias.txt";
+
+ ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
+ DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+ options.setFilesToStage(ImmutableList.of(
+ temp1.getAbsolutePath(),
+ overridePackageName + "=" + temp2.getAbsolutePath()));
+ options.setStagingLocation(gcsStaging);
+ options.setTempLocation(gcsTemp);
+ options.setTempDatasetId(cloudDataflowDataset);
+ options.setProject(PROJECT_ID);
+ options.setJobName("job");
+ options.setDataflowClient(buildMockDataflow(jobCaptor));
+ options.setGcsUtil(mockGcsUtil);
+ options.setGcpCredential(new TestCredential());
+
+ Pipeline p = buildDataflowPipeline(options);
+
+ DataflowPipelineJob job = (DataflowPipelineJob) p.run();
+ assertEquals("newid", job.getJobId());
+
+ Job workflowJob = jobCaptor.getValue();
+ assertValidJob(workflowJob);
+
+ assertEquals(
+ 2,
+ workflowJob.getEnvironment().getWorkerPools().get(0).getPackages().size());
+ DataflowPackage workflowPackage1 =
+ workflowJob.getEnvironment().getWorkerPools().get(0).getPackages().get(0);
+ assertThat(workflowPackage1.getName(), startsWith(temp1.getName()));
+ DataflowPackage workflowPackage2 =
+ workflowJob.getEnvironment().getWorkerPools().get(0).getPackages().get(1);
+ assertEquals(overridePackageName, workflowPackage2.getName());
+
+ assertEquals(
+ "storage.googleapis.com/somebucket/some/temp/path",
+ workflowJob.getEnvironment().getTempStoragePrefix());
+ assertEquals(
+ cloudDataflowDataset,
+ workflowJob.getEnvironment().getDataset());
+ assertEquals(
+ ReleaseInfo.getReleaseInfo().getName(),
+ workflowJob.getEnvironment().getUserAgent().get("name"));
+ assertEquals(
+ ReleaseInfo.getReleaseInfo().getVersion(),
+ workflowJob.getEnvironment().getUserAgent().get("version"));
+ }
+
+ @Test
+ public void runWithDefaultFilesToStage() throws Exception {
+ DataflowPipelineOptions options = buildPipelineOptions();
+ options.setFilesToStage(null);
+ DataflowRunner.fromOptions(options);
+    assertFalse(options.getFilesToStage().isEmpty());
+ }
+
+ @Test
+ public void detectClassPathResourceWithFileResources() throws Exception {
+ File file = tmpFolder.newFile("file");
+ File file2 = tmpFolder.newFile("file2");
+ URLClassLoader classLoader = new URLClassLoader(new URL[]{
+ file.toURI().toURL(),
+ file2.toURI().toURL()
+ });
+
+ assertEquals(ImmutableList.of(file.getAbsolutePath(), file2.getAbsolutePath()),
+ DataflowRunner.detectClassPathResourcesToStage(classLoader));
+ }
+
+ @Test
+ public void detectClassPathResourcesWithUnsupportedClassLoader() {
+ ClassLoader mockClassLoader = Mockito.mock(ClassLoader.class);
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("Unable to use ClassLoader to detect classpath elements.");
+
+ DataflowRunner.detectClassPathResourcesToStage(mockClassLoader);
+ }
+
+ @Test
+ public void detectClassPathResourceWithNonFileResources() throws Exception {
+ String url = "http://www.google.com/all-the-secrets.jar";
+ URLClassLoader classLoader = new URLClassLoader(new URL[]{
+ new URL(url)
+ });
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("Unable to convert url (" + url + ") to file.");
+
+ DataflowRunner.detectClassPathResourcesToStage(classLoader);
+ }
+
+ @Test
+ public void testGcsStagingLocationInitialization() throws Exception {
+ // Test that the staging location is initialized correctly.
+ String gcsTemp = "gs://somebucket/some/temp/path";
+
+ // Set temp location (required), and check that staging location is set.
+ DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+ options.setTempLocation(gcsTemp);
+ options.setProject(PROJECT_ID);
+ options.setGcpCredential(new TestCredential());
+ options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */));
+ options.setRunner(DataflowRunner.class);
+
+ DataflowRunner.fromOptions(options);
+
+ assertNotNull(options.getStagingLocation());
+ }
+
+ @Test
+ public void testNonGcsFilePathInReadFailure() throws IOException {
+ ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
+
+ Pipeline p = buildDataflowPipeline(buildPipelineOptions(jobCaptor));
+ p.apply(TextIO.Read.named("ReadMyNonGcsFile").from(tmpFolder.newFile().getPath()));
+
+ thrown.expectCause(Matchers.allOf(
+ instanceOf(IllegalArgumentException.class),
+ ThrowableMessageMatcher.hasMessage(
+ containsString("expected a valid 'gs://' path but was given"))));
+ p.run();
+ assertValidJob(jobCaptor.getValue());
+ }
+
+ @Test
+ public void testNonGcsFilePathInWriteFailure() throws IOException {
+ Pipeline p = buildDataflowPipeline(buildPipelineOptions());
+ PCollection<String> pc = p.apply(TextIO.Read.named("ReadMyGcsFile").from("gs://bucket/object"));
+
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage(containsString("expected a valid 'gs://' path but was given"));
+ pc.apply(TextIO.Write.named("WriteMyNonGcsFile").to("/tmp/file"));
+ }
+
+ @Test
+ public void testMultiSlashGcsFileReadPath() throws IOException {
+ ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
+
+ Pipeline p = buildDataflowPipeline(buildPipelineOptions(jobCaptor));
+ p.apply(TextIO.Read.named("ReadInvalidGcsFile")
+ .from("gs://bucket/tmp//file"));
+
+ thrown.expectCause(Matchers.allOf(
+ instanceOf(IllegalArgumentException.class),
+ ThrowableMessageMatcher.hasMessage(containsString("consecutive slashes"))));
+ p.run();
+ assertValidJob(jobCaptor.getValue());
+ }
+
+ @Test
+ public void testMultiSlashGcsFileWritePath() throws IOException {
+ Pipeline p = buildDataflowPipeline(buildPipelineOptions());
+ PCollection<String> pc = p.apply(TextIO.Read.named("ReadMyGcsFile").from("gs://bucket/object"));
+
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("consecutive slashes");
+ pc.apply(TextIO.Write.named("WriteInvalidGcsFile").to("gs://bucket/tmp//file"));
+ }
+
+ @Test
+ public void testInvalidTempLocation() throws IOException {
+ ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
+
+ DataflowPipelineOptions options = buildPipelineOptions(jobCaptor);
+ options.setTempLocation("file://temp/location");
+
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage(containsString("expected a valid 'gs://' path but was given"));
+ DataflowRunner.fromOptions(options);
+ assertValidJob(jobCaptor.getValue());
+ }
+
+ @Test
+ public void testInvalidStagingLocation() throws IOException {
+ DataflowPipelineOptions options = buildPipelineOptions();
+ options.setStagingLocation("file://my/staging/location");
+ try {
+ DataflowRunner.fromOptions(options);
+ fail("fromOptions should have failed");
+ } catch (IllegalArgumentException e) {
+ assertThat(e.getMessage(), containsString("expected a valid 'gs://' path but was given"));
+ }
+ options.setStagingLocation("my/staging/location");
+ try {
+ DataflowRunner.fromOptions(options);
+ fail("fromOptions should have failed");
+ } catch (IllegalArgumentException e) {
+ assertThat(e.getMessage(), containsString("expected a valid 'gs://' path but was given"));
+ }
+ }
+
+ @Test
+ public void testNonExistentTempLocation() throws IOException {
+ ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
+
+ GcsUtil mockGcsUtil = buildMockGcsUtil(false /* bucket exists */);
+ DataflowPipelineOptions options = buildPipelineOptions(jobCaptor);
+ options.setGcsUtil(mockGcsUtil);
+ options.setTempLocation("gs://non-existent-bucket/location");
+
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage(containsString(
+ "Output path does not exist or is not writeable: gs://non-existent-bucket/location"));
+ DataflowRunner.fromOptions(options);
+ assertValidJob(jobCaptor.getValue());
+ }
+
+ @Test
+ public void testNonExistentStagingLocation() throws IOException {
+ ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
+
+ GcsUtil mockGcsUtil = buildMockGcsUtil(false /* bucket exists */);
+ DataflowPipelineOptions options = buildPipelineOptions(jobCaptor);
+ options.setGcsUtil(mockGcsUtil);
+ options.setStagingLocation("gs://non-existent-bucket/location");
+
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage(containsString(
+ "Output path does not exist or is not writeable: gs://non-existent-bucket/location"));
+ DataflowRunner.fromOptions(options);
+ assertValidJob(jobCaptor.getValue());
+ }
+
+ @Test
+ public void testNoProjectFails() {
+ DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+
+ options.setRunner(DataflowRunner.class);
+ // Explicitly set to null to prevent the default instance factory from reading credentials
+ // from a user's environment, causing this test to fail.
+ options.setProject(null);
+
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("Project id");
+ thrown.expectMessage("when running a Dataflow in the cloud");
+
+ DataflowRunner.fromOptions(options);
+ }
+
+ @Test
+ public void testProjectId() throws IOException {
+ DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+ options.setRunner(DataflowRunner.class);
+ options.setProject("foo-12345");
+
+ options.setStagingLocation("gs://spam/ham/eggs");
+ options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */));
+ options.setGcpCredential(new TestCredential());
+
+ DataflowRunner.fromOptions(options);
+ }
+
+ @Test
+ public void testProjectPrefix() throws IOException {
+ DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+ options.setRunner(DataflowRunner.class);
+ options.setProject("google.com:some-project-12345");
+
+ options.setStagingLocation("gs://spam/ham/eggs");
+ options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */));
+ options.setGcpCredential(new TestCredential());
+
+ DataflowRunner.fromOptions(options);
+ }
+
+ @Test
+ public void testProjectNumber() throws IOException {
+ DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+ options.setRunner(DataflowRunner.class);
+ options.setProject("12345");
+
+ options.setStagingLocation("gs://spam/ham/eggs");
+ options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */));
+
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("Project ID");
+ thrown.expectMessage("project number");
+
+ DataflowRunner.fromOptions(options);
+ }
+
+ @Test
+ public void testProjectDescription() throws IOException {
+ DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+ options.setRunner(DataflowRunner.class);
+ options.setProject("some project");
+
+ options.setStagingLocation("gs://spam/ham/eggs");
+ options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */));
+
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("Project ID");
+ thrown.expectMessage("project description");
+
+ DataflowRunner.fromOptions(options);
+ }
+
+ @Test
+ public void testInvalidNumberOfWorkerHarnessThreads() throws IOException {
+ DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+ options.setRunner(DataflowRunner.class);
+ options.setProject("foo-12345");
+
+ options.setStagingLocation("gs://spam/ham/eggs");
+ options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */));
+
+ options.as(DataflowPipelineDebugOptions.class).setNumberOfWorkerHarnessThreads(-1);
+
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("Number of worker harness threads");
+ thrown.expectMessage("Please make sure the value is non-negative.");
+
+ DataflowRunner.fromOptions(options);
+ }
+
+ @Test
+ public void testNoStagingLocationAndNoTempLocationFails() {
+ DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+ options.setRunner(DataflowRunner.class);
+ options.setProject("foo-project");
+
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage(
+ "Missing required value: at least one of tempLocation or stagingLocation must be set.");
+
+ DataflowRunner.fromOptions(options);
+ }
+
+ @Test
+ public void testStagingLocationAndNoTempLocationSucceeds() throws Exception {
+ DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+ options.setRunner(DataflowRunner.class);
+ options.setGcpCredential(new TestCredential());
+ options.setProject("foo-project");
+ options.setStagingLocation("gs://spam/ham/eggs");
+ options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */));
+
+ DataflowRunner.fromOptions(options);
+ }
+
+ @Test
+ public void testTempLocationAndNoStagingLocationSucceeds() throws Exception {
+ DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+ options.setRunner(DataflowRunner.class);
+ options.setGcpCredential(new TestCredential());
+ options.setProject("foo-project");
+ options.setTempLocation("gs://spam/ham/eggs");
+ options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */));
+
+ DataflowRunner.fromOptions(options);
+ }
+
+ @Test
+ public void testInvalidJobName() throws IOException {
+ List<String> invalidNames = Arrays.asList(
+ "invalid_name",
+ "0invalid",
+ "invalid-");
+ List<String> expectedReason = Arrays.asList(
+ "JobName invalid",
+ "JobName invalid",
+ "JobName invalid");
+
+ for (int i = 0; i < invalidNames.size(); ++i) {
+ DataflowPipelineOptions options = buildPipelineOptions();
+ options.setJobName(invalidNames.get(i));
+
+ try {
+ DataflowRunner.fromOptions(options);
+ fail("Expected IllegalArgumentException for jobName "
+ + options.getJobName());
+ } catch (IllegalArgumentException e) {
+ assertThat(e.getMessage(),
+ containsString(expectedReason.get(i)));
+ }
+ }
+ }
+
+ @Test
+ public void testValidJobName() throws IOException {
+ List<String> names = Arrays.asList("ok", "Ok", "A-Ok", "ok-123",
+ "this-one-is-fairly-long-01234567890123456789");
+
+ for (String name : names) {
+ DataflowPipelineOptions options = buildPipelineOptions();
+ options.setJobName(name);
+
+ DataflowRunner runner = DataflowRunner
+ .fromOptions(options);
+ assertNotNull(runner);
+ }
+ }
+
+ /**
+ * A fake PTransform for testing.
+ */
+ public static class TestTransform
+ extends PTransform<PCollection<Integer>, PCollection<Integer>> {
+ public boolean translated = false;
+
+ @Override
+ public PCollection<Integer> apply(PCollection<Integer> input) {
+ return PCollection.<Integer>createPrimitiveOutputInternal(
+ input.getPipeline(),
+ WindowingStrategy.globalDefault(),
+ input.isBounded());
+ }
+
+ @Override
+ protected Coder<?> getDefaultOutputCoder(PCollection<Integer> input) {
+ return input.getCoder();
+ }
+ }
+
+ @Test
+ public void testTransformTranslatorMissing() throws IOException {
+ // Test that we throw if we don't provide a translation.
+ ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
+
+ DataflowPipelineOptions options = buildPipelineOptions(jobCaptor);
+ Pipeline p = Pipeline.create(options);
+
+ p.apply(Create.of(Arrays.asList(1, 2, 3)))
+ .apply(new TestTransform());
+
+ thrown.expect(IllegalStateException.class);
+ thrown.expectMessage(Matchers.containsString("no translator registered"));
+ DataflowPipelineTranslator.fromOptions(options)
+ .translate(
+ p, (DataflowRunner) p.getRunner(), Collections.<DataflowPackage>emptyList());
+ assertValidJob(jobCaptor.getValue());
+ }
+
+ @Test
+ public void testTransformTranslator() throws IOException {
+    // Test that we can provide a custom translation.
+ DataflowPipelineOptions options = buildPipelineOptions();
+ Pipeline p = Pipeline.create(options);
+ TestTransform transform = new TestTransform();
+
+ p.apply(Create.of(Arrays.asList(1, 2, 3)).withCoder(BigEndianIntegerCoder.of()))
+ .apply(transform);
+
+ DataflowPipelineTranslator translator = DataflowRunner
+ .fromOptions(options).getTranslator();
+
+ DataflowPipelineTranslator.registerTransformTranslator(
+ TestTransform.class,
+ new DataflowPipelineTranslator.TransformTranslator<TestTransform>() {
+ @SuppressWarnings("unchecked")
+ @Override
+ public void translate(
+ TestTransform transform,
+ DataflowPipelineTranslator.TranslationContext context) {
+ transform.translated = true;
+
+ // Note: This is about the minimum needed to fake out a
+ // translation. This obviously isn't a real translation.
+ context.addStep(transform, "TestTranslate");
+ context.addOutput("output", context.getOutput(transform));
+ }
+ });
+
+ translator.translate(
+ p, (DataflowRunner) p.getRunner(), Collections.<DataflowPackage>emptyList());
+ assertTrue(transform.translated);
+ }
+
+ /** Records all the composite transforms visited within the Pipeline. */
+ private static class CompositeTransformRecorder extends PipelineVisitor.Defaults {
+ private List<PTransform<?, ?>> transforms = new ArrayList<>();
+
+ @Override
+ public CompositeBehavior enterCompositeTransform(TransformTreeNode node) {
+ if (node.getTransform() != null) {
+ transforms.add(node.getTransform());
+ }
+ return CompositeBehavior.ENTER_TRANSFORM;
+ }
+
+ public List<PTransform<?, ?>> getCompositeTransforms() {
+ return transforms;
+ }
+ }
+
+ @Test
+ public void testApplyIsScopedToExactClass() throws IOException {
+ DataflowPipelineOptions options = buildPipelineOptions();
+ Pipeline p = Pipeline.create(options);
+
+ Create.TimestampedValues<String> transform =
+ Create.timestamped(Arrays.asList(TimestampedValue.of("TestString", Instant.now())));
+ p.apply(transform);
+
+ CompositeTransformRecorder recorder = new CompositeTransformRecorder();
+ p.traverseTopologically(recorder);
+
+    // The recorder will also have seen a Create.Values composite, but we can't obtain that
+    // transform.
+ assertThat(
+ "Expected to have seen CreateTimestamped composite transform.",
+ recorder.getCompositeTransforms(),
+ hasItem(transform));
+ assertThat(
+ "Expected to have two composites, CreateTimestamped and Create.Values",
+ recorder.getCompositeTransforms(),
+ hasItem(Matchers.<PTransform<?, ?>>isA((Class) Create.Values.class)));
+ }
+
+ @Test
+ public void testToString() {
+ DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+ options.setJobName("TestJobName");
+ options.setProject("test-project");
+ options.setTempLocation("gs://test/temp/location");
+ options.setGcpCredential(new TestCredential());
+ options.setPathValidatorClass(NoopPathValidator.class);
+ options.setRunner(DataflowRunner.class);
+ assertEquals(
+ "DataflowRunner#testjobname",
+ DataflowRunner.fromOptions(options).toString());
+ }
+
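+  /** Returns options for a {@link DataflowRunner} pipeline with path validation disabled. */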
+ private static PipelineOptions makeOptions(boolean streaming) {
+ DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+ options.setRunner(DataflowRunner.class);
+ options.setStreaming(streaming);
+ options.setJobName("TestJobName");
+ options.setProject("test-project");
+ options.setTempLocation("gs://test/temp/location");
+ options.setGcpCredential(new TestCredential());
+ options.setPathValidatorClass(NoopPathValidator.class);
+ return options;
+ }
+
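+  /** Asserts that applying {@code source} in the given mode fails with the expected message. */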
+ private void testUnsupportedSource(PTransform<PInput, ?> source, String name, boolean streaming)
+ throws Exception {
+ String mode = streaming ? "streaming" : "batch";
+ thrown.expect(UnsupportedOperationException.class);
+ thrown.expectMessage(
+ "The DataflowRunner in " + mode + " mode does not support " + name);
+
+ Pipeline p = Pipeline.create(makeOptions(streaming));
+ p.apply(source);
+ p.run();
+ }
+
+ @Test
+ public void testBoundedSourceUnsupportedInStreaming() throws Exception {
+ testUnsupportedSource(
+ AvroSource.readFromFileWithClass("foo", String.class), "Read.Bounded", true);
+ }
+
+ @Test
+ public void testBigQueryIOSourceUnsupportedInStreaming() throws Exception {
+ testUnsupportedSource(
+ BigQueryIO.Read.from("project:bar.baz").withoutValidation(), "BigQueryIO.Read", true);
+ }
+
+ @Test
+ public void testAvroIOSourceUnsupportedInStreaming() throws Exception {
+ testUnsupportedSource(
+ AvroIO.Read.from("foo"), "AvroIO.Read", true);
+ }
+
+ @Test
+ public void testTextIOSourceUnsupportedInStreaming() throws Exception {
+ testUnsupportedSource(TextIO.Read.from("foo"), "TextIO.Read", true);
+ }
+
+ @Test
+ public void testReadBoundedSourceUnsupportedInStreaming() throws Exception {
+ testUnsupportedSource(Read.from(AvroSource.from("/tmp/test")), "Read.Bounded", true);
+ }
+
+ @Test
+ public void testReadUnboundedUnsupportedInBatch() throws Exception {
+ testUnsupportedSource(Read.from(new TestCountingSource(1)), "Read.Unbounded", false);
+ }
+
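+  /** Asserts that applying {@code sink} in the given mode fails with the expected message. */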
+ private void testUnsupportedSink(
+ PTransform<PCollection<String>, PDone> sink, String name, boolean streaming)
+ throws Exception {
+    String mode = streaming ? "streaming" : "batch";
+    thrown.expect(UnsupportedOperationException.class);
+    thrown.expectMessage(
+        "The DataflowRunner in " + mode + " mode does not support " + name);
+
+ Pipeline p = Pipeline.create(makeOptions(streaming));
+ p.apply(Create.of("foo")).apply(sink);
+ p.run();
+ }
+
+ @Test
+ public void testAvroIOSinkUnsupportedInStreaming() throws Exception {
+ testUnsupportedSink(AvroIO.Write.to("foo").withSchema(String.class), "AvroIO.Write", true);
+ }
+
+ @Test
+ public void testTextIOSinkUnsupportedInStreaming() throws Exception {
+ testUnsupportedSink(TextIO.Write.to("foo"), "TextIO.Write", true);
+ }
+
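+  /*
+   * The tests below exercise the DoFns used by DataflowRunner's batch view overrides
+   * (BatchViewAsSingleton, BatchViewAsList, BatchViewAsMap and BatchViewAsMultimap),
+   * which materialize side inputs as IsmRecords.
+   */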
+ @Test
+ public void testBatchViewAsSingletonToIsmRecord() throws Exception {
+ DoFnTester<KV<Integer, Iterable<KV<GlobalWindow, WindowedValue<String>>>>,
+ IsmRecord<WindowedValue<String>>> doFnTester =
+ DoFnTester.of(
+ new BatchViewAsSingleton.IsmRecordForSingularValuePerWindowDoFn
+ <String, GlobalWindow>(GlobalWindow.Coder.INSTANCE));
+
+ assertThat(
+ doFnTester.processBundle(
+ ImmutableList.of(KV.<Integer, Iterable<KV<GlobalWindow, WindowedValue<String>>>>of(
+ 0, ImmutableList.of(KV.of(GlobalWindow.INSTANCE, valueInGlobalWindow("a")))))),
+ contains(IsmRecord.of(ImmutableList.of(GlobalWindow.INSTANCE), valueInGlobalWindow("a"))));
+ }
+
+ @Test
+ public void testBatchViewAsSingletonToIsmRecordWithMultipleValuesThrowsException()
+ throws Exception {
+ DoFnTester<KV<Integer, Iterable<KV<GlobalWindow, WindowedValue<String>>>>,
+ IsmRecord<WindowedValue<String>>> doFnTester =
+ DoFnTester.of(
+ new BatchViewAsSingleton.IsmRecordForSingularValuePerWindowDoFn
+ <String, GlobalWindow>(GlobalWindow.Coder.INSTANCE));
+
+ thrown.expect(IllegalStateException.class);
+ thrown.expectMessage("found for singleton within window");
+ doFnTester.processBundle(ImmutableList.of(
+ KV.<Integer, Iterable<KV<GlobalWindow, WindowedValue<String>>>>of(0,
+ ImmutableList.of(KV.of(GlobalWindow.INSTANCE, valueInGlobalWindow("a")),
+ KV.of(GlobalWindow.INSTANCE, valueInGlobalWindow("b"))))));
+ }
+
+ @Test
+ public void testBatchViewAsListToIsmRecordForGlobalWindow() throws Exception {
+ DoFnTester<String, IsmRecord<WindowedValue<String>>> doFnTester =
+ DoFnTester.of(new BatchViewAsList.ToIsmRecordForGlobalWindowDoFn<String>());
+
+    // The order of the output elements must match the processing order.
+ assertThat(doFnTester.processBundle(ImmutableList.of("a", "b", "c")), contains(
+ IsmRecord.of(ImmutableList.of(GlobalWindow.INSTANCE, 0L), valueInGlobalWindow("a")),
+ IsmRecord.of(ImmutableList.of(GlobalWindow.INSTANCE, 1L), valueInGlobalWindow("b")),
+ IsmRecord.of(ImmutableList.of(GlobalWindow.INSTANCE, 2L), valueInGlobalWindow("c"))));
+ }
+
+ @Test
+ public void testBatchViewAsListToIsmRecordForNonGlobalWindow() throws Exception {
+ DoFnTester<KV<Integer, Iterable<KV<IntervalWindow, WindowedValue<Long>>>>,
+ IsmRecord<WindowedValue<Long>>> doFnTester =
+ DoFnTester.of(
+ new BatchViewAsList.ToIsmRecordForNonGlobalWindowDoFn<Long, IntervalWindow>(
+ IntervalWindow.getCoder()));
+
+ IntervalWindow windowA = new IntervalWindow(new Instant(0), new Instant(10));
+ IntervalWindow windowB = new IntervalWindow(new Instant(10), new Instant(20));
+ IntervalWindow windowC = new IntervalWindow(new Instant(20), new Instant(30));
+
+ Iterable<KV<Integer, Iterable<KV<IntervalWindow, WindowedValue<Long>>>>> inputElements =
+ ImmutableList.of(
+ KV.of(1, (Iterable<KV<IntervalWindow, WindowedValue<Long>>>) ImmutableList.of(
+ KV.of(
+ windowA, WindowedValue.of(110L, new Instant(1), windowA, PaneInfo.NO_FIRING)),
+ KV.of(
+ windowA, WindowedValue.of(111L, new Instant(3), windowA, PaneInfo.NO_FIRING)),
+ KV.of(
+ windowA, WindowedValue.of(112L, new Instant(4), windowA, PaneInfo.NO_FIRING)),
+ KV.of(
+ windowB, WindowedValue.of(120L, new Instant(12), windowB, PaneInfo.NO_FIRING)),
+ KV.of(
+ windowB, WindowedValue.of(121L, new Instant(14), windowB, PaneInfo.NO_FIRING))
+ )),
+ KV.of(2, (Iterable<KV<IntervalWindow, WindowedValue<Long>>>) ImmutableList.of(
+ KV.of(
+ windowC, WindowedValue.of(210L, new Instant(25), windowC, PaneInfo.NO_FIRING))
+ )));
+
+    // The order of the output elements must match the processing order.
+ assertThat(doFnTester.processBundle(inputElements), contains(
+ IsmRecord.of(ImmutableList.of(windowA, 0L),
+ WindowedValue.of(110L, new Instant(1), windowA, PaneInfo.NO_FIRING)),
+ IsmRecord.of(ImmutableList.of(windowA, 1L),
+ WindowedValue.of(111L, new Instant(3), windowA, PaneInfo.NO_FIRING)),
+ IsmRecord.of(ImmutableList.of(windowA, 2L),
+ WindowedValue.of(112L, new Instant(4), windowA, PaneInfo.NO_FIRING)),
+ IsmRecord.of(ImmutableList.of(windowB, 0L),
+ WindowedValue.of(120L, new Instant(12), windowB, PaneInfo.NO_FIRING)),
+ IsmRecord.of(ImmutableList.of(windowB, 1L),
+ WindowedValue.of(121L, new Instant(14), windowB, PaneInfo.NO_FIRING)),
+ IsmRecord.of(ImmutableList.of(windowC, 0L),
+ WindowedValue.of(210L, new Instant(25), windowC, PaneInfo.NO_FIRING))));
+ }
+
+ @Test
+ public void testToIsmRecordForMapLikeDoFn() throws Exception {
+ TupleTag<KV<Integer, KV<IntervalWindow, Long>>> outputForSizeTag = new TupleTag<>();
+ TupleTag<KV<Integer, KV<IntervalWindow, Long>>> outputForEntrySetTag = new TupleTag<>();
+
+ Coder<Long> keyCoder = VarLongCoder.of();
+ Coder<IntervalWindow> windowCoder = IntervalWindow.getCoder();
+
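+    // Key components are (key, window, index); values are windowed longs. The first two
+    // arguments give how many leading key components are hashed to pick the shard for
+    // regular and metadata records, respectively.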
+ IsmRecordCoder<WindowedValue<Long>> ismCoder = IsmRecordCoder.of(
+ 1,
+ 2,
+ ImmutableList.<Coder<?>>of(
+ MetadataKeyCoder.of(keyCoder),
+ IntervalWindow.getCoder(),
+ BigEndianLongCoder.of()),
+ FullWindowedValueCoder.of(VarLongCoder.of(), windowCoder));
+
+ DoFnTester<KV<Integer, Iterable<KV<KV<Long, IntervalWindow>, WindowedValue<Long>>>>,
+ IsmRecord<WindowedValue<Long>>> doFnTester =
+ DoFnTester.of(new BatchViewAsMultimap.ToIsmRecordForMapLikeDoFn<Long, Long, IntervalWindow>(
+ outputForSizeTag,
+ outputForEntrySetTag,
+ windowCoder,
+ keyCoder,
+ ismCoder,
+ false /* unique keys */));
+ doFnTester.setSideOutputTags(TupleTagList.of(
+ ImmutableList.<TupleTag<?>>of(outputForSizeTag, outputForEntrySetTag)));
+
+ IntervalWindow windowA = new IntervalWindow(new Instant(0), new Instant(10));
+ IntervalWindow windowB = new IntervalWindow(new Instant(10), new Instant(20));
+ IntervalWindow windowC = new IntervalWindow(new Instant(20), new Instant(30));
+
+ Iterable<KV<Integer,
+ Iterable<KV<KV<Long, IntervalWindow>, WindowedValue<Long>>>>> inputElements =
+ ImmutableList.of(
+ KV.of(1, (Iterable<KV<KV<Long, IntervalWindow>, WindowedValue<Long>>>) ImmutableList.of(
+ KV.of(KV.of(1L, windowA),
+ WindowedValue.of(110L, new Instant(1), windowA, PaneInfo.NO_FIRING)),
+              // same window, same key as the previous element
+ KV.of(KV.of(1L, windowA),
+ WindowedValue.of(111L, new Instant(2), windowA, PaneInfo.NO_FIRING)),
+              // same window, different key from the previous element
+ KV.of(KV.of(2L, windowA),
+ WindowedValue.of(120L, new Instant(3), windowA, PaneInfo.NO_FIRING)),
+              // different window, same key as the previous element
+ KV.of(KV.of(2L, windowB),
+ WindowedValue.of(210L, new Instant(11), windowB, PaneInfo.NO_FIRING)),
+              // different window and different key from the previous element
+ KV.of(KV.of(3L, windowB),
+ WindowedValue.of(220L, new Instant(12), windowB, PaneInfo.NO_FIRING)))),
+ KV.of(2, (Iterable<KV<KV<Long, IntervalWindow>, WindowedValue<Long>>>) ImmutableList.of(
+ // different shard
+ KV.of(KV.of(4L, windowC),
+ WindowedValue.of(330L, new Instant(21), windowC, PaneInfo.NO_FIRING)))));
+
+    // The order of the output elements must match the processing order.
+ assertThat(doFnTester.processBundle(inputElements), contains(
+ IsmRecord.of(
+ ImmutableList.of(1L, windowA, 0L),
+ WindowedValue.of(110L, new Instant(1), windowA, PaneInfo.NO_FIRING)),
+ IsmRecord.of(
+ ImmutableList.of(1L, windowA, 1L),
+ WindowedValue.of(111L, new Instant(2), windowA, PaneInfo.NO_FIRING)),
+ IsmRecord.of(
+ ImmutableList.of(2L, windowA, 0L),
+ WindowedValue.of(120L, new Instant(3), windowA, PaneInfo.NO_FIRING)),
+ IsmRecord.of(
+ ImmutableList.of(2L, windowB, 0L),
+ WindowedValue.of(210L, new Instant(11), windowB, PaneInfo.NO_FIRING)),
+ IsmRecord.of(
+ ImmutableList.of(3L, windowB, 0L),
+ WindowedValue.of(220L, new Instant(12), windowB, PaneInfo.NO_FIRING)),
+ IsmRecord.of(
+ ImmutableList.of(4L, windowC, 0L),
+ WindowedValue.of(330L, new Instant(21), windowC, PaneInfo.NO_FIRING))));
+
+ // Verify the number of unique keys per window.
+ assertThat(doFnTester.takeSideOutputElements(outputForSizeTag), contains(
+ KV.of(ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowA)),
+ KV.of(windowA, 2L)),
+ KV.of(ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowB)),
+ KV.of(windowB, 2L)),
+ KV.of(ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowC)),
+ KV.of(windowC, 1L))
+ ));
+
+ // Verify the output for the unique keys.
+ assertThat(doFnTester.takeSideOutputElements(outputForEntrySetTag), contains(
+ KV.of(ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowA)),
+ KV.of(windowA, 1L)),
+ KV.of(ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowA)),
+ KV.of(windowA, 2L)),
+ KV.of(ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowB)),
+ KV.of(windowB, 2L)),
+ KV.of(ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowB)),
+ KV.of(windowB, 3L)),
+ KV.of(ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowC)),
+ KV.of(windowC, 4L))
+ ));
+ }
+
+ @Test
+ public void testToIsmRecordForMapLikeDoFnWithoutUniqueKeysThrowsException() throws Exception {
+ TupleTag<KV<Integer, KV<IntervalWindow, Long>>> outputForSizeTag = new TupleTag<>();
+ TupleTag<KV<Integer, KV<IntervalWindow, Long>>> outputForEntrySetTag = new TupleTag<>();
+
+ Coder<Long> keyCoder = VarLongCoder.of();
+ Coder<IntervalWindow> windowCoder = IntervalWindow.getCoder();
+
+ IsmRecordCoder<WindowedValue<Long>> ismCoder = IsmRecordCoder.of(
+ 1,
+ 2,
+ ImmutableList.<Coder<?>>of(
+ MetadataKeyCoder.of(keyCoder),
+ IntervalWindow.getCoder(),
+ BigEndianLongCoder.of()),
+ FullWindowedValueCoder.of(VarLongCoder.of(), windowCoder));
+
+ DoFnTester<KV<Integer, Iterable<KV<KV<Long, IntervalWindow>, WindowedValue<Long>>>>,
+ IsmRecord<WindowedValue<Long>>> doFnTester =
+ DoFnTester.of(new BatchViewAsMultimap.ToIsmRecordForMapLikeDoFn<Long, Long, IntervalWindow>(
+ outputForSizeTag,
+ outputForEntrySetTag,
+ windowCoder,
+ keyCoder,
+ ismCoder,
+ true /* unique keys */));
+ doFnTester.setSideOutputTags(TupleTagList.of(
+ ImmutableList.<TupleTag<?>>of(outputForSizeTag, outputForEntrySetTag)));
+
+ IntervalWindow windowA = new IntervalWindow(new Instant(0), new Instant(10));
+
+ Iterable<KV<Integer,
+ Iterable<KV<KV<Long, IntervalWindow>, WindowedValue<Long>>>>> inputElements =
+ ImmutableList.of(
+ KV.of(1, (Iterable<KV<KV<Long, IntervalWindow>, WindowedValue<Long>>>) ImmutableList.of(
+ KV.of(KV.of(1L, windowA),
+ WindowedValue.of(110L, new Instant(1), windowA, PaneInfo.NO_FIRING)),
+              // same window, same key as the previous element
+ KV.of(KV.of(1L, windowA),
+ WindowedValue.of(111L, new Instant(2), windowA, PaneInfo.NO_FIRING)))));
+
+ thrown.expect(IllegalStateException.class);
+ thrown.expectMessage("Unique keys are expected but found key");
+ doFnTester.processBundle(inputElements);
+ }
+
+ @Test
+ public void testToIsmMetadataRecordForSizeDoFn() throws Exception {
+ TupleTag<KV<Integer, KV<IntervalWindow, Long>>> outputForSizeTag = new TupleTag<>();
+ TupleTag<KV<Integer, KV<IntervalWindow, Long>>> outputForEntrySetTag = new TupleTag<>();
+
+ Coder<Long> keyCoder = VarLongCoder.of();
+ Coder<IntervalWindow> windowCoder = IntervalWindow.getCoder();
+
+ IsmRecordCoder<WindowedValue<Long>> ismCoder = IsmRecordCoder.of(
+ 1,
+ 2,
+ ImmutableList.<Coder<?>>of(
+ MetadataKeyCoder.of(keyCoder),
+ IntervalWindow.getCoder(),
+ BigEndianLongCoder.of()),
+ FullWindowedValueCoder.of(VarLongCoder.of(), windowCoder));
+
+ DoFnTester<KV<Integer, Iterable<KV<IntervalWindow, Long>>>,
+ IsmRecord<WindowedValue<Long>>> doFnTester = DoFnTester.of(
+ new BatchViewAsMultimap.ToIsmMetadataRecordForSizeDoFn<Long, Long, IntervalWindow>(
+ windowCoder));
+ doFnTester.setSideOutputTags(TupleTagList.of(
+ ImmutableList.<TupleTag<?>>of(outputForSizeTag, outputForEntrySetTag)));
+
+ IntervalWindow windowA = new IntervalWindow(new Instant(0), new Instant(10));
+ IntervalWindow windowB = new IntervalWindow(new Instant(10), new Instant(20));
+ IntervalWindow windowC = new IntervalWindow(new Instant(20), new Instant(30));
+
+ Iterable<KV<Integer, Iterable<KV<IntervalWindow, Long>>>> inputElements =
+ ImmutableList.of(
+ KV.of(1,
+ (Iterable<KV<IntervalWindow, Long>>) ImmutableList.of(
+ KV.of(windowA, 2L),
+ KV.of(windowA, 3L),
+ KV.of(windowB, 7L))),
+ KV.of(ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowB)),
+ (Iterable<KV<IntervalWindow, Long>>) ImmutableList.of(
+ KV.of(windowC, 9L))));
+
+    // The order of the output elements must match the processing order.
+ assertThat(doFnTester.processBundle(inputElements), contains(
+ IsmRecord.<WindowedValue<Long>>meta(
+ ImmutableList.of(IsmFormat.getMetadataKey(), windowA, 0L),
+ CoderUtils.encodeToByteArray(VarLongCoder.of(), 5L)),
+ IsmRecord.<WindowedValue<Long>>meta(
+ ImmutableList.of(IsmFormat.getMetadataKey(), windowB, 0L),
+ CoderUtils.encodeToByteArray(VarLongCoder.of(), 7L)),
+ IsmRecord.<WindowedValue<Long>>meta(
+ ImmutableList.of(IsmFormat.getMetadataKey(), windowC, 0L),
+ CoderUtils.encodeToByteArray(VarLongCoder.of(), 9L))
+ ));
+ }
+
+ @Test
+ public void testToIsmMetadataRecordForKeyDoFn() throws Exception {
+ TupleTag<KV<Integer, KV<IntervalWindow, Long>>> outputForSizeTag = new TupleTag<>();
+ TupleTag<KV<Integer, KV<IntervalWindow, Long>>> outputForEntrySetTag = new TupleTag<>();
+
+ Coder<Long> keyCoder = VarLongCoder.of();
+ Coder<IntervalWindow> windowCoder = IntervalWindow.getCoder();
+
+ IsmRecordCoder<WindowedValue<Long>> ismCoder = IsmRecordCoder.of(
+ 1,
+ 2,
+ ImmutableList.<Coder<?>>of(
+ MetadataKeyCoder.of(keyCoder),
+ IntervalWindow.getCoder(),
+ BigEndianLongCoder.of()),
+ FullWindowedValueCoder.of(VarLongCoder.of(), windowCoder));
+
+ DoFnTester<KV<Integer, Iterable<KV<IntervalWindow, Long>>>,
+ IsmRecord<WindowedValue<Long>>> doFnTester = DoFnTester.of(
+ new BatchViewAsMultimap.ToIsmMetadataRecordForKeyDoFn<Long, Long, IntervalWindow>(
+ keyCoder, windowCoder));
+ doFnTester.setSideOutputTags(TupleTagList.of(
+ ImmutableList.<TupleTag<?>>of(outputForSizeTag, outputForEntrySetTag)));
+
+ IntervalWindow windowA = new IntervalWindow(new Instant(0), new Instant(10));
+ IntervalWindow windowB = new IntervalWindow(new Instant(10), new Instant(20));
+ IntervalWindow windowC = new IntervalWindow(new Instant(20), new Instant(30));
+
+ Iterable<KV<Integer, Iterable<KV<IntervalWindow, Long>>>> inputElements =
+ ImmutableList.of(
+ KV.of(1,
+ (Iterable<KV<IntervalWindow, Long>>) ImmutableList.of(
+ KV.of(windowA, 2L),
+ // same window as previous
+ KV.of(windowA, 3L),
+                // different window from the previous
+ KV.of(windowB, 3L))),
+ KV.of(ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowB)),
+ (Iterable<KV<IntervalWindow, Long>>) ImmutableList.of(
+ KV.of(windowC, 3L))));
+
+    // The order of the output elements must match the processing order.
+ assertThat(doFnTester.processBundle(inputElements), contains(
+ IsmRecord.<WindowedValue<Long>>meta(
+ ImmutableList.of(IsmFormat.getMetadataKey(), windowA, 1L),
+ CoderUtils.encodeToByteArray(VarLongCoder.of(), 2L)),
+ IsmRecord.<WindowedValue<Long>>meta(
+ ImmutableList.of(IsmFormat.getMetadataKey(), windowA, 2L),
+ CoderUtils.encodeToByteArray(VarLongCoder.of(), 3L)),
+ IsmRecord.<WindowedValue<Long>>meta(
+ ImmutableList.of(IsmFormat.getMetadataKey(), windowB, 1L),
+ CoderUtils.encodeToByteArray(VarLongCoder.of(), 3L)),
+ IsmRecord.<WindowedValue<Long>>meta(
+ ImmutableList.of(IsmFormat.getMetadataKey(), windowC, 1L),
+ CoderUtils.encodeToByteArray(VarLongCoder.of(), 3L))
+ ));
+ }
+
+ @Test
+ public void testToMapDoFn() throws Exception {
+ Coder<IntervalWindow> windowCoder = IntervalWindow.getCoder();
+
+ DoFnTester<KV<Integer, Iterable<KV<IntervalWindow, WindowedValue<KV<Long, Long>>>>>,
+ IsmRecord<WindowedValue<TransformedMap<Long,
+ WindowedValue<Long>,
+ Long>>>> doFnTester =
+ DoFnTester.of(new BatchViewAsMap.ToMapDoFn<Long, Long, IntervalWindow>(windowCoder));
+
+ IntervalWindow windowA = new IntervalWindow(new Instant(0), new Instant(10));
+ IntervalWindow windowB = new IntervalWindow(new Instant(10), new Instant(20));
+ IntervalWindow windowC = new IntervalWindow(new Instant(20), new Instant(30));
+
+ Iterable<KV<Integer,
+ Iterable<KV<IntervalWindow, WindowedValue<KV<Long, Long>>>>>> inputElements =
+ ImmutableList.of(
+ KV.of(1,
+ (Iterable<KV<IntervalWindow, WindowedValue<KV<Long, Long>>>>) ImmutableList.of(
+ KV.of(windowA, WindowedValue.of(
+ KV.of(1L, 11L), new Instant(3), windowA, PaneInfo.NO_FIRING)),
+ KV.of(windowA, WindowedValue.of(
+ KV.of(2L, 21L), new Instant(7), windowA, PaneInfo.NO_FIRING)),
+ KV.of(windowB, WindowedValue.of(
+ KV.of(2L, 21L), new Instant(13), windowB, PaneInfo.NO_FIRING)),
+ KV.of(windowB, WindowedValue.of(
+ KV.of(3L, 31L), new Instant(15), windowB, PaneInfo.NO_FIRING)))),
+ KV.of(2,
+ (Iterable<KV<IntervalWindow, WindowedValue<KV<Long, Long>>>>) ImmutableList.of(
+ KV.of(windowC, WindowedValue.of(
+ KV.of(4L, 41L), new Instant(25), windowC, PaneInfo.NO_FIRING)))));
+
+    // The order of the output elements must match the processing order.
+ List<IsmRecord<WindowedValue<TransformedMap<Long,
+ WindowedValue<Long>,
+ Long>>>> output =
+ doFnTester.processBundle(inputElements);
+ assertEquals(3, output.size());
+ Map<Long, Long> outputMap;
+
+ outputMap = output.get(0).getValue().getValue();
+ assertEquals(2, outputMap.size());
+ assertEquals(ImmutableMap.of(1L, 11L, 2L, 21L), outputMap);
+
+ outputMap = output.get(1).getValue().getValue();
+ assertEquals(2, outputMap.size());
+ assertEquals(ImmutableMap.of(2L, 21L, 3L, 31L), outputMap);
+
+ outputMap = output.get(2).getValue().getValue();
+ assertEquals(1, outputMap.size());
+ assertEquals(ImmutableMap.of(4L, 41L), outputMap);
+ }
+
+ @Test
+ public void testToMultimapDoFn() throws Exception {
+ Coder<IntervalWindow> windowCoder = IntervalWindow.getCoder();
+
+ DoFnTester<KV<Integer, Iterable<KV<IntervalWindow, WindowedValue<KV<Long, Long>>>>>,
+ IsmRecord<WindowedValue<TransformedMap<Long,
+ Iterable<WindowedValue<Long>>,
+ Iterable<Long>>>>> doFnTester =
+ DoFnTester.of(
+ new BatchViewAsMultimap.ToMultimapDoFn<Long, Long, IntervalWindow>(windowCoder));
+
+ IntervalWindow windowA = new IntervalWindow(new Instant(0), new Instant(10));
+ IntervalWindow windowB = new IntervalWindow(new Instant(10), new Instant(20));
+ IntervalWindow windowC = new IntervalWindow(new Instant(20), new Instant(30));
+
+ Iterable<KV<Integer,
+ Iterable<KV<IntervalWindow, WindowedValue<KV<Long, Long>>>>>> inputElements =
+ ImmutableList.of(
+ KV.of(1,
+ (Iterable<KV<IntervalWindow, WindowedValue<KV<Long, Long>>>>) ImmutableList.of(
+ KV.of(windowA, WindowedValue.of(
+ KV.of(1L, 11L), new Instant(3), windowA, PaneInfo.NO_FIRING)),
+ KV.of(windowA, WindowedValue.of(
+ KV.of(1L, 12L), new Instant(5), windowA, PaneInfo.NO_FIRING)),
+ KV.of(windowA, WindowedValue.of(
+ KV.of(2L, 21L), new Instant(7), windowA, PaneInfo.NO_FIRING)),
+ KV.of(windowB, WindowedValue.of(
+ KV.of(2L, 21L), new Instant(13), windowB, PaneInfo.NO_FIRING)),
+ KV.of(windowB, WindowedValue.of(
+ KV.of(3L, 31L), new Instant(15), windowB, PaneInfo.NO_FIRING)))),
+ KV.of(2,
+ (Iterable<KV<IntervalWindow, WindowedValue<KV<Long, Long>>>>) ImmutableList.of(
+ KV.of(windowC, WindowedValue.of(
+ KV.of(4L, 41L), new Instant(25), windowC, PaneInfo.NO_FIRING)))));
+
+    // The order of the output elements must match the processing order.
+ List<IsmRecord<WindowedValue<TransformedMap<Long,
+ Iterable<WindowedValue<Long>>,
+ Iterable<Long>>>>> output =
+ doFnTester.processBundle(inputElements);
+ assertEquals(3, output.size());
+ Map<Long, Iterable<Long>> outputMap;
+
+ outputMap = output.get(0).getValue().getValue();
+ assertEquals(2, outputMap.size());
+ assertThat(outputMap.get(1L), containsInAnyOrder(11L, 12L));
+ assertThat(outputMap.get(2L), containsInAnyOrder(21L));
+
+ outputMap = output.get(1).getValue().getValue();
+ assertEquals(2, outputMap.size());
+ assertThat(outputMap.get(2L), containsInAnyOrder(21L));
+ assertThat(outputMap.get(3L), containsInAnyOrder(31L));
+
+ outputMap = output.get(2).getValue().getValue();
+ assertEquals(1, outputMap.size());
+ assertThat(outputMap.get(4L), containsInAnyOrder(41L));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3841f411/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowAvroIOTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowAvroIOTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowAvroIOTest.java
index 614affb..006daa9 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowAvroIOTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowAvroIOTest.java
@@ -22,7 +22,7 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisp
import static org.hamcrest.Matchers.hasItem;
import static org.junit.Assert.assertThat;
-import org.apache.beam.runners.dataflow.DataflowPipelineRunner;
+import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.transforms.DataflowDisplayDataEvaluator;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -36,7 +36,7 @@ import org.junit.runners.JUnit4;
import java.util.Set;
/**
- * {@link DataflowPipelineRunner} specific tests for {@link AvroIO} transforms.
+ * {@link DataflowRunner}-specific tests for {@link AvroIO} transforms.
*/
@RunWith(JUnit4.class)
public class DataflowAvroIOTest {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3841f411/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowPubsubIOTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowPubsubIOTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowPubsubIOTest.java
index 3df9cdb..27bc2d9 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowPubsubIOTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowPubsubIOTest.java
@@ -22,7 +22,7 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisp
import static org.hamcrest.Matchers.hasItem;
import static org.junit.Assert.assertThat;
-import org.apache.beam.runners.dataflow.DataflowPipelineRunner;
+import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.transforms.DataflowDisplayDataEvaluator;
import org.apache.beam.sdk.io.PubsubIO;
import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -35,7 +35,7 @@ import org.junit.runners.JUnit4;
import java.util.Set;
/**
- * {@link DataflowPipelineRunner} specific tests for {@link PubsubIO} transforms.
+ * {@link DataflowRunner}-specific tests for {@link PubsubIO} transforms.
*/
@RunWith(JUnit4.class)
public class DataflowPubsubIOTest {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3841f411/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowTextIOTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowTextIOTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowTextIOTest.java
index 0340435..727ffdc 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowTextIOTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowTextIOTest.java
@@ -23,7 +23,7 @@ import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.Assert.assertThat;
-import org.apache.beam.runners.dataflow.DataflowPipelineRunner;
+import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.testing.TestDataflowPipelineOptions;
import org.apache.beam.runners.dataflow.transforms.DataflowDisplayDataEvaluator;
import org.apache.beam.sdk.io.TextIO;
@@ -39,7 +39,7 @@ import org.junit.runners.JUnit4;
import java.util.Set;
/**
- * {@link DataflowPipelineRunner} specific tests for TextIO Read and Write transforms.
+ * {@link DataflowRunner}-specific tests for TextIO Read and Write transforms.
*/
@RunWith(JUnit4.class)
public class DataflowTextIOTest {