You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/05/24 20:14:16 UTC
[7/9] beam git commit: Rename PTransforms to PTransformTranslation
Rename PTransforms to PTransformTranslation
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9b6728e2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9b6728e2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9b6728e2
Branch: refs/heads/master
Commit: 9b6728e24748791b7181b20183df3ada31f45682
Parents: 940819e
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue May 23 15:28:08 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue May 23 15:53:41 2017 -0700
----------------------------------------------------------------------
.../construction/PTransformTranslation.java | 119 ++++++++++++
.../runners/core/construction/PTransforms.java | 119 ------------
.../beam/runners/core/construction/ParDos.java | 4 +-
.../core/construction/SdkComponents.java | 3 +-
.../TransformPayloadTranslatorRegistrar.java | 2 +-
.../core/construction/WindowIntoTranslator.java | 2 +-
.../construction/PTransformTranslationTest.java | 189 +++++++++++++++++++
.../core/construction/PTransformsTest.java | 188 ------------------
8 files changed, 314 insertions(+), 312 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/9b6728e2/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
new file mode 100644
index 0000000..86638de
--- /dev/null
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.collect.ImmutableMap;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.ServiceLoader;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * Utilities for converting {@link PTransform PTransforms} to and from {@link RunnerApi Runner API
+ * protocol buffers}.
+ */
+public class PTransformTranslation {
+ private static final Map<Class<? extends PTransform>, TransformPayloadTranslator>
+ KNOWN_PAYLOAD_TRANSLATORS = loadTransformPayloadTranslators();
+
+ private static Map<Class<? extends PTransform>, TransformPayloadTranslator>
+ loadTransformPayloadTranslators() {
+ ImmutableMap.Builder<Class<? extends PTransform>, TransformPayloadTranslator> builder =
+ ImmutableMap.builder();
+ for (TransformPayloadTranslatorRegistrar registrar :
+ ServiceLoader.load(TransformPayloadTranslatorRegistrar.class)) {
+ builder.putAll(registrar.getTransformPayloadTranslators());
+ }
+ return builder.build();
+ }
+
+ private PTransformTranslation() {}
+
+ /**
+ * Translates an {@link AppliedPTransform} into a runner API proto.
+ *
+ * <p>Does not register the {@code appliedPTransform} within the provided {@link SdkComponents}.
+ */
+ static RunnerApi.PTransform toProto(
+ AppliedPTransform<?, ?, ?> appliedPTransform,
+ List<AppliedPTransform<?, ?, ?>> subtransforms,
+ SdkComponents components)
+ throws IOException {
+ RunnerApi.PTransform.Builder transformBuilder = RunnerApi.PTransform.newBuilder();
+ for (Map.Entry<TupleTag<?>, PValue> taggedInput : appliedPTransform.getInputs().entrySet()) {
+ checkArgument(
+ taggedInput.getValue() instanceof PCollection,
+ "Unexpected input type %s",
+ taggedInput.getValue().getClass());
+ transformBuilder.putInputs(
+ toProto(taggedInput.getKey()),
+ components.registerPCollection((PCollection<?>) taggedInput.getValue()));
+ }
+ for (Map.Entry<TupleTag<?>, PValue> taggedOutput : appliedPTransform.getOutputs().entrySet()) {
+ // TODO: Remove gating
+ if (taggedOutput.getValue() instanceof PCollection) {
+ checkArgument(
+ taggedOutput.getValue() instanceof PCollection,
+ "Unexpected output type %s",
+ taggedOutput.getValue().getClass());
+ transformBuilder.putOutputs(
+ toProto(taggedOutput.getKey()),
+ components.registerPCollection((PCollection<?>) taggedOutput.getValue()));
+ }
+ }
+ for (AppliedPTransform<?, ?, ?> subtransform : subtransforms) {
+ transformBuilder.addSubtransforms(components.getExistingPTransformId(subtransform));
+ }
+
+ transformBuilder.setUniqueName(appliedPTransform.getFullName());
+ // TODO: Display Data
+
+ PTransform<?, ?> transform = appliedPTransform.getTransform();
+ if (KNOWN_PAYLOAD_TRANSLATORS.containsKey(transform.getClass())) {
+ FunctionSpec payload =
+ KNOWN_PAYLOAD_TRANSLATORS
+ .get(transform.getClass())
+ .translate(appliedPTransform, components);
+ transformBuilder.setSpec(payload);
+ }
+
+ return transformBuilder.build();
+ }
+
+ private static String toProto(TupleTag<?> tag) {
+ return tag.getId();
+ }
+
+ /**
+ * A translator consumes a {@link PTransform} application and produces the appropriate
+ * FunctionSpec for a distinguished or primitive transform within the Beam runner API.
+ */
+ public interface TransformPayloadTranslator<T extends PTransform<?, ?>> {
+ FunctionSpec translate(AppliedPTransform<?, ?, T> transform, SdkComponents components);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/9b6728e2/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransforms.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransforms.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransforms.java
deleted file mode 100644
index 9826b77..0000000
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransforms.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.core.construction;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.common.collect.ImmutableMap;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.ServiceLoader;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
-import org.apache.beam.sdk.runners.AppliedPTransform;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TupleTag;
-
-/**
- * Utilities for converting {@link PTransform PTransforms} to and from {@link RunnerApi Runner API
- * protocol buffers}.
- */
-public class PTransforms {
- private static final Map<Class<? extends PTransform>, TransformPayloadTranslator>
- KNOWN_PAYLOAD_TRANSLATORS = loadTransformPayloadTranslators();
-
- private static Map<Class<? extends PTransform>, TransformPayloadTranslator>
- loadTransformPayloadTranslators() {
- ImmutableMap.Builder<Class<? extends PTransform>, TransformPayloadTranslator> builder =
- ImmutableMap.builder();
- for (TransformPayloadTranslatorRegistrar registrar :
- ServiceLoader.load(TransformPayloadTranslatorRegistrar.class)) {
- builder.putAll(registrar.getTransformPayloadTranslators());
- }
- return builder.build();
- }
-
- private PTransforms() {}
-
- /**
- * Translates an {@link AppliedPTransform} into a runner API proto.
- *
- * <p>Does not register the {@code appliedPTransform} within the provided {@link SdkComponents}.
- */
- static RunnerApi.PTransform toProto(
- AppliedPTransform<?, ?, ?> appliedPTransform,
- List<AppliedPTransform<?, ?, ?>> subtransforms,
- SdkComponents components)
- throws IOException {
- RunnerApi.PTransform.Builder transformBuilder = RunnerApi.PTransform.newBuilder();
- for (Map.Entry<TupleTag<?>, PValue> taggedInput : appliedPTransform.getInputs().entrySet()) {
- checkArgument(
- taggedInput.getValue() instanceof PCollection,
- "Unexpected input type %s",
- taggedInput.getValue().getClass());
- transformBuilder.putInputs(
- toProto(taggedInput.getKey()),
- components.registerPCollection((PCollection<?>) taggedInput.getValue()));
- }
- for (Map.Entry<TupleTag<?>, PValue> taggedOutput : appliedPTransform.getOutputs().entrySet()) {
- // TODO: Remove gating
- if (taggedOutput.getValue() instanceof PCollection) {
- checkArgument(
- taggedOutput.getValue() instanceof PCollection,
- "Unexpected output type %s",
- taggedOutput.getValue().getClass());
- transformBuilder.putOutputs(
- toProto(taggedOutput.getKey()),
- components.registerPCollection((PCollection<?>) taggedOutput.getValue()));
- }
- }
- for (AppliedPTransform<?, ?, ?> subtransform : subtransforms) {
- transformBuilder.addSubtransforms(components.getExistingPTransformId(subtransform));
- }
-
- transformBuilder.setUniqueName(appliedPTransform.getFullName());
- // TODO: Display Data
-
- PTransform<?, ?> transform = appliedPTransform.getTransform();
- if (KNOWN_PAYLOAD_TRANSLATORS.containsKey(transform.getClass())) {
- FunctionSpec payload =
- KNOWN_PAYLOAD_TRANSLATORS
- .get(transform.getClass())
- .translate(appliedPTransform, components);
- transformBuilder.setSpec(payload);
- }
-
- return transformBuilder.build();
- }
-
- private static String toProto(TupleTag<?> tag) {
- return tag.getId();
- }
-
- /**
- * A translator consumes a {@link PTransform} application and produces the appropriate
- * FunctionSpec for a distinguished or primitive transform within the Beam runner API.
- */
- public interface TransformPayloadTranslator<T extends PTransform<?, ?>> {
- FunctionSpec translate(AppliedPTransform<?, ?, T> transform, SdkComponents components);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/9b6728e2/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java
index 2ecc041..12f2969 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java
@@ -34,7 +34,7 @@ import java.io.Serializable;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import org.apache.beam.runners.core.construction.PTransforms.TransformPayloadTranslator;
+import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.common.runner.v1.RunnerApi;
@@ -96,7 +96,7 @@ public class ParDos {
* A {@link TransformPayloadTranslator} for {@link ParDo}.
*/
public static class ParDoPayloadTranslator
- implements PTransforms.TransformPayloadTranslator<ParDo.MultiOutput<?, ?>> {
+ implements PTransformTranslation.TransformPayloadTranslator<ParDo.MultiOutput<?, ?>> {
public static TransformPayloadTranslator create() {
return new ParDoPayloadTranslator();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/9b6728e2/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
index 3d8d4cd..da22982 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
@@ -131,7 +131,8 @@ class SdkComponents {
return name;
}
checkNotNull(children, "child nodes may not be null");
- componentsBuilder.putTransforms(name, PTransforms.toProto(appliedPTransform, children, this));
+ componentsBuilder.putTransforms(name, PTransformTranslation
+ .toProto(appliedPTransform, children, this));
return name;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/9b6728e2/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformPayloadTranslatorRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformPayloadTranslatorRegistrar.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformPayloadTranslatorRegistrar.java
index bc568a6..3b3ffa1 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformPayloadTranslatorRegistrar.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformPayloadTranslatorRegistrar.java
@@ -19,7 +19,7 @@
package org.apache.beam.runners.core.construction;
import java.util.Map;
-import org.apache.beam.runners.core.construction.PTransforms.TransformPayloadTranslator;
+import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
import org.apache.beam.sdk.transforms.PTransform;
/** A registrar of TransformPayloadTranslator. */
http://git-wip-us.apache.org/repos/asf/beam/blob/9b6728e2/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslator.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslator.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslator.java
index ea4c996..7ed2a49 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslator.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslator.java
@@ -20,7 +20,7 @@ package org.apache.beam.runners.core.construction;
import com.google.protobuf.Any;
import com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.beam.runners.core.construction.PTransforms.TransformPayloadTranslator;
+import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
import org.apache.beam.sdk.common.runner.v1.RunnerApi;
import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec;
http://git-wip-us.apache.org/repos/asf/beam/blob/9b6728e2/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformTranslationTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformTranslationTest.java
new file mode 100644
index 0000000..0e6ef97
--- /dev/null
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformTranslationTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.PTransform;
+import org.apache.beam.sdk.io.CountingSource;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Tests for {@link PTransformTranslation}.
+ */
+@RunWith(Parameterized.class)
+public class PTransformTranslationTest {
+
+ @Parameters(name = "{index}: {0}")
+ public static Iterable<ToAndFromProtoSpec> data() {
+ // This pipeline exists for construction, not to run any test.
+ // TODO: Leaf node with understood payload - i.e. validate payloads
+ ToAndFromProtoSpec readLeaf = ToAndFromProtoSpec.leaf(read(TestPipeline.create()));
+ ToAndFromProtoSpec readMultipleInAndOut =
+ ToAndFromProtoSpec.leaf(multiMultiParDo(TestPipeline.create()));
+ TestPipeline compositeReadPipeline = TestPipeline.create();
+ ToAndFromProtoSpec compositeRead =
+ ToAndFromProtoSpec.composite(
+ generateSequence(compositeReadPipeline),
+ ToAndFromProtoSpec.leaf(read(compositeReadPipeline)));
+ return ImmutableList.<ToAndFromProtoSpec>builder()
+ .add(readLeaf)
+ .add(readMultipleInAndOut)
+ .add(compositeRead)
+ // TODO: Composite with multiple children
+ // TODO: Composite with a composite child
+ .build();
+ }
+
+ @AutoValue
+ abstract static class ToAndFromProtoSpec {
+ public static ToAndFromProtoSpec leaf(AppliedPTransform<?, ?, ?> transform) {
+ return new AutoValue_PTransformTranslationTest_ToAndFromProtoSpec(
+ transform, Collections.<ToAndFromProtoSpec>emptyList());
+ }
+
+ public static ToAndFromProtoSpec composite(
+ AppliedPTransform<?, ?, ?> topLevel, ToAndFromProtoSpec spec, ToAndFromProtoSpec... specs) {
+ List<ToAndFromProtoSpec> childSpecs = new ArrayList<>();
+ childSpecs.add(spec);
+ childSpecs.addAll(Arrays.asList(specs));
+ return new AutoValue_PTransformTranslationTest_ToAndFromProtoSpec(topLevel, childSpecs);
+ }
+
+ abstract AppliedPTransform<?, ?, ?> getTransform();
+ abstract Collection<ToAndFromProtoSpec> getChildren();
+ }
+
+ @Parameter(0)
+ public ToAndFromProtoSpec spec;
+
+ @Test
+ public void toAndFromProto() throws IOException {
+ SdkComponents components = SdkComponents.create();
+ RunnerApi.PTransform converted = convert(spec, components);
+ Components protoComponents = components.toComponents();
+
+ // Sanity checks
+ assertThat(converted.getInputsCount(), equalTo(spec.getTransform().getInputs().size()));
+ assertThat(converted.getOutputsCount(), equalTo(spec.getTransform().getOutputs().size()));
+ assertThat(converted.getSubtransformsCount(), equalTo(spec.getChildren().size()));
+
+ assertThat(converted.getUniqueName(), equalTo(spec.getTransform().getFullName()));
+ for (PValue inputValue : spec.getTransform().getInputs().values()) {
+ PCollection<?> inputPc = (PCollection<?>) inputValue;
+ protoComponents.getPcollectionsOrThrow(components.registerPCollection(inputPc));
+ }
+ for (PValue outputValue : spec.getTransform().getOutputs().values()) {
+ PCollection<?> outputPc = (PCollection<?>) outputValue;
+ protoComponents.getPcollectionsOrThrow(components.registerPCollection(outputPc));
+ }
+ }
+
+ private RunnerApi.PTransform convert(ToAndFromProtoSpec spec, SdkComponents components)
+ throws IOException {
+ List<AppliedPTransform<?, ?, ?>> childTransforms = new ArrayList<>();
+ for (ToAndFromProtoSpec child : spec.getChildren()) {
+ childTransforms.add(child.getTransform());
+ System.out.println("Converting child " + child);
+ convert(child, components);
+ // Sanity call
+ components.getExistingPTransformId(child.getTransform());
+ }
+ PTransform convert = PTransformTranslation
+ .toProto(spec.getTransform(), childTransforms, components);
+ // Make sure the converted transform is registered. Convert it independently, but if this is a
+ // child spec, the child must be in the components.
+ components.registerPTransform(spec.getTransform(), childTransforms);
+ return convert;
+ }
+
+ private static class TestDoFn extends DoFn<Long, KV<Long, String>> {
+ // Exists to stop the ParDo application from throwing
+ @ProcessElement public void process(ProcessContext context) {}
+ }
+
+ private static AppliedPTransform<?, ?, ?> generateSequence(Pipeline pipeline) {
+ GenerateSequence sequence = GenerateSequence.from(0);
+ PCollection<Long> pcollection = pipeline.apply(sequence);
+ return AppliedPTransform.<PBegin, PCollection<Long>, GenerateSequence>of(
+ "Count", pipeline.begin().expand(), pcollection.expand(), sequence, pipeline);
+ }
+
+ private static AppliedPTransform<?, ?, ?> read(Pipeline pipeline) {
+ Read.Unbounded<Long> transform = Read.from(CountingSource.unbounded());
+ PCollection<Long> pcollection = pipeline.apply(transform);
+ return AppliedPTransform.<PBegin, PCollection<Long>, Read.Unbounded<Long>>of(
+ "ReadTheCount", pipeline.begin().expand(), pcollection.expand(), transform, pipeline);
+ }
+
+ private static AppliedPTransform<?, ?, ?> multiMultiParDo(Pipeline pipeline) {
+ PCollectionView<String> view =
+ pipeline.apply(Create.of("foo")).apply(View.<String>asSingleton());
+ PCollection<Long> input = pipeline.apply(GenerateSequence.from(0));
+ ParDo.MultiOutput<Long, KV<Long, String>> parDo =
+ ParDo.of(new TestDoFn())
+ .withSideInputs(view)
+ .withOutputTags(
+ new TupleTag<KV<Long, String>>() {},
+ TupleTagList.of(new TupleTag<KV<String, Long>>() {}));
+ PCollectionTuple output = input.apply(parDo);
+
+ Map<TupleTag<?>, PValue> inputs = new HashMap<>();
+ inputs.putAll(parDo.getAdditionalInputs());
+ inputs.putAll(input.expand());
+
+ return AppliedPTransform
+ .<PCollection<Long>, PCollectionTuple, ParDo.MultiOutput<Long, KV<Long, String>>>of(
+ "MultiParDoInAndOut", inputs, output.expand(), parDo, pipeline);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/9b6728e2/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformsTest.java
deleted file mode 100644
index 4125544..0000000
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformsTest.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.core.construction;
-
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertThat;
-
-import com.google.auto.value.AutoValue;
-import com.google.common.collect.ImmutableList;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.PTransform;
-import org.apache.beam.sdk.io.CountingSource;
-import org.apache.beam.sdk.io.GenerateSequence;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.runners.AppliedPTransform;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TupleTagList;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameter;
-import org.junit.runners.Parameterized.Parameters;
-
-/**
- * Tests for {@link PTransforms}.
- */
-@RunWith(Parameterized.class)
-public class PTransformsTest {
-
- @Parameters(name = "{index}: {0}")
- public static Iterable<ToAndFromProtoSpec> data() {
- // This pipeline exists for construction, not to run any test.
- // TODO: Leaf node with understood payload - i.e. validate payloads
- ToAndFromProtoSpec readLeaf = ToAndFromProtoSpec.leaf(read(TestPipeline.create()));
- ToAndFromProtoSpec readMultipleInAndOut =
- ToAndFromProtoSpec.leaf(multiMultiParDo(TestPipeline.create()));
- TestPipeline compositeReadPipeline = TestPipeline.create();
- ToAndFromProtoSpec compositeRead =
- ToAndFromProtoSpec.composite(
- generateSequence(compositeReadPipeline),
- ToAndFromProtoSpec.leaf(read(compositeReadPipeline)));
- return ImmutableList.<ToAndFromProtoSpec>builder()
- .add(readLeaf)
- .add(readMultipleInAndOut)
- .add(compositeRead)
- // TODO: Composite with multiple children
- // TODO: Composite with a composite child
- .build();
- }
-
- @AutoValue
- abstract static class ToAndFromProtoSpec {
- public static ToAndFromProtoSpec leaf(AppliedPTransform<?, ?, ?> transform) {
- return new AutoValue_PTransformsTest_ToAndFromProtoSpec(
- transform, Collections.<ToAndFromProtoSpec>emptyList());
- }
-
- public static ToAndFromProtoSpec composite(
- AppliedPTransform<?, ?, ?> topLevel, ToAndFromProtoSpec spec, ToAndFromProtoSpec... specs) {
- List<ToAndFromProtoSpec> childSpecs = new ArrayList<>();
- childSpecs.add(spec);
- childSpecs.addAll(Arrays.asList(specs));
- return new AutoValue_PTransformsTest_ToAndFromProtoSpec(topLevel, childSpecs);
- }
-
- abstract AppliedPTransform<?, ?, ?> getTransform();
- abstract Collection<ToAndFromProtoSpec> getChildren();
- }
-
- @Parameter(0)
- public ToAndFromProtoSpec spec;
-
- @Test
- public void toAndFromProto() throws IOException {
- SdkComponents components = SdkComponents.create();
- RunnerApi.PTransform converted = convert(spec, components);
- Components protoComponents = components.toComponents();
-
- // Sanity checks
- assertThat(converted.getInputsCount(), equalTo(spec.getTransform().getInputs().size()));
- assertThat(converted.getOutputsCount(), equalTo(spec.getTransform().getOutputs().size()));
- assertThat(converted.getSubtransformsCount(), equalTo(spec.getChildren().size()));
-
- assertThat(converted.getUniqueName(), equalTo(spec.getTransform().getFullName()));
- for (PValue inputValue : spec.getTransform().getInputs().values()) {
- PCollection<?> inputPc = (PCollection<?>) inputValue;
- protoComponents.getPcollectionsOrThrow(components.registerPCollection(inputPc));
- }
- for (PValue outputValue : spec.getTransform().getOutputs().values()) {
- PCollection<?> outputPc = (PCollection<?>) outputValue;
- protoComponents.getPcollectionsOrThrow(components.registerPCollection(outputPc));
- }
- }
-
- private RunnerApi.PTransform convert(ToAndFromProtoSpec spec, SdkComponents components)
- throws IOException {
- List<AppliedPTransform<?, ?, ?>> childTransforms = new ArrayList<>();
- for (ToAndFromProtoSpec child : spec.getChildren()) {
- childTransforms.add(child.getTransform());
- System.out.println("Converting child " + child);
- convert(child, components);
- // Sanity call
- components.getExistingPTransformId(child.getTransform());
- }
- PTransform convert = PTransforms.toProto(spec.getTransform(), childTransforms, components);
- // Make sure the converted transform is registered. Convert it independently, but if this is a
- // child spec, the child must be in the components.
- components.registerPTransform(spec.getTransform(), childTransforms);
- return convert;
- }
-
- private static class TestDoFn extends DoFn<Long, KV<Long, String>> {
- // Exists to stop the ParDo application from throwing
- @ProcessElement public void process(ProcessContext context) {}
- }
-
- private static AppliedPTransform<?, ?, ?> generateSequence(Pipeline pipeline) {
- GenerateSequence sequence = GenerateSequence.from(0);
- PCollection<Long> pcollection = pipeline.apply(sequence);
- return AppliedPTransform.<PBegin, PCollection<Long>, GenerateSequence>of(
- "Count", pipeline.begin().expand(), pcollection.expand(), sequence, pipeline);
- }
-
- private static AppliedPTransform<?, ?, ?> read(Pipeline pipeline) {
- Read.Unbounded<Long> transform = Read.from(CountingSource.unbounded());
- PCollection<Long> pcollection = pipeline.apply(transform);
- return AppliedPTransform.<PBegin, PCollection<Long>, Read.Unbounded<Long>>of(
- "ReadTheCount", pipeline.begin().expand(), pcollection.expand(), transform, pipeline);
- }
-
- private static AppliedPTransform<?, ?, ?> multiMultiParDo(Pipeline pipeline) {
- PCollectionView<String> view =
- pipeline.apply(Create.of("foo")).apply(View.<String>asSingleton());
- PCollection<Long> input = pipeline.apply(GenerateSequence.from(0));
- ParDo.MultiOutput<Long, KV<Long, String>> parDo =
- ParDo.of(new TestDoFn())
- .withSideInputs(view)
- .withOutputTags(
- new TupleTag<KV<Long, String>>() {},
- TupleTagList.of(new TupleTag<KV<String, Long>>() {}));
- PCollectionTuple output = input.apply(parDo);
-
- Map<TupleTag<?>, PValue> inputs = new HashMap<>();
- inputs.putAll(parDo.getAdditionalInputs());
- inputs.putAll(input.expand());
-
- return AppliedPTransform
- .<PCollection<Long>, PCollectionTuple, ParDo.MultiOutput<Long, KV<Long, String>>>of(
- "MultiParDoInAndOut", inputs, output.expand(), parDo, pipeline);
- }
-}