You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/05/24 20:14:12 UTC
[3/9] beam git commit: Rename ParDos to ParDoTranslation
Rename ParDos to ParDoTranslation
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/44609383
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/44609383
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/44609383
Branch: refs/heads/master
Commit: 446093836016dabf021d34ca0a858e313f493e2f
Parents: 9b6728e
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue May 23 15:28:49 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue May 23 15:53:41 2017 -0700
----------------------------------------------------------------------
.../core/construction/ParDoTranslation.java | 348 +++++++++++++++++++
.../beam/runners/core/construction/ParDos.java | 348 -------------------
.../core/construction/ParDoTranslationTest.java | 234 +++++++++++++
.../runners/core/construction/ParDosTest.java | 233 -------------
4 files changed, 582 insertions(+), 581 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/44609383/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
new file mode 100644
index 0000000..baed246
--- /dev/null
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.service.AutoService;
+import com.google.auto.value.AutoValue;
+import com.google.common.base.Optional;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import com.google.protobuf.Any;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.BytesValue;
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.ParDoPayload;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.Parameter.Type;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.SideInput;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.SideInput.Builder;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.StateSpec;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.TimerSpec;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Materializations;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
+import org.apache.beam.sdk.transforms.ViewFn;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.Cases;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.RestrictionTrackerParameter;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WindowParameter;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature.StateDeclaration;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature.TimerDeclaration;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
+import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+
+/**
+ * Utilities for interacting with {@link ParDo} instances and {@link ParDoPayload} protos.
+ */
+public class ParDoTranslation {
+ /**
+ * The URN for a {@link ParDoPayload}.
+ */
+ public static final String PAR_DO_PAYLOAD_URN = "urn:beam:pardo:v1";
+ /**
+ * The URN for an unknown Java {@link DoFn}.
+ */
+ public static final String CUSTOM_JAVA_DO_FN_URN = "urn:beam:dofn:javasdk:0.1";
+ /**
+ * The URN for an unknown Java {@link ViewFn}.
+ */
+ public static final String CUSTOM_JAVA_VIEW_FN_URN = "urn:beam:viewfn:javasdk:0.1";
+ /**
+ * The URN for an unknown Java {@link WindowMappingFn}.
+ */
+ public static final String CUSTOM_JAVA_WINDOW_MAPPING_FN_URN =
+ "urn:beam:windowmappingfn:javasdk:0.1";
+
+ /**
+ * A {@link TransformPayloadTranslator} for {@link ParDo}.
+ */
+ public static class ParDoPayloadTranslator
+ implements PTransformTranslation.TransformPayloadTranslator<ParDo.MultiOutput<?, ?>> {
+ public static TransformPayloadTranslator create() {
+ return new ParDoPayloadTranslator();
+ }
+
+ private ParDoPayloadTranslator() {}
+
+ @Override
+ public FunctionSpec translate(
+ AppliedPTransform<?, ?, MultiOutput<?, ?>> transform, SdkComponents components) {
+ ParDoPayload payload = toProto(transform.getTransform(), components);
+ return RunnerApi.FunctionSpec.newBuilder()
+ .setUrn(PAR_DO_PAYLOAD_URN)
+ .setParameter(Any.pack(payload))
+ .build();
+ }
+
+ /**
+ * Registers {@link ParDoPayloadTranslator}.
+ */
+ @AutoService(TransformPayloadTranslatorRegistrar.class)
+ public static class Registrar implements TransformPayloadTranslatorRegistrar {
+ @Override
+ public Map<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator>
+ getTransformPayloadTranslators() {
+ return Collections.singletonMap(ParDo.MultiOutput.class, new ParDoPayloadTranslator());
+ }
+ }
+ }
+
+ public static ParDoPayload toProto(ParDo.MultiOutput<?, ?> parDo, SdkComponents components) {
+ DoFnSignature signature = DoFnSignatures.getSignature(parDo.getFn().getClass());
+ Map<String, StateDeclaration> states = signature.stateDeclarations();
+ Map<String, TimerDeclaration> timers = signature.timerDeclarations();
+ List<Parameter> parameters = signature.processElement().extraParameters();
+
+ ParDoPayload.Builder builder = ParDoPayload.newBuilder();
+ builder.setDoFn(toProto(parDo.getFn(), parDo.getMainOutputTag()));
+ for (PCollectionView<?> sideInput : parDo.getSideInputs()) {
+ builder.putSideInputs(sideInput.getTagInternal().getId(), toProto(sideInput));
+ }
+ for (Parameter parameter : parameters) {
+ Optional<RunnerApi.Parameter> protoParameter = toProto(parameter);
+ if (protoParameter.isPresent()) {
+ builder.addParameters(protoParameter.get());
+ }
+ }
+ for (Map.Entry<String, StateDeclaration> state : states.entrySet()) {
+ StateSpec spec = toProto(state.getValue());
+ builder.putStateSpecs(state.getKey(), spec);
+ }
+ for (Map.Entry<String, TimerDeclaration> timer : timers.entrySet()) {
+ TimerSpec spec = toProto(timer.getValue());
+ builder.putTimerSpecs(timer.getKey(), spec);
+ }
+ return builder.build();
+ }
+
+ public static DoFn<?, ?> getDoFn(ParDoPayload payload) throws InvalidProtocolBufferException {
+ return doFnAndMainOutputTagFromProto(payload.getDoFn()).getDoFn();
+ }
+
+ public static TupleTag<?> getMainOutputTag(ParDoPayload payload)
+ throws InvalidProtocolBufferException {
+ return doFnAndMainOutputTagFromProto(payload.getDoFn()).getMainOutputTag();
+ }
+
+ public static RunnerApi.PCollection getMainInput(
+ RunnerApi.PTransform ptransform, Components components) throws IOException {
+ checkArgument(
+ ptransform.getSpec().getUrn().equals(PAR_DO_PAYLOAD_URN),
+ "Unexpected payload type %s",
+ ptransform.getSpec().getUrn());
+ ParDoPayload payload = ptransform.getSpec().getParameter().unpack(ParDoPayload.class);
+ String mainInputId =
+ Iterables.getOnlyElement(
+ Sets.difference(
+ ptransform.getInputsMap().keySet(), payload.getSideInputsMap().keySet()));
+ return components.getPcollectionsOrThrow(ptransform.getInputsOrThrow(mainInputId));
+ }
+
+ // TODO: Implement
+ private static StateSpec toProto(StateDeclaration state) {
+ throw new UnsupportedOperationException("Not yet supported");
+ }
+
+ // TODO: Implement
+ private static TimerSpec toProto(TimerDeclaration timer) {
+ throw new UnsupportedOperationException("Not yet supported");
+ }
+
+ @AutoValue
+ abstract static class DoFnAndMainOutput implements Serializable {
+ public static DoFnAndMainOutput of(
+ DoFn<?, ?> fn, TupleTag<?> tag) {
+ return new AutoValue_ParDoTranslation_DoFnAndMainOutput(fn, tag);
+ }
+
+ abstract DoFn<?, ?> getDoFn();
+ abstract TupleTag<?> getMainOutputTag();
+ }
+
+ private static SdkFunctionSpec toProto(DoFn<?, ?> fn, TupleTag<?> tag) {
+ return SdkFunctionSpec.newBuilder()
+ .setSpec(
+ FunctionSpec.newBuilder()
+ .setUrn(CUSTOM_JAVA_DO_FN_URN)
+ .setParameter(
+ Any.pack(
+ BytesValue.newBuilder()
+ .setValue(
+ ByteString.copyFrom(
+ SerializableUtils.serializeToByteArray(
+ DoFnAndMainOutput.of(fn, tag))))
+ .build())))
+ .build();
+ }
+
+ private static DoFnAndMainOutput doFnAndMainOutputTagFromProto(SdkFunctionSpec fnSpec)
+ throws InvalidProtocolBufferException {
+ checkArgument(fnSpec.getSpec().getUrn().equals(CUSTOM_JAVA_DO_FN_URN));
+ byte[] serializedFn =
+ fnSpec.getSpec().getParameter().unpack(BytesValue.class).getValue().toByteArray();
+ return (DoFnAndMainOutput)
+ SerializableUtils.deserializeFromByteArray(serializedFn, "Custom DoFn And Main Output tag");
+ }
+
+ private static Optional<RunnerApi.Parameter> toProto(Parameter parameter) {
+ return parameter.match(
+ new Cases.WithDefault<Optional<RunnerApi.Parameter>>() {
+ @Override
+ public Optional<RunnerApi.Parameter> dispatch(WindowParameter p) {
+ return Optional.of(RunnerApi.Parameter.newBuilder().setType(Type.WINDOW).build());
+ }
+
+ @Override
+ public Optional<RunnerApi.Parameter> dispatch(RestrictionTrackerParameter p) {
+ return Optional.of(
+ RunnerApi.Parameter.newBuilder().setType(Type.RESTRICTION_TRACKER).build());
+ }
+
+ @Override
+ protected Optional<RunnerApi.Parameter> dispatchDefault(Parameter p) {
+ return Optional.absent();
+ }
+ });
+ }
+
+ private static SideInput toProto(PCollectionView<?> view) {
+ Builder builder = SideInput.newBuilder();
+ builder.setAccessPattern(
+ FunctionSpec.newBuilder()
+ .setUrn(view.getViewFn().getMaterialization().getUrn())
+ .build());
+ builder.setViewFn(toProto(view.getViewFn()));
+ builder.setWindowMappingFn(toProto(view.getWindowMappingFn()));
+ return builder.build();
+ }
+
+ public static PCollectionView<?> fromProto(
+ SideInput sideInput, String id, RunnerApi.PTransform parDoTransform, Components components)
+ throws IOException {
+ TupleTag<?> tag = new TupleTag<>(id);
+ WindowMappingFn<?> windowMappingFn = windowMappingFnFromProto(sideInput.getWindowMappingFn());
+ ViewFn<?, ?> viewFn = viewFnFromProto(sideInput.getViewFn());
+
+ RunnerApi.PCollection inputCollection =
+ components.getPcollectionsOrThrow(parDoTransform.getInputsOrThrow(id));
+ WindowingStrategy<?, ?> windowingStrategy =
+ WindowingStrategies.fromProto(
+ components.getWindowingStrategiesOrThrow(inputCollection.getWindowingStrategyId()),
+ components);
+ Coder<?> elemCoder =
+ Coders.fromProto(components.getCodersOrThrow(inputCollection.getCoderId()), components);
+ Coder<Iterable<WindowedValue<?>>> coder =
+ (Coder)
+ IterableCoder.of(
+ FullWindowedValueCoder.of(
+ elemCoder, windowingStrategy.getWindowFn().windowCoder()));
+ checkArgument(
+ sideInput.getAccessPattern().getUrn().equals(Materializations.ITERABLE_MATERIALIZATION_URN),
+ "Unknown View Materialization URN %s",
+ sideInput.getAccessPattern().getUrn());
+
+ PCollectionView<?> view =
+ new RunnerPCollectionView<>(
+ (TupleTag<Iterable<WindowedValue<?>>>) tag,
+ (ViewFn<Iterable<WindowedValue<?>>, ?>) viewFn,
+ windowMappingFn,
+ windowingStrategy,
+ coder);
+ return view;
+ }
+
+ private static SdkFunctionSpec toProto(ViewFn<?, ?> viewFn) {
+ return SdkFunctionSpec.newBuilder()
+ .setSpec(
+ FunctionSpec.newBuilder()
+ .setUrn(CUSTOM_JAVA_VIEW_FN_URN)
+ .setParameter(
+ Any.pack(
+ BytesValue.newBuilder()
+ .setValue(
+ ByteString.copyFrom(SerializableUtils.serializeToByteArray(viewFn)))
+ .build())))
+ .build();
+ }
+
+ private static ViewFn<?, ?> viewFnFromProto(SdkFunctionSpec viewFn)
+ throws InvalidProtocolBufferException {
+ FunctionSpec spec = viewFn.getSpec();
+ checkArgument(
+ spec.getUrn().equals(CUSTOM_JAVA_VIEW_FN_URN),
+ "Can't deserialize unknown %s type %s",
+ ViewFn.class.getSimpleName(),
+ spec.getUrn());
+ return (ViewFn<?, ?>)
+ SerializableUtils.deserializeFromByteArray(
+ spec.getParameter().unpack(BytesValue.class).getValue().toByteArray(), "Custom ViewFn");
+ }
+
+ private static SdkFunctionSpec toProto(WindowMappingFn<?> windowMappingFn) {
+ return SdkFunctionSpec.newBuilder()
+ .setSpec(
+ FunctionSpec.newBuilder()
+ .setUrn(CUSTOM_JAVA_WINDOW_MAPPING_FN_URN)
+ .setParameter(
+ Any.pack(
+ BytesValue.newBuilder()
+ .setValue(
+ ByteString.copyFrom(
+ SerializableUtils.serializeToByteArray(windowMappingFn)))
+ .build())))
+ .build();
+ }
+
+ private static WindowMappingFn<?> windowMappingFnFromProto(SdkFunctionSpec windowMappingFn)
+ throws InvalidProtocolBufferException {
+ FunctionSpec spec = windowMappingFn.getSpec();
+ checkArgument(
+ spec.getUrn().equals(CUSTOM_JAVA_WINDOW_MAPPING_FN_URN),
+ "Can't deserialize unknown %s type %s",
+ WindowMappingFn.class.getSimpleName(),
+ spec.getUrn());
+ return (WindowMappingFn<?>)
+ SerializableUtils.deserializeFromByteArray(
+ spec.getParameter().unpack(BytesValue.class).getValue().toByteArray(),
+ "Custom WinodwMappingFn");
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/44609383/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java
deleted file mode 100644
index 12f2969..0000000
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java
+++ /dev/null
@@ -1,348 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.core.construction;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.auto.service.AutoService;
-import com.google.auto.value.AutoValue;
-import com.google.common.base.Optional;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Sets;
-import com.google.protobuf.Any;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.BytesValue;
-import com.google.protobuf.InvalidProtocolBufferException;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.IterableCoder;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.ParDoPayload;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.Parameter.Type;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.SideInput;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.SideInput.Builder;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.StateSpec;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.TimerSpec;
-import org.apache.beam.sdk.runners.AppliedPTransform;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Materializations;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
-import org.apache.beam.sdk.transforms.ViewFn;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.Cases;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.RestrictionTrackerParameter;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WindowParameter;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature.StateDeclaration;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature.TimerDeclaration;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
-import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
-import org.apache.beam.sdk.util.SerializableUtils;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.WindowingStrategy;
-
-/**
- * Utilities for interacting with {@link ParDo} instances and {@link ParDoPayload} protos.
- *
- * <p>DoFns, ViewFns and WindowMappingFns are stored as Java-serialized bytes (wrapped in a
- * {@link BytesValue}) under the custom Java URNs declared below.
- */
-public class ParDos {
- /**
- * The URN for a {@link ParDoPayload}.
- */
- public static final String PAR_DO_PAYLOAD_URN = "urn:beam:pardo:v1";
- /**
- * The URN for an unknown Java {@link DoFn}.
- */
- public static final String CUSTOM_JAVA_DO_FN_URN = "urn:beam:dofn:javasdk:0.1";
- /**
- * The URN for an unknown Java {@link ViewFn}.
- */
- public static final String CUSTOM_JAVA_VIEW_FN_URN = "urn:beam:viewfn:javasdk:0.1";
- /**
- * The URN for an unknown Java {@link WindowMappingFn}.
- */
- public static final String CUSTOM_JAVA_WINDOW_MAPPING_FN_URN =
- "urn:beam:windowmappingfn:javasdk:0.1";
-
- /**
- * A {@link TransformPayloadTranslator} for {@link ParDo}.
- */
- public static class ParDoPayloadTranslator
- implements PTransformTranslation.TransformPayloadTranslator<ParDo.MultiOutput<?, ?>> {
- public static TransformPayloadTranslator create() {
- return new ParDoPayloadTranslator();
- }
-
- private ParDoPayloadTranslator() {}
-
- // Packs the ParDoPayload from toProto into a FunctionSpec tagged with PAR_DO_PAYLOAD_URN.
- @Override
- public FunctionSpec translate(
- AppliedPTransform<?, ?, MultiOutput<?, ?>> transform, SdkComponents components) {
- ParDoPayload payload = toProto(transform.getTransform(), components);
- return RunnerApi.FunctionSpec.newBuilder()
- .setUrn(PAR_DO_PAYLOAD_URN)
- .setParameter(Any.pack(payload))
- .build();
- }
-
- /**
- * Registers {@link ParDoPayloadTranslator}.
- */
- @AutoService(TransformPayloadTranslatorRegistrar.class)
- public static class Registrar implements TransformPayloadTranslatorRegistrar {
- @Override
- public Map<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator>
- getTransformPayloadTranslators() {
- return Collections.singletonMap(ParDo.MultiOutput.class, new ParDoPayloadTranslator());
- }
- }
- }
-
- /**
- * Converts a {@link ParDo.MultiOutput} into a {@link ParDoPayload}.
- *
- * <p>Throws {@link UnsupportedOperationException} if the DoFn declares any state or timers,
- * because their translation is not implemented yet.
- */
- public static ParDoPayload toProto(ParDo.MultiOutput<?, ?> parDo, SdkComponents components) {
- DoFnSignature signature = DoFnSignatures.getSignature(parDo.getFn().getClass());
- Map<String, StateDeclaration> states = signature.stateDeclarations();
- Map<String, TimerDeclaration> timers = signature.timerDeclarations();
- List<Parameter> parameters = signature.processElement().extraParameters();
-
- ParDoPayload.Builder builder = ParDoPayload.newBuilder();
- builder.setDoFn(toProto(parDo.getFn(), parDo.getMainOutputTag()));
- for (PCollectionView<?> sideInput : parDo.getSideInputs()) {
- builder.putSideInputs(sideInput.getTagInternal().getId(), toProto(sideInput));
- }
- for (Parameter parameter : parameters) {
- Optional<RunnerApi.Parameter> protoParameter = toProto(parameter);
- if (protoParameter.isPresent()) {
- builder.addParameters(protoParameter.get());
- }
- }
- for (Map.Entry<String, StateDeclaration> state : states.entrySet()) {
- StateSpec spec = toProto(state.getValue());
- builder.putStateSpecs(state.getKey(), spec);
- }
- for (Map.Entry<String, TimerDeclaration> timer : timers.entrySet()) {
- TimerSpec spec = toProto(timer.getValue());
- builder.putTimerSpecs(timer.getKey(), spec);
- }
- return builder.build();
- }
-
- // Recovers the DoFn serialized into the payload's do_fn spec.
- public static DoFn<?, ?> getDoFn(ParDoPayload payload) throws InvalidProtocolBufferException {
- return doFnAndMainOutputTagFromProto(payload.getDoFn()).getDoFn();
- }
-
- // Recovers the main output tag serialized alongside the DoFn.
- public static TupleTag<?> getMainOutputTag(ParDoPayload payload)
- throws InvalidProtocolBufferException {
- return doFnAndMainOutputTagFromProto(payload.getDoFn()).getMainOutputTag();
- }
-
- // The main input is the only input whose local name is not a side-input key of the payload.
- public static RunnerApi.PCollection getMainInput(
- RunnerApi.PTransform ptransform, Components components) throws IOException {
- checkArgument(
- ptransform.getSpec().getUrn().equals(PAR_DO_PAYLOAD_URN),
- "Unexpected payload type %s",
- ptransform.getSpec().getUrn());
- ParDoPayload payload = ptransform.getSpec().getParameter().unpack(ParDoPayload.class);
- String mainInputId =
- Iterables.getOnlyElement(
- Sets.difference(
- ptransform.getInputsMap().keySet(), payload.getSideInputsMap().keySet()));
- return components.getPcollectionsOrThrow(ptransform.getInputsOrThrow(mainInputId));
- }
-
- // TODO: Implement
- private static StateSpec toProto(StateDeclaration state) {
- throw new UnsupportedOperationException("Not yet supported");
- }
-
- // TODO: Implement
- private static TimerSpec toProto(TimerDeclaration timer) {
- throw new UnsupportedOperationException("Not yet supported");
- }
-
- // The DoFn and its main output tag, Java-serialized together as the DoFn spec's payload.
- @AutoValue
- abstract static class DoFnAndMainOutput implements Serializable {
- public static DoFnAndMainOutput of(
- DoFn<?, ?> fn, TupleTag<?> tag) {
- return new AutoValue_ParDos_DoFnAndMainOutput(fn, tag);
- }
-
- abstract DoFn<?, ?> getDoFn();
- abstract TupleTag<?> getMainOutputTag();
- }
-
- private static SdkFunctionSpec toProto(DoFn<?, ?> fn, TupleTag<?> tag) {
- return SdkFunctionSpec.newBuilder()
- .setSpec(
- FunctionSpec.newBuilder()
- .setUrn(CUSTOM_JAVA_DO_FN_URN)
- .setParameter(
- Any.pack(
- BytesValue.newBuilder()
- .setValue(
- ByteString.copyFrom(
- SerializableUtils.serializeToByteArray(
- DoFnAndMainOutput.of(fn, tag))))
- .build())))
- .build();
- }
-
- private static DoFnAndMainOutput doFnAndMainOutputTagFromProto(SdkFunctionSpec fnSpec)
- throws InvalidProtocolBufferException {
- // NOTE(review): unlike the ViewFn/WindowMappingFn readers, this check has no error message.
- checkArgument(fnSpec.getSpec().getUrn().equals(CUSTOM_JAVA_DO_FN_URN));
- byte[] serializedFn =
- fnSpec.getSpec().getParameter().unpack(BytesValue.class).getValue().toByteArray();
- return (DoFnAndMainOutput)
- SerializableUtils.deserializeFromByteArray(serializedFn, "Custom DoFn And Main Output tag");
- }
-
- // Only window and restriction-tracker extra parameters map to proto; all others are omitted.
- private static Optional<RunnerApi.Parameter> toProto(Parameter parameter) {
- return parameter.match(
- new Cases.WithDefault<Optional<RunnerApi.Parameter>>() {
- @Override
- public Optional<RunnerApi.Parameter> dispatch(WindowParameter p) {
- return Optional.of(RunnerApi.Parameter.newBuilder().setType(Type.WINDOW).build());
- }
-
- @Override
- public Optional<RunnerApi.Parameter> dispatch(RestrictionTrackerParameter p) {
- return Optional.of(
- RunnerApi.Parameter.newBuilder().setType(Type.RESTRICTION_TRACKER).build());
- }
-
- @Override
- protected Optional<RunnerApi.Parameter> dispatchDefault(Parameter p) {
- return Optional.absent();
- }
- });
- }
-
- private static SideInput toProto(PCollectionView<?> view) {
- Builder builder = SideInput.newBuilder();
- builder.setAccessPattern(
- FunctionSpec.newBuilder()
- .setUrn(view.getViewFn().getMaterialization().getUrn())
- .build());
- builder.setViewFn(toProto(view.getViewFn()));
- builder.setWindowMappingFn(toProto(view.getWindowMappingFn()));
- return builder.build();
- }
-
- // Rebuilds the side-input view named `id`, using the windowing strategy and coder of the
- // transform's input PCollection with that local name.
- public static PCollectionView<?> fromProto(
- SideInput sideInput, String id, RunnerApi.PTransform parDoTransform, Components components)
- throws IOException {
- TupleTag<?> tag = new TupleTag<>(id);
- WindowMappingFn<?> windowMappingFn = windowMappingFnFromProto(sideInput.getWindowMappingFn());
- ViewFn<?, ?> viewFn = viewFnFromProto(sideInput.getViewFn());
-
- RunnerApi.PCollection inputCollection =
- components.getPcollectionsOrThrow(parDoTransform.getInputsOrThrow(id));
- WindowingStrategy<?, ?> windowingStrategy =
- WindowingStrategies.fromProto(
- components.getWindowingStrategiesOrThrow(inputCollection.getWindowingStrategyId()),
- components);
- Coder<?> elemCoder =
- Coders.fromProto(components.getCodersOrThrow(inputCollection.getCoderId()), components);
- Coder<Iterable<WindowedValue<?>>> coder =
- (Coder)
- IterableCoder.of(
- FullWindowedValueCoder.of(
- elemCoder, windowingStrategy.getWindowFn().windowCoder()));
- // Only the iterable materialization is supported when rebuilding a view.
- checkArgument(
- sideInput.getAccessPattern().getUrn().equals(Materializations.ITERABLE_MATERIALIZATION_URN),
- "Unknown View Materialization URN %s",
- sideInput.getAccessPattern().getUrn());
-
- PCollectionView<?> view =
- new RunnerPCollectionView<>(
- (TupleTag<Iterable<WindowedValue<?>>>) tag,
- (ViewFn<Iterable<WindowedValue<?>>, ?>) viewFn,
- windowMappingFn,
- windowingStrategy,
- coder);
- return view;
- }
-
- private static SdkFunctionSpec toProto(ViewFn<?, ?> viewFn) {
- return SdkFunctionSpec.newBuilder()
- .setSpec(
- FunctionSpec.newBuilder()
- .setUrn(CUSTOM_JAVA_VIEW_FN_URN)
- .setParameter(
- Any.pack(
- BytesValue.newBuilder()
- .setValue(
- ByteString.copyFrom(SerializableUtils.serializeToByteArray(viewFn)))
- .build())))
- .build();
- }
-
- private static ViewFn<?, ?> viewFnFromProto(SdkFunctionSpec viewFn)
- throws InvalidProtocolBufferException {
- FunctionSpec spec = viewFn.getSpec();
- checkArgument(
- spec.getUrn().equals(CUSTOM_JAVA_VIEW_FN_URN),
- "Can't deserialize unknown %s type %s",
- ViewFn.class.getSimpleName(),
- spec.getUrn());
- return (ViewFn<?, ?>)
- SerializableUtils.deserializeFromByteArray(
- spec.getParameter().unpack(BytesValue.class).getValue().toByteArray(), "Custom ViewFn");
- }
-
- private static SdkFunctionSpec toProto(WindowMappingFn<?> windowMappingFn) {
- return SdkFunctionSpec.newBuilder()
- .setSpec(
- FunctionSpec.newBuilder()
- .setUrn(CUSTOM_JAVA_WINDOW_MAPPING_FN_URN)
- .setParameter(
- Any.pack(
- BytesValue.newBuilder()
- .setValue(
- ByteString.copyFrom(
- SerializableUtils.serializeToByteArray(windowMappingFn)))
- .build())))
- .build();
- }
-
- private static WindowMappingFn<?> windowMappingFnFromProto(SdkFunctionSpec windowMappingFn)
- throws InvalidProtocolBufferException {
- FunctionSpec spec = windowMappingFn.getSpec();
- checkArgument(
- spec.getUrn().equals(CUSTOM_JAVA_WINDOW_MAPPING_FN_URN),
- "Can't deserialize unknown %s type %s",
- WindowMappingFn.class.getSimpleName(),
- spec.getUrn());
- // NOTE(review): "Winodw" in the description string below is a typo for "Window".
- return (WindowMappingFn<?>)
- SerializableUtils.deserializeFromByteArray(
- spec.getParameter().unpack(BytesValue.class).getValue().toByteArray(),
- "Custom WinodwMappingFn");
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/44609383/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
new file mode 100644
index 0000000..ec27957
--- /dev/null
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.ImmutableList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.ParDoPayload;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.SideInput;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.CombiningState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Combine.BinaryCombineLongFn;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.hamcrest.Matchers;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Tests for {@link ParDoTranslation}.
+ *
+ * <p>Each parameterized case is a {@link ParDo.MultiOutput} that is translated to its Runner API
+ * form and checked against the original transform: the {@link DoFn}, the main output tag and the
+ * side inputs must all survive translation.
+ */
+@RunWith(Parameterized.class)
+public class ParDoTranslationTest {
+  /**
+   * Pipeline used only to construct the inputs below. It is never run in this class, so
+   * abandoned-node enforcement is disabled.
+   */
+  private static final TestPipeline p =
+      TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
+  /** Singleton side input attached to some of the parameterized cases. */
+  private static final PCollectionView<Long> singletonSideInput =
+      p.apply("GenerateSingleton", GenerateSequence.from(0L).to(1L))
+          .apply(View.<Long>asSingleton());
+
+  /** Multimap side input attached to some of the parameterized cases. */
+  private static final PCollectionView<Map<Long, Iterable<String>>> multimapSideInput =
+      p.apply("CreateMultimap", Create.of(KV.of(1L, "foo"), KV.of(1L, "bar"), KV.of(2L, "spam")))
+          .setCoder(KvCoder.of(VarLongCoder.of(), StringUtf8Coder.of()))
+          .apply(View.<Long, String>asMultimap());
+
+  /** Empty main input that the ParDo under test is applied to. */
+  private static final PCollection<KV<Long, String>> mainInput =
+      p.apply("CreateMainInput", Create.empty(KvCoder.of(VarLongCoder.of(), StringUtf8Coder.of())));
+
+  /**
+   * Returns the transforms under test: every combination of {no side inputs, two side inputs} and
+   * {no additional outputs, two additional outputs}.
+   */
+  @Parameters(name = "{index}: {0}")
+  public static Iterable<ParDo.MultiOutput<?, ?>> data() {
+    return ImmutableList.<ParDo.MultiOutput<?, ?>>of(
+        // No side inputs, no additional outputs.
+        ParDo.of(new DropElementsFn()).withOutputTags(new TupleTag<Void>(), TupleTagList.empty()),
+        // Side inputs, no additional outputs.
+        ParDo.of(new DropElementsFn())
+            .withOutputTags(new TupleTag<Void>(), TupleTagList.empty())
+            .withSideInputs(singletonSideInput, multimapSideInput),
+        // Side inputs and additional outputs.
+        ParDo.of(new DropElementsFn())
+            .withOutputTags(
+                new TupleTag<Void>(),
+                TupleTagList.of(new TupleTag<byte[]>() {}).and(new TupleTag<Integer>() {}))
+            .withSideInputs(singletonSideInput, multimapSideInput),
+        // Additional outputs, no side inputs.
+        ParDo.of(new DropElementsFn())
+            .withOutputTags(
+                new TupleTag<Void>(),
+                TupleTagList.of(new TupleTag<byte[]>() {}).and(new TupleTag<Integer>() {})));
+  }
+
+  /** The transform under test, injected by the {@link Parameterized} runner. */
+  @Parameter(0)
+  public ParDo.MultiOutput<KV<Long, String>, Void> parDo;
+
+  /**
+   * Translates the transform to a {@link ParDoPayload} and checks that the {@link DoFn} and main
+   * output tag round-trip, and that every side input is present in the payload.
+   */
+  @Test
+  public void testToAndFromProto() throws Exception {
+    SdkComponents components = SdkComponents.create();
+    ParDoPayload payload = ParDoTranslation.toProto(parDo, components);
+
+    assertThat(ParDoTranslation.getDoFn(payload), Matchers.<DoFn<?, ?>>equalTo(parDo.getFn()));
+    assertThat(
+        ParDoTranslation.getMainOutputTag(payload),
+        Matchers.<TupleTag<?>>equalTo(parDo.getMainOutputTag()));
+    for (PCollectionView<?> view : parDo.getSideInputs()) {
+      // Assert presence explicitly instead of relying on getSideInputsOrThrow to throw, so a
+      // missing side input fails with a descriptive assertion message.
+      assertThat(payload.getSideInputsMap(), Matchers.hasKey(view.getTagInternal().getId()));
+    }
+  }
+
+  /**
+   * Registers the applied ParDo with {@link SdkComponents}, then rebuilds each side-input view
+   * from the resulting PTransform proto and compares it with the original view. Also checks that
+   * the main input resolves to the registered main-input PCollection.
+   */
+  @Test
+  public void toAndFromTransformProto() throws Exception {
+    Map<TupleTag<?>, PValue> inputs = new HashMap<>();
+    inputs.put(new TupleTag<KV<Long, String>>() {}, mainInput);
+    inputs.putAll(parDo.getAdditionalInputs());
+    PCollectionTuple output = mainInput.apply(parDo);
+
+    SdkComponents components = SdkComponents.create();
+    String transformId =
+        components.registerPTransform(
+            AppliedPTransform.<PCollection<KV<Long, String>>, PCollection<Void>, MultiOutput>of(
+                "foo", inputs, output.expand(), parDo, p),
+            Collections.<AppliedPTransform<?, ?, ?>>emptyList());
+
+    Components protoComponents = components.toComponents();
+    RunnerApi.PTransform protoTransform =
+        protoComponents.getTransformsOrThrow(transformId);
+    ParDoPayload parDoPayload = protoTransform.getSpec().getParameter().unpack(ParDoPayload.class);
+    for (PCollectionView<?> view : parDo.getSideInputs()) {
+      SideInput sideInput = parDoPayload.getSideInputsOrThrow(view.getTagInternal().getId());
+      PCollectionView<?> restoredView =
+          ParDoTranslation.fromProto(
+              sideInput, view.getTagInternal().getId(), protoTransform, protoComponents);
+      assertThat(restoredView.getTagInternal(), equalTo(view.getTagInternal()));
+      // ViewFn and WindowMappingFn are compared by class only.
+      assertThat(restoredView.getViewFn(), instanceOf(view.getViewFn().getClass()));
+      assertThat(
+          restoredView.getWindowMappingFn(), instanceOf(view.getWindowMappingFn().getClass()));
+      // The restored strategy is compared against the original with defaults filled in.
+      assertThat(
+          restoredView.getWindowingStrategyInternal(),
+          Matchers.<WindowingStrategy<?, ?>>equalTo(
+              view.getWindowingStrategyInternal().fixDefaults()));
+      assertThat(restoredView.getCoderInternal(), equalTo(view.getCoderInternal()));
+    }
+    String mainInputId = components.registerPCollection(mainInput);
+    assertThat(
+        ParDoTranslation.getMainInput(protoTransform, protoComponents),
+        equalTo(protoComponents.getPcollectionsOrThrow(mainInputId)));
+  }
+
+  /**
+   * Minimal DoFn that outputs {@code null} for each element. {@code equals}/{@code hashCode} are
+   * class-based so that a deserialized copy compares equal to the original in
+   * {@link #testToAndFromProto()}.
+   */
+  private static class DropElementsFn extends DoFn<KV<Long, String>, Void> {
+    @ProcessElement
+    public void proc(ProcessContext context, BoundedWindow window) {
+      context.output(null);
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      return other instanceof DropElementsFn;
+    }
+
+    @Override
+    public int hashCode() {
+      return DropElementsFn.class.hashCode();
+    }
+  }
+
+  /**
+   * DoFn declaring a bag state, a combining state, an event-time timer and a processing-time
+   * timer.
+   *
+   * <p>NOTE(review): not referenced by {@link #data()}, so state/timer translation is not
+   * exercised here; presumably kept for when that translation is covered — TODO confirm.
+   */
+  @SuppressWarnings("unused")
+  private static class StateTimerDropElementsFn extends DoFn<KV<Long, String>, Void> {
+    private static final String BAG_STATE_ID = "bagState";
+    private static final String COMBINING_STATE_ID = "combiningState";
+    private static final String EVENT_TIMER_ID = "eventTimer";
+    private static final String PROCESSING_TIMER_ID = "processingTimer";
+
+    @StateId(BAG_STATE_ID)
+    private final StateSpec<BagState<String>> bagState = StateSpecs.bag(StringUtf8Coder.of());
+
+    /** Combining state that keeps the maximum long seen. */
+    @StateId(COMBINING_STATE_ID)
+    private final StateSpec<CombiningState<Long, long[], Long>> combiningState =
+        StateSpecs.combining(
+            new BinaryCombineLongFn() {
+              @Override
+              public long apply(long left, long right) {
+                return Math.max(left, right);
+              }
+
+              @Override
+              public long identity() {
+                return Long.MIN_VALUE;
+              }
+            });
+
+    @TimerId(EVENT_TIMER_ID)
+    private final TimerSpec eventTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+    @TimerId(PROCESSING_TIMER_ID)
+    private final TimerSpec processingTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+    @ProcessElement
+    public void dropInput(
+        ProcessContext context,
+        BoundedWindow window,
+        @StateId(BAG_STATE_ID) BagState<String> bagStateState,
+        @StateId(COMBINING_STATE_ID) CombiningState<Long, long[], Long> combiningStateState,
+        @TimerId(EVENT_TIMER_ID) Timer eventTimerTimer,
+        @TimerId(PROCESSING_TIMER_ID) Timer processingTimerTimer) {
+      context.output(null);
+    }
+
+    @OnTimer(EVENT_TIMER_ID)
+    public void onEventTime(OnTimerContext context) {}
+
+    @OnTimer(PROCESSING_TIMER_ID)
+    public void onProcessingTime(OnTimerContext context) {}
+
+    @Override
+    public boolean equals(Object other) {
+      return other instanceof StateTimerDropElementsFn;
+    }
+
+    @Override
+    public int hashCode() {
+      return StateTimerDropElementsFn.class.hashCode();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/44609383/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDosTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDosTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDosTest.java
deleted file mode 100644
index b6f0b7d..0000000
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDosTest.java
+++ /dev/null
@@ -1,233 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.core.construction;
-
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.instanceOf;
-import static org.junit.Assert.assertThat;
-
-import com.google.common.collect.ImmutableList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VarLongCoder;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.ParDoPayload;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.SideInput;
-import org.apache.beam.sdk.io.GenerateSequence;
-import org.apache.beam.sdk.runners.AppliedPTransform;
-import org.apache.beam.sdk.state.BagState;
-import org.apache.beam.sdk.state.CombiningState;
-import org.apache.beam.sdk.state.StateSpec;
-import org.apache.beam.sdk.state.StateSpecs;
-import org.apache.beam.sdk.state.TimeDomain;
-import org.apache.beam.sdk.state.Timer;
-import org.apache.beam.sdk.state.TimerSpec;
-import org.apache.beam.sdk.state.TimerSpecs;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Combine.BinaryCombineLongFn;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
-import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TupleTagList;
-import org.apache.beam.sdk.values.WindowingStrategy;
-import org.hamcrest.Matchers;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameter;
-import org.junit.runners.Parameterized.Parameters;
-
-/**
- * Tests for {@link ParDos}.
- *
- * <p>Each parameterized case is a {@link ParDo.MultiOutput} that is translated with
- * {@link ParDos} and checked against the original transform.
- */
-@RunWith(Parameterized.class)
-public class ParDosTest {
- // Pipeline used only to build the inputs below; it is never run in this class, so
- // abandoned-node enforcement is disabled.
- public static TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
-
- // Singleton and multimap side inputs attached to some of the parameterized cases.
- private static PCollectionView<Long> singletonSideInput =
- p.apply("GenerateSingleton", GenerateSequence.from(0L).to(1L))
- .apply(View.<Long>asSingleton());
- private static PCollectionView<Map<Long, Iterable<String>>> multimapSideInput =
- p.apply("CreateMultimap", Create.of(KV.of(1L, "foo"), KV.of(1L, "bar"), KV.of(2L, "spam")))
- .setCoder(KvCoder.of(VarLongCoder.of(), StringUtf8Coder.of()))
- .apply(View.<Long, String>asMultimap());
-
- // Empty main input that the ParDo under test is applied to.
- private static PCollection<KV<Long, String>> mainInput =
- p.apply("CreateMainInput", Create.empty(KvCoder.of(VarLongCoder.of(), StringUtf8Coder.of())));
-
- // Transforms under test: {no side inputs, two side inputs} x {no additional outputs,
- // two additional outputs}.
- @Parameters(name = "{index}: {0}")
- public static Iterable<ParDo.MultiOutput<?, ?>> data() {
- return ImmutableList.<ParDo.MultiOutput<?, ?>>of(
- ParDo.of(new DropElementsFn()).withOutputTags(new TupleTag<Void>(), TupleTagList.empty()),
- ParDo.of(new DropElementsFn())
- .withOutputTags(new TupleTag<Void>(), TupleTagList.empty())
- .withSideInputs(singletonSideInput, multimapSideInput),
- ParDo.of(new DropElementsFn())
- .withOutputTags(
- new TupleTag<Void>(),
- TupleTagList.of(new TupleTag<byte[]>() {}).and(new TupleTag<Integer>() {}))
- .withSideInputs(singletonSideInput, multimapSideInput),
- ParDo.of(new DropElementsFn())
- .withOutputTags(
- new TupleTag<Void>(),
- TupleTagList.of(new TupleTag<byte[]>() {}).and(new TupleTag<Integer>() {})));
- }
-
- // The transform under test, injected by the Parameterized runner.
- @Parameter(0)
- public ParDo.MultiOutput<KV<Long, String>, Void> parDo;
-
- // Translates the transform to a ParDoPayload and checks the DoFn, the main output tag, and
- // that every side input is present in the payload.
- @Test
- public void testToAndFromProto() throws Exception {
- SdkComponents components = SdkComponents.create();
- ParDoPayload payload = ParDos.toProto(parDo, components);
-
- assertThat(ParDos.getDoFn(payload), Matchers.<DoFn<?, ?>>equalTo(parDo.getFn()));
- assertThat(
- ParDos.getMainOutputTag(payload), Matchers.<TupleTag<?>>equalTo(parDo.getMainOutputTag()));
- for (PCollectionView<?> view : parDo.getSideInputs()) {
- // Result is discarded: presence is checked only by relying on getSideInputsOrThrow
- // failing when the key is absent.
- payload.getSideInputsOrThrow(view.getTagInternal().getId());
- }
- }
-
- // Registers the applied ParDo with SdkComponents, rebuilds each side-input view from the
- // resulting PTransform proto and compares it with the original view, then checks that the
- // main input resolves to the registered main-input PCollection.
- @Test
- public void toAndFromTransformProto() throws Exception {
- Map<TupleTag<?>, PValue> inputs = new HashMap<>();
- inputs.put(new TupleTag<KV<Long, String>>() {}, mainInput);
- inputs.putAll(parDo.getAdditionalInputs());
- PCollectionTuple output = mainInput.apply(parDo);
-
- SdkComponents components = SdkComponents.create();
- String transformId =
- components.registerPTransform(
- AppliedPTransform.<PCollection<KV<Long, String>>, PCollection<Void>, MultiOutput>of(
- "foo", inputs, output.expand(), parDo, p),
- Collections.<AppliedPTransform<?, ?, ?>>emptyList());
-
- Components protoComponents = components.toComponents();
- RunnerApi.PTransform protoTransform =
- protoComponents.getTransformsOrThrow(transformId);
- ParDoPayload parDoPayload = protoTransform.getSpec().getParameter().unpack(ParDoPayload.class);
- for (PCollectionView<?> view : parDo.getSideInputs()) {
- SideInput sideInput = parDoPayload.getSideInputsOrThrow(view.getTagInternal().getId());
- PCollectionView<?> restoredView =
- ParDos.fromProto(
- sideInput, view.getTagInternal().getId(), protoTransform, protoComponents);
- assertThat(restoredView.getTagInternal(), equalTo(view.getTagInternal()));
- // ViewFn and WindowMappingFn are compared by class only.
- assertThat(restoredView.getViewFn(), instanceOf(view.getViewFn().getClass()));
- assertThat(
- restoredView.getWindowMappingFn(), instanceOf(view.getWindowMappingFn().getClass()));
- // The restored strategy is compared against the original with defaults filled in.
- assertThat(
- restoredView.getWindowingStrategyInternal(),
- Matchers.<WindowingStrategy<?, ?>>equalTo(
- view.getWindowingStrategyInternal().fixDefaults()));
- assertThat(restoredView.getCoderInternal(), equalTo(view.getCoderInternal()));
- }
- String mainInputId = components.registerPCollection(mainInput);
- assertThat(
- ParDos.getMainInput(protoTransform, protoComponents),
- equalTo(protoComponents.getPcollectionsOrThrow(mainInputId)));
- }
-
- // Minimal DoFn that outputs null per element; equals/hashCode are class-based so that a
- // deserialized copy compares equal to the original in testToAndFromProto.
- private static class DropElementsFn extends DoFn<KV<Long, String>, Void> {
- @ProcessElement
- public void proc(ProcessContext context, BoundedWindow window) {
- context.output(null);
- }
-
- @Override
- public boolean equals(Object other) {
- return other instanceof DropElementsFn;
- }
-
- @Override
- public int hashCode() {
- return DropElementsFn.class.hashCode();
- }
- }
-
- // DoFn declaring bag/combining state and event/processing-time timers.
- // NOTE(review): not referenced by data(), so state/timer translation is not exercised here.
- @SuppressWarnings("unused")
- private static class StateTimerDropElementsFn extends DoFn<KV<Long, String>, Void> {
- private static final String BAG_STATE_ID = "bagState";
- private static final String COMBINING_STATE_ID = "combiningState";
- private static final String EVENT_TIMER_ID = "eventTimer";
- private static final String PROCESSING_TIMER_ID = "processingTimer";
-
- @StateId(BAG_STATE_ID)
- private final StateSpec<BagState<String>> bagState = StateSpecs.bag(StringUtf8Coder.of());
-
- // Combining state that keeps the maximum long seen.
- @StateId(COMBINING_STATE_ID)
- private final StateSpec<CombiningState<Long, long[], Long>> combiningState =
- StateSpecs.combining(
- new BinaryCombineLongFn() {
- @Override
- public long apply(long left, long right) {
- return Math.max(left, right);
- }
-
- @Override
- public long identity() {
- return Long.MIN_VALUE;
- }
- });
-
- @TimerId(EVENT_TIMER_ID)
- private final TimerSpec eventTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
-
- @TimerId(PROCESSING_TIMER_ID)
- private final TimerSpec processingTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
-
- @ProcessElement
- public void dropInput(
- ProcessContext context,
- BoundedWindow window,
- @StateId(BAG_STATE_ID) BagState<String> bagStateState,
- @StateId(COMBINING_STATE_ID) CombiningState<Long, long[], Long> combiningStateState,
- @TimerId(EVENT_TIMER_ID) Timer eventTimerTimer,
- @TimerId(PROCESSING_TIMER_ID) Timer processingTimerTimer) {
- context.output(null);
- }
-
- @OnTimer(EVENT_TIMER_ID)
- public void onEventTime(OnTimerContext context) {}
-
- @OnTimer(PROCESSING_TIMER_ID)
- public void onProcessingTime(OnTimerContext context) {}
-
- @Override
- public boolean equals(Object other) {
- return other instanceof StateTimerDropElementsFn;
- }
-
- @Override
- public int hashCode() {
- return StateTimerDropElementsFn.class.hashCode();
- }
- }
-}