You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/04/27 03:08:35 UTC
[03/21] incubator-beam git commit: Reorganize Java packages in the
sources of the Google Cloud Dataflow runner
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTranslatorTest.java
deleted file mode 100644
index 27c0acc..0000000
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTranslatorTest.java
+++ /dev/null
@@ -1,965 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners;
-
-import static org.apache.beam.sdk.util.Structs.addObject;
-import static org.apache.beam.sdk.util.Structs.getDictionary;
-import static org.apache.beam.sdk.util.Structs.getString;
-
-import static org.hamcrest.Matchers.containsString;
-import static org.hamcrest.Matchers.hasKey;
-import static org.hamcrest.core.IsInstanceOf.instanceOf;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.argThat;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.options.DataflowPipelineOptions;
-import org.apache.beam.sdk.options.DataflowPipelineWorkerPoolOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.runners.DataflowPipelineTranslator.TranslationContext;
-import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.util.GcsUtil;
-import org.apache.beam.sdk.util.OutputReference;
-import org.apache.beam.sdk.util.PropertyNames;
-import org.apache.beam.sdk.util.Structs;
-import org.apache.beam.sdk.util.TestCredential;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.util.gcsfs.GcsPath;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.TupleTag;
-
-import com.google.api.services.dataflow.Dataflow;
-import com.google.api.services.dataflow.model.DataflowPackage;
-import com.google.api.services.dataflow.model.Job;
-import com.google.api.services.dataflow.model.Step;
-import com.google.api.services.dataflow.model.WorkerPool;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-
-import org.hamcrest.Matchers;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.internal.matchers.ThrowableMessageMatcher;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.ArgumentMatcher;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Tests for DataflowPipelineTranslator.
- */
-@RunWith(JUnit4.class)
-public class DataflowPipelineTranslatorTest implements Serializable {
-
- @Rule public transient ExpectedException thrown = ExpectedException.none();
-
- // A Custom Mockito matcher for an initial Job that checks that all
- // expected fields are set.
- private static class IsValidCreateRequest extends ArgumentMatcher<Job> {
- @Override
- public boolean matches(Object o) {
- Job job = (Job) o;
- return job.getId() == null
- && job.getProjectId() == null
- && job.getName() != null
- && job.getType() != null
- && job.getEnvironment() != null
- && job.getSteps() != null
- && job.getCurrentState() == null
- && job.getCurrentStateTime() == null
- && job.getExecutionInfo() == null
- && job.getCreateTime() == null;
- }
- }
-
- private Pipeline buildPipeline(DataflowPipelineOptions options) {
- options.setRunner(DataflowPipelineRunner.class);
- Pipeline p = Pipeline.create(options);
-
- p.apply(TextIO.Read.named("ReadMyFile").from("gs://bucket/object"))
- .apply(TextIO.Write.named("WriteMyFile").to("gs://bucket/object"));
-
- return p;
- }
-
- private static Dataflow buildMockDataflow(
- ArgumentMatcher<Job> jobMatcher) throws IOException {
- Dataflow mockDataflowClient = mock(Dataflow.class);
- Dataflow.Projects mockProjects = mock(Dataflow.Projects.class);
- Dataflow.Projects.Jobs mockJobs = mock(Dataflow.Projects.Jobs.class);
- Dataflow.Projects.Jobs.Create mockRequest = mock(
- Dataflow.Projects.Jobs.Create.class);
-
- when(mockDataflowClient.projects()).thenReturn(mockProjects);
- when(mockProjects.jobs()).thenReturn(mockJobs);
- when(mockJobs.create(eq("someProject"), argThat(jobMatcher)))
- .thenReturn(mockRequest);
-
- Job resultJob = new Job();
- resultJob.setId("newid");
- when(mockRequest.execute()).thenReturn(resultJob);
- return mockDataflowClient;
- }
-
- private static DataflowPipelineOptions buildPipelineOptions() throws IOException {
- GcsUtil mockGcsUtil = mock(GcsUtil.class);
- when(mockGcsUtil.expand(any(GcsPath.class))).then(new Answer<List<GcsPath>>() {
- @Override
- public List<GcsPath> answer(InvocationOnMock invocation) throws Throwable {
- return ImmutableList.of((GcsPath) invocation.getArguments()[0]);
- }
- });
- when(mockGcsUtil.bucketExists(any(GcsPath.class))).thenReturn(true);
- when(mockGcsUtil.isGcsPatternSupported(anyString())).thenCallRealMethod();
-
- DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
- options.setRunner(DataflowPipelineRunner.class);
- options.setGcpCredential(new TestCredential());
- options.setJobName("some-job-name");
- options.setProject("some-project");
- options.setTempLocation(GcsPath.fromComponents("somebucket", "some/path").toString());
- options.setFilesToStage(new LinkedList<String>());
- options.setDataflowClient(buildMockDataflow(new IsValidCreateRequest()));
- options.setGcsUtil(mockGcsUtil);
- return options;
- }
-
- @Test
- public void testSettingOfSdkPipelineOptions() throws IOException {
- DataflowPipelineOptions options = buildPipelineOptions();
- options.setRunner(DataflowPipelineRunner.class);
-
- Pipeline p = buildPipeline(options);
- p.traverseTopologically(new RecordingPipelineVisitor());
- Job job =
- DataflowPipelineTranslator.fromOptions(options)
- .translate(
- p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
- .getJob();
-
- // Note that the contents of this materialized map may be changed by the act of reading an
- // option, which will cause the default to get materialized whereas it would otherwise be
- // left absent. It is permissible to simply alter this test to reflect current behavior.
- Map<String, Object> settings = new HashMap<>();
- settings.put("appName", "DataflowPipelineTranslatorTest");
- settings.put("project", "some-project");
- settings.put("pathValidatorClass", "org.apache.beam.sdk.util.DataflowPathValidator");
- settings.put("runner", "org.apache.beam.sdk.runners.DataflowPipelineRunner");
- settings.put("jobName", "some-job-name");
- settings.put("tempLocation", "gs://somebucket/some/path");
- settings.put("stagingLocation", "gs://somebucket/some/path/staging");
- settings.put("stableUniqueNames", "WARNING");
- settings.put("streaming", false);
- settings.put("numberOfWorkerHarnessThreads", 0);
- settings.put("experiments", null);
-
- Map<String, Object> sdkPipelineOptions = job.getEnvironment().getSdkPipelineOptions();
- assertThat(sdkPipelineOptions, hasKey("options"));
- assertEquals(settings, sdkPipelineOptions.get("options"));
- }
-
- @Test
- public void testNetworkConfig() throws IOException {
- final String testNetwork = "test-network";
-
- DataflowPipelineOptions options = buildPipelineOptions();
- options.setNetwork(testNetwork);
-
- Pipeline p = buildPipeline(options);
- p.traverseTopologically(new RecordingPipelineVisitor());
- Job job =
- DataflowPipelineTranslator.fromOptions(options)
- .translate(
- p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
- .getJob();
-
- assertEquals(1, job.getEnvironment().getWorkerPools().size());
- assertEquals(testNetwork,
- job.getEnvironment().getWorkerPools().get(0).getNetwork());
- }
-
- @Test
- public void testNetworkConfigMissing() throws IOException {
- DataflowPipelineOptions options = buildPipelineOptions();
-
- Pipeline p = buildPipeline(options);
- p.traverseTopologically(new RecordingPipelineVisitor());
- Job job =
- DataflowPipelineTranslator.fromOptions(options)
- .translate(
- p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
- .getJob();
-
- assertEquals(1, job.getEnvironment().getWorkerPools().size());
- assertNull(job.getEnvironment().getWorkerPools().get(0).getNetwork());
- }
-
- @Test
- public void testSubnetworkConfig() throws IOException {
- final String testSubnetwork = "regions/REGION/subnetworks/SUBNETWORK";
-
- DataflowPipelineOptions options = buildPipelineOptions();
- options.setSubnetwork(testSubnetwork);
-
- Pipeline p = buildPipeline(options);
- p.traverseTopologically(new RecordingPipelineVisitor());
- Job job =
- DataflowPipelineTranslator.fromOptions(options)
- .translate(
- p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
- .getJob();
-
- assertEquals(1, job.getEnvironment().getWorkerPools().size());
- assertEquals(testSubnetwork,
- job.getEnvironment().getWorkerPools().get(0).getSubnetwork());
- }
-
- @Test
- public void testSubnetworkConfigMissing() throws IOException {
- DataflowPipelineOptions options = buildPipelineOptions();
-
- Pipeline p = buildPipeline(options);
- p.traverseTopologically(new RecordingPipelineVisitor());
- Job job =
- DataflowPipelineTranslator.fromOptions(options)
- .translate(
- p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
- .getJob();
-
- assertEquals(1, job.getEnvironment().getWorkerPools().size());
- assertNull(job.getEnvironment().getWorkerPools().get(0).getSubnetwork());
- }
-
- @Test
- public void testScalingAlgorithmMissing() throws IOException {
- DataflowPipelineOptions options = buildPipelineOptions();
-
- Pipeline p = buildPipeline(options);
- p.traverseTopologically(new RecordingPipelineVisitor());
- Job job =
- DataflowPipelineTranslator.fromOptions(options)
- .translate(
- p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
- .getJob();
-
- assertEquals(1, job.getEnvironment().getWorkerPools().size());
- // Autoscaling settings are always set.
- assertNull(
- job
- .getEnvironment()
- .getWorkerPools()
- .get(0)
- .getAutoscalingSettings()
- .getAlgorithm());
- assertEquals(
- 0,
- job
- .getEnvironment()
- .getWorkerPools()
- .get(0)
- .getAutoscalingSettings()
- .getMaxNumWorkers()
- .intValue());
- }
-
- @Test
- public void testScalingAlgorithmNone() throws IOException {
- final DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType noScaling =
- DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType.NONE;
-
- DataflowPipelineOptions options = buildPipelineOptions();
- options.setAutoscalingAlgorithm(noScaling);
-
- Pipeline p = buildPipeline(options);
- p.traverseTopologically(new RecordingPipelineVisitor());
- Job job =
- DataflowPipelineTranslator.fromOptions(options)
- .translate(
- p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
- .getJob();
-
- assertEquals(1, job.getEnvironment().getWorkerPools().size());
- assertEquals(
- "AUTOSCALING_ALGORITHM_NONE",
- job
- .getEnvironment()
- .getWorkerPools()
- .get(0)
- .getAutoscalingSettings()
- .getAlgorithm());
- assertEquals(
- 0,
- job
- .getEnvironment()
- .getWorkerPools()
- .get(0)
- .getAutoscalingSettings()
- .getMaxNumWorkers()
- .intValue());
- }
-
- @Test
- public void testMaxNumWorkersIsPassedWhenNoAlgorithmIsSet() throws IOException {
- final DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType noScaling = null;
- DataflowPipelineOptions options = buildPipelineOptions();
- options.setMaxNumWorkers(42);
- options.setAutoscalingAlgorithm(noScaling);
-
- Pipeline p = buildPipeline(options);
- p.traverseTopologically(new RecordingPipelineVisitor());
- Job job =
- DataflowPipelineTranslator.fromOptions(options)
- .translate(
- p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
- .getJob();
-
- assertEquals(1, job.getEnvironment().getWorkerPools().size());
- assertNull(
- job
- .getEnvironment()
- .getWorkerPools()
- .get(0)
- .getAutoscalingSettings()
- .getAlgorithm());
- assertEquals(
- 42,
- job
- .getEnvironment()
- .getWorkerPools()
- .get(0)
- .getAutoscalingSettings()
- .getMaxNumWorkers()
- .intValue());
- }
-
- @Test
- public void testZoneConfig() throws IOException {
- final String testZone = "test-zone-1";
-
- DataflowPipelineOptions options = buildPipelineOptions();
- options.setZone(testZone);
-
- Pipeline p = buildPipeline(options);
- p.traverseTopologically(new RecordingPipelineVisitor());
- Job job =
- DataflowPipelineTranslator.fromOptions(options)
- .translate(
- p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
- .getJob();
-
- assertEquals(1, job.getEnvironment().getWorkerPools().size());
- assertEquals(testZone,
- job.getEnvironment().getWorkerPools().get(0).getZone());
- }
-
- @Test
- public void testWorkerMachineTypeConfig() throws IOException {
- final String testMachineType = "test-machine-type";
-
- DataflowPipelineOptions options = buildPipelineOptions();
- options.setWorkerMachineType(testMachineType);
-
- Pipeline p = buildPipeline(options);
- p.traverseTopologically(new RecordingPipelineVisitor());
- Job job =
- DataflowPipelineTranslator.fromOptions(options)
- .translate(
- p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
- .getJob();
-
- assertEquals(1, job.getEnvironment().getWorkerPools().size());
-
- WorkerPool workerPool = job.getEnvironment().getWorkerPools().get(0);
- assertEquals(testMachineType, workerPool.getMachineType());
- }
-
- @Test
- public void testDiskSizeGbConfig() throws IOException {
- final Integer diskSizeGb = 1234;
-
- DataflowPipelineOptions options = buildPipelineOptions();
- options.setDiskSizeGb(diskSizeGb);
-
- Pipeline p = buildPipeline(options);
- p.traverseTopologically(new RecordingPipelineVisitor());
- Job job =
- DataflowPipelineTranslator.fromOptions(options)
- .translate(
- p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
- .getJob();
-
- assertEquals(1, job.getEnvironment().getWorkerPools().size());
- assertEquals(diskSizeGb,
- job.getEnvironment().getWorkerPools().get(0).getDiskSizeGb());
- }
-
- @Test
- public void testPredefinedAddStep() throws Exception {
- DataflowPipelineOptions options = buildPipelineOptions();
-
- DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
- DataflowPipelineTranslator.registerTransformTranslator(
- EmbeddedTransform.class, new EmbeddedTranslator());
-
- // Create a predefined step using another pipeline
- Step predefinedStep = createPredefinedStep();
-
- // Create a pipeline that the predefined step will be embedded into
- Pipeline pipeline = Pipeline.create(options);
- pipeline.apply(TextIO.Read.named("ReadMyFile").from("gs://bucket/in"))
- .apply(ParDo.of(new NoOpFn()))
- .apply(new EmbeddedTransform(predefinedStep.clone()))
- .apply(ParDo.of(new NoOpFn()));
- Job job =
- translator
- .translate(
- pipeline,
- (DataflowPipelineRunner) pipeline.getRunner(),
- Collections.<DataflowPackage>emptyList())
- .getJob();
-
- List<Step> steps = job.getSteps();
- assertEquals(4, steps.size());
-
- // The input to the embedded step should match the output of the step before
- Map<String, Object> step1Out = getOutputPortReference(steps.get(1));
- Map<String, Object> step2In = getDictionary(
- steps.get(2).getProperties(), PropertyNames.PARALLEL_INPUT);
- assertEquals(step1Out, step2In);
-
- // The output from the embedded step should match the input of the step after
- Map<String, Object> step2Out = getOutputPortReference(steps.get(2));
- Map<String, Object> step3In = getDictionary(
- steps.get(3).getProperties(), PropertyNames.PARALLEL_INPUT);
- assertEquals(step2Out, step3In);
-
- // The step should not have been modified other than remapping the input
- Step predefinedStepClone = predefinedStep.clone();
- Step embeddedStepClone = steps.get(2).clone();
- predefinedStepClone.getProperties().remove(PropertyNames.PARALLEL_INPUT);
- embeddedStepClone.getProperties().remove(PropertyNames.PARALLEL_INPUT);
- assertEquals(predefinedStepClone, embeddedStepClone);
- }
-
- /**
- * Construct a OutputReference for the output of the step.
- */
- private static OutputReference getOutputPortReference(Step step) throws Exception {
- // TODO: This should be done via a Structs accessor.
- @SuppressWarnings("unchecked")
- List<Map<String, Object>> output =
- (List<Map<String, Object>>) step.getProperties().get(PropertyNames.OUTPUT_INFO);
- String outputTagId = getString(Iterables.getOnlyElement(output), PropertyNames.OUTPUT_NAME);
- return new OutputReference(step.getName(), outputTagId);
- }
-
- /**
- * Returns a Step for a DoFn by creating and translating a pipeline.
- */
- private static Step createPredefinedStep() throws Exception {
- DataflowPipelineOptions options = buildPipelineOptions();
- DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
- Pipeline pipeline = Pipeline.create(options);
- String stepName = "DoFn1";
- pipeline.apply(TextIO.Read.named("ReadMyFile").from("gs://bucket/in"))
- .apply(ParDo.of(new NoOpFn()).named(stepName))
- .apply(TextIO.Write.named("WriteMyFile").to("gs://bucket/out"));
- Job job =
- translator
- .translate(
- pipeline,
- (DataflowPipelineRunner) pipeline.getRunner(),
- Collections.<DataflowPackage>emptyList())
- .getJob();
-
- assertEquals(13, job.getSteps().size());
- Step step = job.getSteps().get(1);
- assertEquals(stepName, getString(step.getProperties(), PropertyNames.USER_NAME));
- return step;
- }
-
- private static class NoOpFn extends DoFn<String, String> {
- @Override public void processElement(ProcessContext c) throws Exception {
- c.output(c.element());
- }
- }
-
- /**
- * A placeholder transform that will be used to substitute a predefined Step.
- */
- private static class EmbeddedTransform
- extends PTransform<PCollection<String>, PCollection<String>> {
- private final Step step;
-
- public EmbeddedTransform(Step step) {
- this.step = step;
- }
-
- @Override
- public PCollection<String> apply(PCollection<String> input) {
- return PCollection.createPrimitiveOutputInternal(
- input.getPipeline(),
- WindowingStrategy.globalDefault(),
- input.isBounded());
- }
-
- @Override
- protected Coder<?> getDefaultOutputCoder() {
- return StringUtf8Coder.of();
- }
- }
-
- /**
- * A TransformTranslator that adds the predefined Step using
- * {@link TranslationContext#addStep} and remaps the input port reference.
- */
- private static class EmbeddedTranslator
- implements DataflowPipelineTranslator.TransformTranslator<EmbeddedTransform> {
- @Override public void translate(EmbeddedTransform transform, TranslationContext context) {
- addObject(transform.step.getProperties(), PropertyNames.PARALLEL_INPUT,
- context.asOutputReference(context.getInput(transform)));
- context.addStep(transform, transform.step);
- }
- }
-
- /**
- * A composite transform that returns an output that is unrelated to
- * the input.
- */
- private static class UnrelatedOutputCreator
- extends PTransform<PCollection<Integer>, PCollection<Integer>> {
-
- @Override
- public PCollection<Integer> apply(PCollection<Integer> input) {
- // Apply an operation so that this is a composite transform.
- input.apply(Count.<Integer>perElement());
-
- // Return a value unrelated to the input.
- return input.getPipeline().apply(Create.of(1, 2, 3, 4));
- }
-
- @Override
- protected Coder<?> getDefaultOutputCoder() {
- return VarIntCoder.of();
- }
- }
-
- /**
- * A composite transform that returns an output that is unbound.
- */
- private static class UnboundOutputCreator
- extends PTransform<PCollection<Integer>, PDone> {
-
- @Override
- public PDone apply(PCollection<Integer> input) {
- // Apply an operation so that this is a composite transform.
- input.apply(Count.<Integer>perElement());
-
- return PDone.in(input.getPipeline());
- }
-
- @Override
- protected Coder<?> getDefaultOutputCoder() {
- return VoidCoder.of();
- }
- }
-
- /**
- * A composite transform that returns a partially bound output.
- *
- * <p>This is not allowed and will result in a failure.
- */
- private static class PartiallyBoundOutputCreator
- extends PTransform<PCollection<Integer>, PCollectionTuple> {
-
- public final TupleTag<Integer> sumTag = new TupleTag<>("sum");
- public final TupleTag<Void> doneTag = new TupleTag<>("done");
-
- @Override
- public PCollectionTuple apply(PCollection<Integer> input) {
- PCollection<Integer> sum = input.apply(Sum.integersGlobally());
-
- // Fails here when attempting to construct a tuple with an unbound object.
- return PCollectionTuple.of(sumTag, sum)
- .and(doneTag, PCollection.<Void>createPrimitiveOutputInternal(
- input.getPipeline(),
- WindowingStrategy.globalDefault(),
- input.isBounded()));
- }
- }
-
- @Test
- public void testMultiGraphPipelineSerialization() throws IOException {
- Pipeline p = Pipeline.create(buildPipelineOptions());
-
- PCollection<Integer> input = p.begin()
- .apply(Create.of(1, 2, 3));
-
- input.apply(new UnrelatedOutputCreator());
- input.apply(new UnboundOutputCreator());
-
- DataflowPipelineTranslator t = DataflowPipelineTranslator.fromOptions(
- PipelineOptionsFactory.as(DataflowPipelineOptions.class));
-
- // Check that translation doesn't fail.
- t.translate(
- p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList());
- }
-
- @Test
- public void testPartiallyBoundFailure() throws IOException {
- Pipeline p = Pipeline.create(buildPipelineOptions());
-
- PCollection<Integer> input = p.begin()
- .apply(Create.of(1, 2, 3));
-
- thrown.expect(IllegalStateException.class);
- input.apply(new PartiallyBoundOutputCreator());
-
- Assert.fail("Failure expected from use of partially bound output");
- }
-
- /**
- * This tests a few corner cases that should not crash.
- */
- @Test
- public void testGoodWildcards() throws Exception {
- DataflowPipelineOptions options = buildPipelineOptions();
- Pipeline pipeline = Pipeline.create(options);
- DataflowPipelineTranslator t = DataflowPipelineTranslator.fromOptions(options);
-
- applyRead(pipeline, "gs://bucket/foo");
- applyRead(pipeline, "gs://bucket/foo/");
- applyRead(pipeline, "gs://bucket/foo/*");
- applyRead(pipeline, "gs://bucket/foo/?");
- applyRead(pipeline, "gs://bucket/foo/[0-9]");
- applyRead(pipeline, "gs://bucket/foo/*baz*");
- applyRead(pipeline, "gs://bucket/foo/*baz?");
- applyRead(pipeline, "gs://bucket/foo/[0-9]baz?");
- applyRead(pipeline, "gs://bucket/foo/baz/*");
- applyRead(pipeline, "gs://bucket/foo/baz/*wonka*");
- applyRead(pipeline, "gs://bucket/foo/*baz/wonka*");
- applyRead(pipeline, "gs://bucket/foo*/baz");
- applyRead(pipeline, "gs://bucket/foo?/baz");
- applyRead(pipeline, "gs://bucket/foo[0-9]/baz");
-
- // Check that translation doesn't fail.
- t.translate(
- pipeline,
- (DataflowPipelineRunner) pipeline.getRunner(),
- Collections.<DataflowPackage>emptyList());
- }
-
- private void applyRead(Pipeline pipeline, String path) {
- pipeline.apply("Read(" + path + ")", TextIO.Read.from(path));
- }
-
- /**
- * Recursive wildcards are not supported.
- * This tests "**".
- */
- @Test
- public void testBadWildcardRecursive() throws Exception {
- DataflowPipelineOptions options = buildPipelineOptions();
- Pipeline pipeline = Pipeline.create(options);
- DataflowPipelineTranslator t = DataflowPipelineTranslator.fromOptions(options);
-
- pipeline.apply(TextIO.Read.from("gs://bucket/foo**/baz"));
-
- // Check that translation does fail.
- thrown.expectCause(Matchers.allOf(
- instanceOf(IllegalArgumentException.class),
- ThrowableMessageMatcher.hasMessage(containsString("Unsupported wildcard usage"))));
- t.translate(
- pipeline,
- (DataflowPipelineRunner) pipeline.getRunner(),
- Collections.<DataflowPackage>emptyList());
- }
-
- @Test
- public void testToSingletonTranslation() throws Exception {
- // A "change detector" test that makes sure the translation
- // of getting a PCollectionView<T> does not change
- // in bad ways during refactor
-
- DataflowPipelineOptions options = buildPipelineOptions();
- options.setExperiments(ImmutableList.of("disable_ism_side_input"));
- DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
-
- Pipeline pipeline = Pipeline.create(options);
- pipeline.apply(Create.of(1))
- .apply(View.<Integer>asSingleton());
- Job job =
- translator
- .translate(
- pipeline,
- (DataflowPipelineRunner) pipeline.getRunner(),
- Collections.<DataflowPackage>emptyList())
- .getJob();
-
- List<Step> steps = job.getSteps();
- assertEquals(2, steps.size());
-
- Step createStep = steps.get(0);
- assertEquals("ParallelRead", createStep.getKind());
-
- Step collectionToSingletonStep = steps.get(1);
- assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
-
- }
-
- @Test
- public void testToIterableTranslation() throws Exception {
- // A "change detector" test that makes sure the translation
- // of getting a PCollectionView<Iterable<T>> does not change
- // in bad ways during refactor
-
- DataflowPipelineOptions options = buildPipelineOptions();
- options.setExperiments(ImmutableList.of("disable_ism_side_input"));
- DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
-
- Pipeline pipeline = Pipeline.create(options);
- pipeline.apply(Create.of(1, 2, 3))
- .apply(View.<Integer>asIterable());
- Job job =
- translator
- .translate(
- pipeline,
- (DataflowPipelineRunner) pipeline.getRunner(),
- Collections.<DataflowPackage>emptyList())
- .getJob();
-
- List<Step> steps = job.getSteps();
- assertEquals(2, steps.size());
-
- Step createStep = steps.get(0);
- assertEquals("ParallelRead", createStep.getKind());
-
- Step collectionToSingletonStep = steps.get(1);
- assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
- }
-
- @Test
- public void testToSingletonTranslationWithIsmSideInput() throws Exception {
- // A "change detector" test that makes sure the translation
- // of getting a PCollectionView<T> does not change
- // in bad ways during refactor
-
- DataflowPipelineOptions options = buildPipelineOptions();
- DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
-
- Pipeline pipeline = Pipeline.create(options);
- pipeline.apply(Create.of(1))
- .apply(View.<Integer>asSingleton());
- Job job =
- translator
- .translate(
- pipeline,
- (DataflowPipelineRunner) pipeline.getRunner(),
- Collections.<DataflowPackage>emptyList())
- .getJob();
-
- List<Step> steps = job.getSteps();
- assertEquals(5, steps.size());
-
- @SuppressWarnings("unchecked")
- List<Map<String, Object>> toIsmRecordOutputs =
- (List<Map<String, Object>>) steps.get(3).getProperties().get(PropertyNames.OUTPUT_INFO);
- assertTrue(
- Structs.getBoolean(Iterables.getOnlyElement(toIsmRecordOutputs), "use_indexed_format"));
-
- Step collectionToSingletonStep = steps.get(4);
- assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
- }
-
- @Test
- public void testToIterableTranslationWithIsmSideInput() throws Exception {
- // A "change detector" test that makes sure the translation
- // of getting a PCollectionView<Iterable<T>> does not change
- // in bad ways during refactor
-
- DataflowPipelineOptions options = buildPipelineOptions();
- DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
-
- Pipeline pipeline = Pipeline.create(options);
- pipeline.apply(Create.of(1, 2, 3))
- .apply(View.<Integer>asIterable());
- Job job =
- translator
- .translate(
- pipeline,
- (DataflowPipelineRunner) pipeline.getRunner(),
- Collections.<DataflowPackage>emptyList())
- .getJob();
-
- List<Step> steps = job.getSteps();
- assertEquals(3, steps.size());
-
- @SuppressWarnings("unchecked")
- List<Map<String, Object>> toIsmRecordOutputs =
- (List<Map<String, Object>>) steps.get(1).getProperties().get(PropertyNames.OUTPUT_INFO);
- assertTrue(
- Structs.getBoolean(Iterables.getOnlyElement(toIsmRecordOutputs), "use_indexed_format"));
-
-
- Step collectionToSingletonStep = steps.get(2);
- assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
- }
-
- @Test
- public void testStepDisplayData() throws Exception {
- DataflowPipelineOptions options = buildPipelineOptions();
- DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
- Pipeline pipeline = Pipeline.create(options);
-
- DoFn<Integer, Integer> fn1 = new DoFn<Integer, Integer>() {
- @Override
- public void processElement(ProcessContext c) throws Exception {
- c.output(c.element());
- }
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- builder
- .add("foo", "bar")
- .add("foo2", DataflowPipelineTranslatorTest.class)
- .withLabel("Test Class")
- .withLinkUrl("http://www.google.com");
- }
- };
-
- DoFn<Integer, Integer> fn2 = new DoFn<Integer, Integer>() {
- @Override
- public void processElement(ProcessContext c) throws Exception {
- c.output(c.element());
- }
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- builder.add("foo3", 1234);
- }
- };
-
- ParDo.Bound<Integer, Integer> parDo1 = ParDo.of(fn1);
- ParDo.Bound<Integer, Integer> parDo2 = ParDo.of(fn2);
- pipeline
- .apply(Create.of(1, 2, 3))
- .apply(parDo1)
- .apply(parDo2);
-
- Job job =
- translator
- .translate(
- pipeline,
- (DataflowPipelineRunner) pipeline.getRunner(),
- Collections.<DataflowPackage>emptyList())
- .getJob();
-
- List<Step> steps = job.getSteps();
- assertEquals(3, steps.size());
-
- Map<String, Object> parDo1Properties = steps.get(1).getProperties();
- Map<String, Object> parDo2Properties = steps.get(2).getProperties();
- assertThat(parDo1Properties, hasKey("display_data"));
-
- Collection<Map<String, String>> fn1displayData =
- (Collection<Map<String, String>>) parDo1Properties.get("display_data");
- Collection<Map<String, String>> fn2displayData =
- (Collection<Map<String, String>>) parDo2Properties.get("display_data");
-
- ImmutableSet<ImmutableMap<String, Object>> expectedFn1DisplayData = ImmutableSet.of(
- ImmutableMap.<String, Object>builder()
- .put("key", "foo")
- .put("type", "STRING")
- .put("value", "bar")
- .put("namespace", fn1.getClass().getName())
- .build(),
- ImmutableMap.<String, Object>builder()
- .put("key", "fn")
- .put("type", "JAVA_CLASS")
- .put("value", fn1.getClass().getName())
- .put("shortValue", fn1.getClass().getSimpleName())
- .put("namespace", parDo1.getClass().getName())
- .build(),
- ImmutableMap.<String, Object>builder()
- .put("key", "foo2")
- .put("type", "JAVA_CLASS")
- .put("value", DataflowPipelineTranslatorTest.class.getName())
- .put("shortValue", DataflowPipelineTranslatorTest.class.getSimpleName())
- .put("namespace", fn1.getClass().getName())
- .put("label", "Test Class")
- .put("linkUrl", "http://www.google.com")
- .build()
- );
-
- ImmutableSet<ImmutableMap<String, Object>> expectedFn2DisplayData = ImmutableSet.of(
- ImmutableMap.<String, Object>builder()
- .put("key", "fn")
- .put("type", "JAVA_CLASS")
- .put("value", fn2.getClass().getName())
- .put("shortValue", fn2.getClass().getSimpleName())
- .put("namespace", parDo2.getClass().getName())
- .build(),
- ImmutableMap.<String, Object>builder()
- .put("key", "foo3")
- .put("type", "INTEGER")
- .put("value", 1234L)
- .put("namespace", fn2.getClass().getName())
- .build()
- );
-
- assertEquals(expectedFn1DisplayData, ImmutableSet.copyOf(fn1displayData));
- assertEquals(expectedFn2DisplayData, ImmutableSet.copyOf(fn2displayData));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/dataflow/CustomSourcesTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/dataflow/CustomSourcesTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/dataflow/CustomSourcesTest.java
deleted file mode 100644
index 2acede3..0000000
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/dataflow/CustomSourcesTest.java
+++ /dev/null
@@ -1,276 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.dataflow;
-import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.contains;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.options.DataflowPipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.ExpectedLogs;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Sample;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.PCollection;
-
-import com.google.common.base.Preconditions;
-
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Tests for {@link CustomSources}.
- */
-@RunWith(JUnit4.class)
-public class CustomSourcesTest {
- @Rule public ExpectedException expectedException = ExpectedException.none();
- @Rule public ExpectedLogs logged = ExpectedLogs.none(CustomSources.class);
-
- static class TestIO {
- public static Read fromRange(int from, int to) {
- return new Read(from, to, false);
- }
-
- static class Read extends BoundedSource<Integer> {
- final int from;
- final int to;
- final boolean produceTimestamps;
-
- Read(int from, int to, boolean produceTimestamps) {
- this.from = from;
- this.to = to;
- this.produceTimestamps = produceTimestamps;
- }
-
- public Read withTimestampsMillis() {
- return new Read(from, to, true);
- }
-
- @Override
- public List<Read> splitIntoBundles(long desiredBundleSizeBytes, PipelineOptions options)
- throws Exception {
- List<Read> res = new ArrayList<>();
- DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
- float step = 1.0f * (to - from) / dataflowOptions.getNumWorkers();
- for (int i = 0; i < dataflowOptions.getNumWorkers(); ++i) {
- res.add(new Read(
- Math.round(from + i * step), Math.round(from + (i + 1) * step),
- produceTimestamps));
- }
- return res;
- }
-
- @Override
- public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
- return 8 * (to - from);
- }
-
- @Override
- public boolean producesSortedKeys(PipelineOptions options) throws Exception {
- return true;
- }
-
- @Override
- public BoundedReader<Integer> createReader(PipelineOptions options) throws IOException {
- return new RangeReader(this);
- }
-
- @Override
- public void validate() {}
-
- @Override
- public String toString() {
- return "[" + from + ", " + to + ")";
- }
-
- @Override
- public Coder<Integer> getDefaultOutputCoder() {
- return BigEndianIntegerCoder.of();
- }
-
- private static class RangeReader extends BoundedReader<Integer> {
- // To verify that BasicSerializableSourceFormat calls our methods according to protocol.
- enum State {
- UNSTARTED,
- STARTED,
- FINISHED
- }
- private Read source;
- private int current = -1;
- private State state = State.UNSTARTED;
-
- public RangeReader(Read source) {
- this.source = source;
- }
-
- @Override
- public boolean start() throws IOException {
- Preconditions.checkState(state == State.UNSTARTED);
- state = State.STARTED;
- current = source.from;
- return (current < source.to);
- }
-
- @Override
- public boolean advance() throws IOException {
- Preconditions.checkState(state == State.STARTED);
- if (current == source.to - 1) {
- state = State.FINISHED;
- return false;
- }
- current++;
- return true;
- }
-
- @Override
- public Integer getCurrent() {
- Preconditions.checkState(state == State.STARTED);
- return current;
- }
-
- @Override
- public Instant getCurrentTimestamp() {
- return source.produceTimestamps
- ? new Instant(current /* as millis */) : BoundedWindow.TIMESTAMP_MIN_VALUE;
- }
-
- @Override
- public void close() throws IOException {
- Preconditions.checkState(state == State.STARTED || state == State.FINISHED);
- state = State.FINISHED;
- }
-
- @Override
- public Read getCurrentSource() {
- return source;
- }
-
- @Override
- public Read splitAtFraction(double fraction) {
- int proposedIndex = (int) (source.from + fraction * (source.to - source.from));
- if (proposedIndex <= current) {
- return null;
- }
- Read primary = new Read(source.from, proposedIndex, source.produceTimestamps);
- Read residual = new Read(proposedIndex, source.to, source.produceTimestamps);
- this.source = primary;
- return residual;
- }
-
- @Override
- public Double getFractionConsumed() {
- return (current == -1)
- ? 0.0
- : (1.0 * (1 + current - source.from) / (source.to - source.from));
- }
- }
- }
- }
-
- @Test
- public void testDirectPipelineWithoutTimestamps() throws Exception {
- Pipeline p = TestPipeline.create();
- PCollection<Integer> sum = p
- .apply(Read.from(TestIO.fromRange(10, 20)))
- .apply(Sum.integersGlobally())
- .apply(Sample.<Integer>any(1));
-
- PAssert.thatSingleton(sum).isEqualTo(145);
- p.run();
- }
-
- @Test
- public void testDirectPipelineWithTimestamps() throws Exception {
- Pipeline p = TestPipeline.create();
- PCollection<Integer> sums =
- p.apply(Read.from(TestIO.fromRange(10, 20).withTimestampsMillis()))
- .apply(Window.<Integer>into(FixedWindows.of(Duration.millis(3))))
- .apply(Sum.integersGlobally().withoutDefaults());
- // Should group into [10 11] [12 13 14] [15 16 17] [18 19].
- PAssert.that(sums).containsInAnyOrder(21, 37, 39, 48);
- p.run();
- }
-
- @Test
- public void testRangeProgressAndSplitAtFraction() throws Exception {
- // Show basic usage of getFractionConsumed and splitAtFraction.
- // This test only tests TestIO itself, not BasicSerializableSourceFormat.
-
- DataflowPipelineOptions options =
- PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
- TestIO.Read source = TestIO.fromRange(10, 20);
- try (BoundedSource.BoundedReader<Integer> reader = source.createReader(options)) {
- assertEquals(0, reader.getFractionConsumed().intValue());
- assertTrue(reader.start());
- assertEquals(0.1, reader.getFractionConsumed(), 1e-6);
- assertTrue(reader.advance());
- assertEquals(0.2, reader.getFractionConsumed(), 1e-6);
- // Already past 0.0 and 0.1.
- assertNull(reader.splitAtFraction(0.0));
- assertNull(reader.splitAtFraction(0.1));
-
- {
- TestIO.Read residual = (TestIO.Read) reader.splitAtFraction(0.5);
- assertNotNull(residual);
- TestIO.Read primary = (TestIO.Read) reader.getCurrentSource();
- assertThat(readFromSource(primary, options), contains(10, 11, 12, 13, 14));
- assertThat(readFromSource(residual, options), contains(15, 16, 17, 18, 19));
- }
-
- // Range is now [10, 15) and we are at 12.
- {
- TestIO.Read residual = (TestIO.Read) reader.splitAtFraction(0.8); // give up 14.
- assertNotNull(residual);
- TestIO.Read primary = (TestIO.Read) reader.getCurrentSource();
- assertThat(readFromSource(primary, options), contains(10, 11, 12, 13));
- assertThat(readFromSource(residual, options), contains(14));
- }
-
- assertTrue(reader.advance());
- assertEquals(12, reader.getCurrent().intValue());
- assertTrue(reader.advance());
- assertEquals(13, reader.getCurrent().intValue());
- assertFalse(reader.advance());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/testing/TestDataflowPipelineRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/testing/TestDataflowPipelineRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/testing/TestDataflowPipelineRunnerTest.java
deleted file mode 100644
index 185ab51..0000000
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/testing/TestDataflowPipelineRunnerTest.java
+++ /dev/null
@@ -1,379 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.testing;
-
-import static org.hamcrest.Matchers.containsString;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.atLeastOnce;
-import static org.mockito.Mockito.doCallRealMethod;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.PipelineResult.State;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.runners.DataflowPipelineJob;
-import org.apache.beam.sdk.runners.DataflowPipelineRunner;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.util.GcsUtil;
-import org.apache.beam.sdk.util.MonitoringUtil;
-import org.apache.beam.sdk.util.MonitoringUtil.JobMessagesHandler;
-import org.apache.beam.sdk.util.NoopPathValidator;
-import org.apache.beam.sdk.util.TestCredential;
-import org.apache.beam.sdk.util.TimeUtil;
-import org.apache.beam.sdk.util.Transport;
-import org.apache.beam.sdk.values.PCollection;
-
-import com.google.api.client.http.LowLevelHttpResponse;
-import com.google.api.client.json.Json;
-import com.google.api.client.testing.http.MockHttpTransport;
-import com.google.api.client.testing.http.MockLowLevelHttpRequest;
-import com.google.api.client.testing.http.MockLowLevelHttpResponse;
-import com.google.api.services.dataflow.Dataflow;
-import com.google.api.services.dataflow.model.JobMessage;
-import com.google.api.services.dataflow.model.JobMetrics;
-import com.google.api.services.dataflow.model.MetricStructuredName;
-import com.google.api.services.dataflow.model.MetricUpdate;
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-
-import org.joda.time.Instant;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.MockitoAnnotations;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.math.BigDecimal;
-import java.util.Arrays;
-import java.util.concurrent.TimeUnit;
-
-/** Tests for {@link TestDataflowPipelineRunner}. */
-@RunWith(JUnit4.class)
-public class TestDataflowPipelineRunnerTest {
- @Rule public ExpectedException expectedException = ExpectedException.none();
- @Mock private MockHttpTransport transport;
- @Mock private MockLowLevelHttpRequest request;
- @Mock private GcsUtil mockGcsUtil;
-
- private TestDataflowPipelineOptions options;
- private Dataflow service;
-
- @Before
- public void setUp() throws Exception {
- MockitoAnnotations.initMocks(this);
- when(transport.buildRequest(anyString(), anyString())).thenReturn(request);
- doCallRealMethod().when(request).getContentAsString();
- service = new Dataflow(transport, Transport.getJsonFactory(), null);
-
- options = PipelineOptionsFactory.as(TestDataflowPipelineOptions.class);
- options.setAppName("TestAppName");
- options.setProject("test-project");
- options.setTempLocation("gs://test/temp/location");
- options.setTempRoot("gs://test");
- options.setGcpCredential(new TestCredential());
- options.setDataflowClient(service);
- options.setRunner(TestDataflowPipelineRunner.class);
- options.setPathValidatorClass(NoopPathValidator.class);
- }
-
- @Test
- public void testToString() {
- assertEquals("TestDataflowPipelineRunner#TestAppName",
- new TestDataflowPipelineRunner(options).toString());
- }
-
- @Test
- public void testRunBatchJobThatSucceeds() throws Exception {
- Pipeline p = TestPipeline.create(options);
- PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
- PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
- DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
- when(mockJob.getDataflowClient()).thenReturn(service);
- when(mockJob.getState()).thenReturn(State.DONE);
- when(mockJob.getProjectId()).thenReturn("test-project");
- when(mockJob.getJobId()).thenReturn("test-job");
-
- DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
- when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
- TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
- when(request.execute()).thenReturn(
- generateMockMetricResponse(true /* success */, true /* tentative */));
- assertEquals(mockJob, runner.run(p, mockRunner));
- }
-
- @Test
- public void testRunBatchJobThatFails() throws Exception {
- Pipeline p = TestPipeline.create(options);
- PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
- PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
- DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
- when(mockJob.getDataflowClient()).thenReturn(service);
- when(mockJob.getState()).thenReturn(State.FAILED);
- when(mockJob.getProjectId()).thenReturn("test-project");
- when(mockJob.getJobId()).thenReturn("test-job");
-
- DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
- when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
- TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
- try {
- runner.run(p, mockRunner);
- } catch (AssertionError expected) {
- return;
- }
- // Note that fail throws an AssertionError which is why it is placed out here
- // instead of inside the try-catch block.
- fail("AssertionError expected");
- }
-
- @Test
- public void testBatchPipelineFailsIfException() throws Exception {
- Pipeline p = TestPipeline.create(options);
- PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
- PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
- DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
- when(mockJob.getDataflowClient()).thenReturn(service);
- when(mockJob.getState()).thenReturn(State.RUNNING);
- when(mockJob.getProjectId()).thenReturn("test-project");
- when(mockJob.getJobId()).thenReturn("test-job");
- when(mockJob.waitToFinish(any(Long.class), any(TimeUnit.class), any(JobMessagesHandler.class)))
- .thenAnswer(new Answer<State>() {
- @Override
- public State answer(InvocationOnMock invocation) {
- JobMessage message = new JobMessage();
- message.setMessageText("FooException");
- message.setTime(TimeUtil.toCloudTime(Instant.now()));
- message.setMessageImportance("JOB_MESSAGE_ERROR");
- ((MonitoringUtil.JobMessagesHandler) invocation.getArguments()[2])
- .process(Arrays.asList(message));
- return State.CANCELLED;
- }
- });
-
- DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
- when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
- when(request.execute()).thenReturn(
- generateMockMetricResponse(false /* success */, true /* tentative */));
- TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
- try {
- runner.run(p, mockRunner);
- } catch (AssertionError expected) {
- assertThat(expected.getMessage(), containsString("FooException"));
- verify(mockJob, atLeastOnce()).cancel();
- return;
- }
- // Note that fail throws an AssertionError which is why it is placed out here
- // instead of inside the try-catch block.
- fail("AssertionError expected");
- }
-
- @Test
- public void testRunStreamingJobThatSucceeds() throws Exception {
- options.setStreaming(true);
- Pipeline p = TestPipeline.create(options);
- PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
- PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
- DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
- when(mockJob.getDataflowClient()).thenReturn(service);
- when(mockJob.getState()).thenReturn(State.RUNNING);
- when(mockJob.getProjectId()).thenReturn("test-project");
- when(mockJob.getJobId()).thenReturn("test-job");
-
- DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
- when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
- when(request.execute()).thenReturn(
- generateMockMetricResponse(true /* success */, true /* tentative */));
- TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
- runner.run(p, mockRunner);
- }
-
- @Test
- public void testRunStreamingJobThatFails() throws Exception {
- options.setStreaming(true);
- Pipeline p = TestPipeline.create(options);
- PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
- PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
- DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
- when(mockJob.getDataflowClient()).thenReturn(service);
- when(mockJob.getState()).thenReturn(State.RUNNING);
- when(mockJob.getProjectId()).thenReturn("test-project");
- when(mockJob.getJobId()).thenReturn("test-job");
-
- DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
- when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
- when(request.execute()).thenReturn(
- generateMockMetricResponse(false /* success */, true /* tentative */));
- TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
- try {
- runner.run(p, mockRunner);
- } catch (AssertionError expected) {
- return;
- }
- // Note that fail throws an AssertionError which is why it is placed out here
- // instead of inside the try-catch block.
- fail("AssertionError expected");
- }
-
- @Test
- public void testCheckingForSuccessWhenPAssertSucceeds() throws Exception {
- DataflowPipelineJob job =
- spy(new DataflowPipelineJob("test-project", "test-job", service, null));
- Pipeline p = TestPipeline.create(options);
- PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
- PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
- TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
- when(request.execute()).thenReturn(
- generateMockMetricResponse(true /* success */, true /* tentative */));
- doReturn(State.DONE).when(job).getState();
- assertEquals(Optional.of(true), runner.checkForSuccess(job));
- }
-
- @Test
- public void testCheckingForSuccessWhenPAssertFails() throws Exception {
- DataflowPipelineJob job =
- spy(new DataflowPipelineJob("test-project", "test-job", service, null));
- Pipeline p = TestPipeline.create(options);
- PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
- PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
- TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
- when(request.execute()).thenReturn(
- generateMockMetricResponse(false /* success */, true /* tentative */));
- doReturn(State.DONE).when(job).getState();
- assertEquals(Optional.of(false), runner.checkForSuccess(job));
- }
-
- @Test
- public void testCheckingForSuccessSkipsNonTentativeMetrics() throws Exception {
- DataflowPipelineJob job =
- spy(new DataflowPipelineJob("test-project", "test-job", service, null));
- Pipeline p = TestPipeline.create(options);
- PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
- PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
- TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
- when(request.execute()).thenReturn(
- generateMockMetricResponse(true /* success */, false /* tentative */));
- doReturn(State.RUNNING).when(job).getState();
- assertEquals(Optional.absent(), runner.checkForSuccess(job));
- }
-
- private LowLevelHttpResponse generateMockMetricResponse(boolean success, boolean tentative)
- throws Exception {
- MetricStructuredName name = new MetricStructuredName();
- name.setName(success ? "PAssertSuccess" : "PAssertFailure");
- name.setContext(
- tentative ? ImmutableMap.of("tentative", "") : ImmutableMap.<String, String>of());
-
- MetricUpdate metric = new MetricUpdate();
- metric.setName(name);
- metric.setScalar(BigDecimal.ONE);
-
- MockLowLevelHttpResponse response = new MockLowLevelHttpResponse();
- response.setContentType(Json.MEDIA_TYPE);
- JobMetrics jobMetrics = new JobMetrics();
- jobMetrics.setMetrics(Lists.newArrayList(metric));
- // N.B. Setting the factory is necessary in order to get valid JSON.
- jobMetrics.setFactory(Transport.getJsonFactory());
- response.setContent(jobMetrics.toPrettyString());
- return response;
- }
-
- @Test
- public void testStreamingPipelineFailsIfServiceFails() throws Exception {
- DataflowPipelineJob job =
- spy(new DataflowPipelineJob("test-project", "test-job", service, null));
- Pipeline p = TestPipeline.create(options);
- PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
- PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
- TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
- when(request.execute()).thenReturn(
- generateMockMetricResponse(true /* success */, false /* tentative */));
- doReturn(State.FAILED).when(job).getState();
- assertEquals(Optional.of(false), runner.checkForSuccess(job));
- }
-
- @Test
- public void testStreamingPipelineFailsIfException() throws Exception {
- options.setStreaming(true);
- Pipeline p = TestPipeline.create(options);
- PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
- PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
- DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
- when(mockJob.getDataflowClient()).thenReturn(service);
- when(mockJob.getState()).thenReturn(State.RUNNING);
- when(mockJob.getProjectId()).thenReturn("test-project");
- when(mockJob.getJobId()).thenReturn("test-job");
- when(mockJob.waitToFinish(any(Long.class), any(TimeUnit.class), any(JobMessagesHandler.class)))
- .thenAnswer(new Answer<State>() {
- @Override
- public State answer(InvocationOnMock invocation) {
- JobMessage message = new JobMessage();
- message.setMessageText("FooException");
- message.setTime(TimeUtil.toCloudTime(Instant.now()));
- message.setMessageImportance("JOB_MESSAGE_ERROR");
- ((MonitoringUtil.JobMessagesHandler) invocation.getArguments()[2])
- .process(Arrays.asList(message));
- return State.CANCELLED;
- }
- });
-
- DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
- when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
- when(request.execute()).thenReturn(
- generateMockMetricResponse(false /* success */, true /* tentative */));
- TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
- try {
- runner.run(p, mockRunner);
- } catch (AssertionError expected) {
- assertThat(expected.getMessage(), containsString("FooException"));
- verify(mockJob, atLeastOnce()).cancel();
- return;
- }
- // Note that fail throws an AssertionError which is why it is placed out here
- // instead of inside the try-catch block.
- fail("AssertionError expected");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/transforms/DataflowGroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/transforms/DataflowGroupByKeyTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/transforms/DataflowGroupByKeyTest.java
deleted file mode 100644
index a381f68..0000000
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/transforms/DataflowGroupByKeyTest.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.options.DataflowPipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.runners.DataflowPipelineRunner;
-import org.apache.beam.sdk.transforms.windowing.Sessions;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.util.NoopPathValidator;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TypeDescriptor;
-
-import org.joda.time.Duration;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.util.Arrays;
-import java.util.List;
-
-/** Tests for {@link GroupByKey} for the {@link DataflowPipelineRunner}. */
-@RunWith(JUnit4.class)
-public class DataflowGroupByKeyTest {
- @Rule
- public ExpectedException thrown = ExpectedException.none();
-
- /**
- * Create a test pipeline that uses the {@link DataflowPipelineRunner} so that {@link GroupByKey}
- * is not expanded. This is used for verifying that even without expansion the proper errors show
- * up.
- */
- private Pipeline createTestServiceRunner() {
- DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
- options.setRunner(DataflowPipelineRunner.class);
- options.setProject("someproject");
- options.setStagingLocation("gs://staging");
- options.setPathValidatorClass(NoopPathValidator.class);
- options.setDataflowClient(null);
- return Pipeline.create(options);
- }
-
- @Test
- public void testInvalidWindowsService() {
- Pipeline p = createTestServiceRunner();
-
- List<KV<String, Integer>> ungroupedPairs = Arrays.asList();
-
- PCollection<KV<String, Integer>> input =
- p.apply(Create.of(ungroupedPairs)
- .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())))
- .apply(Window.<KV<String, Integer>>into(
- Sessions.withGapDuration(Duration.standardMinutes(1))));
-
- thrown.expect(IllegalStateException.class);
- thrown.expectMessage("GroupByKey must have a valid Window merge function");
- input
- .apply("GroupByKey", GroupByKey.<String, Integer>create())
- .apply("GroupByKeyAgain", GroupByKey.<String, Iterable<Integer>>create());
- }
-
- @Test
- public void testGroupByKeyServiceUnbounded() {
- Pipeline p = createTestServiceRunner();
-
- PCollection<KV<String, Integer>> input =
- p.apply(
- new PTransform<PBegin, PCollection<KV<String, Integer>>>() {
- @Override
- public PCollection<KV<String, Integer>> apply(PBegin input) {
- return PCollection.<KV<String, Integer>>createPrimitiveOutputInternal(
- input.getPipeline(),
- WindowingStrategy.globalDefault(),
- PCollection.IsBounded.UNBOUNDED)
- .setTypeDescriptorInternal(new TypeDescriptor<KV<String, Integer>>() {});
- }
- });
-
- thrown.expect(IllegalStateException.class);
- thrown.expectMessage(
- "GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without "
- + "a trigger. Use a Window.into or Window.triggering transform prior to GroupByKey.");
-
- input.apply("GroupByKey", GroupByKey.<String, Integer>create());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/transforms/DataflowViewTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/transforms/DataflowViewTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/transforms/DataflowViewTest.java
deleted file mode 100644
index b86de7e..0000000
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/transforms/DataflowViewTest.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.options.DataflowPipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.runners.DataflowPipelineRunner;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.InvalidWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.util.NoopPathValidator;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TypeDescriptor;
-
-import org.hamcrest.Matchers;
-import org.joda.time.Duration;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.internal.matchers.ThrowableMessageMatcher;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for {@link View} for a {@link DataflowPipelineRunner}. */
-@RunWith(JUnit4.class)
-public class DataflowViewTest {
- @Rule
- public transient ExpectedException thrown = ExpectedException.none();
-
- private Pipeline createTestBatchRunner() {
- DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
- options.setRunner(DataflowPipelineRunner.class);
- options.setProject("someproject");
- options.setStagingLocation("gs://staging");
- options.setPathValidatorClass(NoopPathValidator.class);
- options.setDataflowClient(null);
- return Pipeline.create(options);
- }
-
- private Pipeline createTestStreamingRunner() {
- DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
- options.setRunner(DataflowPipelineRunner.class);
- options.setStreaming(true);
- options.setProject("someproject");
- options.setStagingLocation("gs://staging");
- options.setPathValidatorClass(NoopPathValidator.class);
- options.setDataflowClient(null);
- return Pipeline.create(options);
- }
-
- private void testViewUnbounded(
- Pipeline pipeline,
- PTransform<PCollection<KV<String, Integer>>, ? extends PCollectionView<?>> view) {
- thrown.expect(IllegalStateException.class);
- thrown.expectMessage("Unable to create a side-input view from input");
- thrown.expectCause(
- ThrowableMessageMatcher.hasMessage(Matchers.containsString("non-bounded PCollection")));
- pipeline
- .apply(
- new PTransform<PBegin, PCollection<KV<String, Integer>>>() {
- @Override
- public PCollection<KV<String, Integer>> apply(PBegin input) {
- return PCollection.<KV<String, Integer>>createPrimitiveOutputInternal(
- input.getPipeline(),
- WindowingStrategy.globalDefault(),
- PCollection.IsBounded.UNBOUNDED)
- .setTypeDescriptorInternal(new TypeDescriptor<KV<String, Integer>>() {});
- }
- })
- .apply(view);
- }
-
- private void testViewNonmerging(
- Pipeline pipeline,
- PTransform<PCollection<KV<String, Integer>>, ? extends PCollectionView<?>> view) {
- thrown.expect(IllegalStateException.class);
- thrown.expectMessage("Unable to create a side-input view from input");
- thrown.expectCause(
- ThrowableMessageMatcher.hasMessage(Matchers.containsString("Consumed by GroupByKey")));
- pipeline.apply(Create.<KV<String, Integer>>of(KV.of("hello", 5)))
- .apply(Window.<KV<String, Integer>>into(new InvalidWindows<>(
- "Consumed by GroupByKey", FixedWindows.of(Duration.standardHours(1)))))
- .apply(view);
- }
-
- @Test
- public void testViewUnboundedAsSingletonBatch() {
- testViewUnbounded(createTestBatchRunner(), View.<KV<String, Integer>>asSingleton());
- }
-
- @Test
- public void testViewUnboundedAsSingletonStreaming() {
- testViewUnbounded(createTestStreamingRunner(), View.<KV<String, Integer>>asSingleton());
- }
-
- @Test
- public void testViewUnboundedAsIterableBatch() {
- testViewUnbounded(createTestBatchRunner(), View.<KV<String, Integer>>asIterable());
- }
-
- @Test
- public void testViewUnboundedAsIterableStreaming() {
- testViewUnbounded(createTestStreamingRunner(), View.<KV<String, Integer>>asIterable());
- }
-
- @Test
- public void testViewUnboundedAsListBatch() {
- testViewUnbounded(createTestBatchRunner(), View.<KV<String, Integer>>asList());
- }
-
- @Test
- public void testViewUnboundedAsListStreaming() {
- testViewUnbounded(createTestStreamingRunner(), View.<KV<String, Integer>>asList());
- }
-
- @Test
- public void testViewUnboundedAsMapBatch() {
- testViewUnbounded(createTestBatchRunner(), View.<String, Integer>asMap());
- }
-
- @Test
- public void testViewUnboundedAsMapStreaming() {
- testViewUnbounded(createTestStreamingRunner(), View.<String, Integer>asMap());
- }
-
- @Test
- public void testViewUnboundedAsMultimapBatch() {
- testViewUnbounded(createTestBatchRunner(), View.<String, Integer>asMultimap());
- }
-
- @Test
- public void testViewUnboundedAsMultimapStreaming() {
- testViewUnbounded(createTestStreamingRunner(), View.<String, Integer>asMultimap());
- }
-
- @Test
- public void testViewNonmergingAsSingletonBatch() {
- testViewNonmerging(createTestBatchRunner(), View.<KV<String, Integer>>asSingleton());
- }
-
- @Test
- public void testViewNonmergingAsSingletonStreaming() {
- testViewNonmerging(createTestStreamingRunner(), View.<KV<String, Integer>>asSingleton());
- }
-
- @Test
- public void testViewNonmergingAsIterableBatch() {
- testViewNonmerging(createTestBatchRunner(), View.<KV<String, Integer>>asIterable());
- }
-
- @Test
- public void testViewNonmergingAsIterableStreaming() {
- testViewNonmerging(createTestStreamingRunner(), View.<KV<String, Integer>>asIterable());
- }
-
- @Test
- public void testViewNonmergingAsListBatch() {
- testViewNonmerging(createTestBatchRunner(), View.<KV<String, Integer>>asList());
- }
-
- @Test
- public void testViewNonmergingAsListStreaming() {
- testViewNonmerging(createTestStreamingRunner(), View.<KV<String, Integer>>asList());
- }
-
- @Test
- public void testViewNonmergingAsMapBatch() {
- testViewNonmerging(createTestBatchRunner(), View.<String, Integer>asMap());
- }
-
- @Test
- public void testViewNonmergingAsMapStreaming() {
- testViewNonmerging(createTestStreamingRunner(), View.<String, Integer>asMap());
- }
-
- @Test
- public void testViewNonmergingAsMultimapBatch() {
- testViewNonmerging(createTestBatchRunner(), View.<String, Integer>asMultimap());
- }
-
- @Test
- public void testViewNonmergingAsMultimapStreaming() {
- testViewNonmerging(createTestStreamingRunner(), View.<String, Integer>asMultimap());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/util/DataflowPathValidatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/util/DataflowPathValidatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/util/DataflowPathValidatorTest.java
deleted file mode 100644
index b459c47..0000000
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/util/DataflowPathValidatorTest.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.when;
-
-import org.apache.beam.sdk.options.DataflowPipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.runners.DataflowPipelineRunner;
-import org.apache.beam.sdk.util.gcsfs.GcsPath;
-
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-/** Tests for {@link DataflowPathValidator}. */
-@RunWith(JUnit4.class)
-public class DataflowPathValidatorTest {
- @Rule public ExpectedException expectedException = ExpectedException.none();
-
- @Mock private GcsUtil mockGcsUtil;
- private DataflowPathValidator validator;
-
- @Before
- public void setUp() throws Exception {
- MockitoAnnotations.initMocks(this);
- when(mockGcsUtil.bucketExists(any(GcsPath.class))).thenReturn(true);
- when(mockGcsUtil.isGcsPatternSupported(anyString())).thenCallRealMethod();
- DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
- options.setGcpCredential(new TestCredential());
- options.setRunner(DataflowPipelineRunner.class);
- options.setGcsUtil(mockGcsUtil);
- validator = new DataflowPathValidator(options);
- }
-
- @Test
- public void testValidFilePattern() {
- validator.validateInputFilePatternSupported("gs://bucket/path");
- }
-
- @Test
- public void testInvalidFilePattern() {
- expectedException.expect(IllegalArgumentException.class);
- expectedException.expectMessage(
- "DataflowPipelineRunner expected a valid 'gs://' path but was given '/local/path'");
- validator.validateInputFilePatternSupported("/local/path");
- }
-
- @Test
- public void testWhenBucketDoesNotExist() throws Exception {
- when(mockGcsUtil.bucketExists(any(GcsPath.class))).thenReturn(false);
- expectedException.expect(IllegalArgumentException.class);
- expectedException.expectMessage(
- "Could not find file gs://non-existent-bucket/location");
- validator.validateInputFilePatternSupported("gs://non-existent-bucket/location");
- }
-
- @Test
- public void testValidOutputPrefix() {
- validator.validateOutputFilePrefixSupported("gs://bucket/path");
- }
-
- @Test
- public void testInvalidOutputPrefix() {
- expectedException.expect(IllegalArgumentException.class);
- expectedException.expectMessage(
- "DataflowPipelineRunner expected a valid 'gs://' path but was given '/local/path'");
- validator.validateOutputFilePrefixSupported("/local/path");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/util/MonitoringUtilTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/util/MonitoringUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/util/MonitoringUtilTest.java
deleted file mode 100644
index bdc0bc3..0000000
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/util/MonitoringUtilTest.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import org.apache.beam.sdk.PipelineResult.State;
-import org.apache.beam.sdk.options.DataflowPipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-
-import com.google.api.services.dataflow.Dataflow;
-import com.google.api.services.dataflow.model.JobMessage;
-import com.google.api.services.dataflow.model.ListJobMessagesResponse;
-
-import org.joda.time.Instant;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Tests for MonitoringUtil.
- */
-@RunWith(JUnit4.class)
-public class MonitoringUtilTest {
- private static final String PROJECT_ID = "someProject";
- private static final String JOB_ID = "1234";
-
- @Rule public ExpectedException thrown = ExpectedException.none();
-
- @Test
- public void testGetJobMessages() throws IOException {
- Dataflow.Projects.Jobs.Messages mockMessages = mock(Dataflow.Projects.Jobs.Messages.class);
-
- // Two requests are needed to get all the messages.
- Dataflow.Projects.Jobs.Messages.List firstRequest =
- mock(Dataflow.Projects.Jobs.Messages.List.class);
- Dataflow.Projects.Jobs.Messages.List secondRequest =
- mock(Dataflow.Projects.Jobs.Messages.List.class);
-
- when(mockMessages.list(PROJECT_ID, JOB_ID)).thenReturn(firstRequest).thenReturn(secondRequest);
-
- ListJobMessagesResponse firstResponse = new ListJobMessagesResponse();
- firstResponse.setJobMessages(new ArrayList<JobMessage>());
- for (int i = 0; i < 100; ++i) {
- JobMessage message = new JobMessage();
- message.setId("message_" + i);
- message.setTime(TimeUtil.toCloudTime(new Instant(i)));
- firstResponse.getJobMessages().add(message);
- }
- String pageToken = "page_token";
- firstResponse.setNextPageToken(pageToken);
-
- ListJobMessagesResponse secondResponse = new ListJobMessagesResponse();
- secondResponse.setJobMessages(new ArrayList<JobMessage>());
- for (int i = 100; i < 150; ++i) {
- JobMessage message = new JobMessage();
- message.setId("message_" + i);
- message.setTime(TimeUtil.toCloudTime(new Instant(i)));
- secondResponse.getJobMessages().add(message);
- }
-
- when(firstRequest.execute()).thenReturn(firstResponse);
- when(secondRequest.execute()).thenReturn(secondResponse);
-
- MonitoringUtil util = new MonitoringUtil(PROJECT_ID, mockMessages);
-
- List<JobMessage> messages = util.getJobMessages(JOB_ID, -1);
-
- verify(secondRequest).setPageToken(pageToken);
-
- assertEquals(150, messages.size());
- }
-
- @Test
- public void testToStateCreatesState() {
- String stateName = "JOB_STATE_DONE";
-
- State result = MonitoringUtil.toState(stateName);
-
- assertEquals(State.DONE, result);
- }
-
- @Test
- public void testToStateWithNullReturnsUnknown() {
- String stateName = null;
-
- State result = MonitoringUtil.toState(stateName);
-
- assertEquals(State.UNKNOWN, result);
- }
-
- @Test
- public void testToStateWithOtherValueReturnsUnknown() {
- String stateName = "FOO_BAR_BAZ";
-
- State result = MonitoringUtil.toState(stateName);
-
- assertEquals(State.UNKNOWN, result);
- }
-
- @Test
- public void testDontOverrideEndpointWithDefaultApi() {
- DataflowPipelineOptions options =
- PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
- options.setProject(PROJECT_ID);
- options.setGcpCredential(new TestCredential());
- String cancelCommand = MonitoringUtil.getGcloudCancelCommand(options, JOB_ID);
- assertEquals("gcloud alpha dataflow jobs --project=someProject cancel 1234", cancelCommand);
- }
-
- @Test
- public void testOverridesEndpointWithStagedDataflowEndpoint() {
- DataflowPipelineOptions options =
- PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
- options.setProject(PROJECT_ID);
- options.setGcpCredential(new TestCredential());
- String stagingDataflowEndpoint = "v0neverExisted";
- options.setDataflowEndpoint(stagingDataflowEndpoint);
- String cancelCommand = MonitoringUtil.getGcloudCancelCommand(options, JOB_ID);
- assertEquals(
- "CLOUDSDK_API_ENDPOINT_OVERRIDES_DATAFLOW=https://dataflow.googleapis.com/v0neverExisted/ "
- + "gcloud alpha dataflow jobs --project=someProject cancel 1234",
- cancelCommand);
- }
-}