You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/05/18 22:41:30 UTC
[1/2] beam git commit: This closes #2597
Repository: beam
Updated Branches:
refs/heads/master 6a7eeeb93 -> 2e0cf0083
This closes #2597
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2e0cf008
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2e0cf008
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2e0cf008
Branch: refs/heads/master
Commit: 2e0cf008303ff5ed48268550b9fe3cd1aa748c15
Parents: 6a7eeeb 790e7fe
Author: Thomas Groh <tg...@google.com>
Authored: Thu May 18 15:41:17 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Thu May 18 15:41:17 2017 -0700
----------------------------------------------------------------------
.../runners/core/construction/PTransforms.java | 8 +-
.../beam/runners/core/construction/ParDos.java | 317 +++++++++++++++++++
.../construction/RunnerPCollectionView.java | 88 +++++
.../runners/core/construction/ParDosTest.java | 229 ++++++++++++++
.../src/main/proto/beam_runner_api.proto | 10 +
.../sdk/transforms/windowing/GlobalWindows.java | 6 +-
6 files changed, 653 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
[2/2] beam git commit: Add ParDos
Posted by tg...@apache.org.
Add ParDos
Add ParDoPayloadTranslator to PTransformTranslator
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/790e7fe6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/790e7fe6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/790e7fe6
Branch: refs/heads/master
Commit: 790e7fe6653b926044d3dfecdccbc2fda9c998f0
Parents: 6a7eeeb
Author: Thomas Groh <tg...@google.com>
Authored: Tue Mar 21 15:06:58 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Thu May 18 15:41:17 2017 -0700
----------------------------------------------------------------------
.../runners/core/construction/PTransforms.java | 8 +-
.../beam/runners/core/construction/ParDos.java | 317 +++++++++++++++++++
.../construction/RunnerPCollectionView.java | 88 +++++
.../runners/core/construction/ParDosTest.java | 229 ++++++++++++++
.../src/main/proto/beam_runner_api.proto | 10 +
.../sdk/transforms/windowing/GlobalWindows.java | 6 +-
6 files changed, 653 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/790e7fe6/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransforms.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransforms.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransforms.java
index d25d342..16276b9 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransforms.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransforms.java
@@ -24,10 +24,12 @@ import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.util.List;
import java.util.Map;
+import org.apache.beam.runners.core.construction.ParDos.ParDoPayloadTranslator;
import org.apache.beam.sdk.common.runner.v1.RunnerApi;
import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
@@ -39,9 +41,9 @@ import org.apache.beam.sdk.values.TupleTag;
public class PTransforms {
+ /**
+ * Maps a {@link PTransform} class to the {@link TransformPayloadTranslator} used to build its
+ * payload. Currently only {@link ParDo.MultiOutput} is registered.
+ */
private static final Map<Class<? extends PTransform>, TransformPayloadTranslator>
KNOWN_PAYLOAD_TRANSLATORS =
- ImmutableMap.<Class<? extends PTransform>, TransformPayloadTranslator>builder().build();
- // TODO: ParDoPayload, WindowIntoPayload, ReadPayload, CombinePayload
- // TODO: "Flatten Payload", etc?
+ ImmutableMap.<Class<? extends PTransform>, TransformPayloadTranslator>builder()
+ .put(ParDo.MultiOutput.class, ParDoPayloadTranslator.create())
+ .build();
+ // TODO: WindowIntoPayload, ReadPayload, CombinePayload, "Flatten Payload", etc.
// TODO: Load via service loader.
private PTransforms() {}
http://git-wip-us.apache.org/repos/asf/beam/blob/790e7fe6/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java
new file mode 100644
index 0000000..b2b29df
--- /dev/null
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.base.Optional;
+import com.google.protobuf.Any;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.BytesValue;
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.core.construction.PTransforms.TransformPayloadTranslator;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.ParDoPayload;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.Parameter.Type;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.SideInput;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.SideInput.Builder;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.StateSpec;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.TimerSpec;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Materializations;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
+import org.apache.beam.sdk.transforms.ViewFn;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.Cases;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.RestrictionTrackerParameter;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WindowParameter;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature.StateDeclaration;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature.TimerDeclaration;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
+import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+
+/**
+ * Utilities for interacting with {@link ParDo} instances and {@link ParDoPayload} protos.
+ */
+public class ParDos {
+ /**
+ * The URN for a {@link ParDoPayload}.
+ */
+ public static final String PAR_DO_PAYLOAD_URN = "urn:beam:pardo:v1";
+ /**
+ * The URN for an unknown Java {@link DoFn}.
+ */
+ public static final String CUSTOM_JAVA_DO_FN_URN = "urn:beam:dofn:javasdk:0.1";
+ /**
+ * The URN for an unknown Java {@link ViewFn}.
+ */
+ public static final String CUSTOM_JAVA_VIEW_FN_URN = "urn:beam:viewfn:javasdk:0.1";
+ /**
+ * The URN for an unknown Java {@link WindowMappingFn}.
+ */
+ public static final String CUSTOM_JAVA_WINDOW_MAPPING_FN_URN =
+ "urn:beam:windowmappingfn:javasdk:0.1";
+
+ /**
+ * A {@link TransformPayloadTranslator} for {@link ParDo}.
+ */
+ public static class ParDoPayloadTranslator
+ implements PTransforms.TransformPayloadTranslator<ParDo.MultiOutput<?, ?>> {
+ public static TransformPayloadTranslator create() {
+ return new ParDoPayloadTranslator();
+ }
+
+ private ParDoPayloadTranslator() {}
+
+ @Override
+ public FunctionSpec translate(
+ AppliedPTransform<?, ?, MultiOutput<?, ?>> transform, SdkComponents components) {
+ ParDoPayload payload = toProto(transform.getTransform(), components);
+ return RunnerApi.FunctionSpec.newBuilder()
+ .setUrn(PAR_DO_PAYLOAD_URN)
+ .setParameter(Any.pack(payload))
+ .build();
+ }
+ }
+
+ public static ParDoPayload toProto(ParDo.MultiOutput<?, ?> parDo, SdkComponents components) {
+ DoFnSignature signature = DoFnSignatures.getSignature(parDo.getFn().getClass());
+ Map<String, StateDeclaration> states = signature.stateDeclarations();
+ Map<String, TimerDeclaration> timers = signature.timerDeclarations();
+ List<Parameter> parameters = signature.processElement().extraParameters();
+
+ ParDoPayload.Builder builder = ParDoPayload.newBuilder();
+ builder.setDoFn(toProto(parDo.getFn(), parDo.getMainOutputTag()));
+ for (PCollectionView<?> sideInput : parDo.getSideInputs()) {
+ builder.putSideInputs(sideInput.getTagInternal().getId(), toProto(sideInput));
+ }
+ for (Parameter parameter : parameters) {
+ Optional<RunnerApi.Parameter> protoParameter = toProto(parameter);
+ if (protoParameter.isPresent()) {
+ builder.addParameters(protoParameter.get());
+ }
+ }
+ for (Map.Entry<String, StateDeclaration> state : states.entrySet()) {
+ StateSpec spec = toProto(state.getValue());
+ builder.putStateSpecs(state.getKey(), spec);
+ }
+ for (Map.Entry<String, TimerDeclaration> timer : timers.entrySet()) {
+ TimerSpec spec = toProto(timer.getValue());
+ builder.putTimerSpecs(timer.getKey(), spec);
+ }
+ return builder.build();
+ }
+
+ public static DoFn<?, ?> getDoFn(ParDoPayload payload) throws InvalidProtocolBufferException {
+ return doFnAndMainOutputTagFromProto(payload.getDoFn()).getDoFn();
+ }
+
+ public static TupleTag<?> getMainOutputTag(ParDoPayload payload)
+ throws InvalidProtocolBufferException {
+ return doFnAndMainOutputTagFromProto(payload.getDoFn()).getMainOutputTag();
+ }
+
+ // TODO: Implement
+ private static StateSpec toProto(StateDeclaration state) {
+ throw new UnsupportedOperationException("Not yet supported");
+ }
+
+ // TODO: Implement
+ private static TimerSpec toProto(TimerDeclaration timer) {
+ throw new UnsupportedOperationException("Not yet supported");
+ }
+
+ @AutoValue
+ abstract static class DoFnAndMainOutput implements Serializable {
+ public static DoFnAndMainOutput of(
+ DoFn<?, ?> fn, TupleTag<?> tag) {
+ return new AutoValue_ParDos_DoFnAndMainOutput(fn, tag);
+ }
+
+ abstract DoFn<?, ?> getDoFn();
+ abstract TupleTag<?> getMainOutputTag();
+ }
+
+ private static SdkFunctionSpec toProto(DoFn<?, ?> fn, TupleTag<?> tag) {
+ return SdkFunctionSpec.newBuilder()
+ .setSpec(
+ FunctionSpec.newBuilder()
+ .setUrn(CUSTOM_JAVA_DO_FN_URN)
+ .setParameter(
+ Any.pack(
+ BytesValue.newBuilder()
+ .setValue(
+ ByteString.copyFrom(
+ SerializableUtils.serializeToByteArray(
+ DoFnAndMainOutput.of(fn, tag))))
+ .build())))
+ .build();
+ }
+
+ private static DoFnAndMainOutput doFnAndMainOutputTagFromProto(SdkFunctionSpec fnSpec)
+ throws InvalidProtocolBufferException {
+ checkArgument(fnSpec.getSpec().getUrn().equals(CUSTOM_JAVA_DO_FN_URN));
+ byte[] serializedFn =
+ fnSpec.getSpec().getParameter().unpack(BytesValue.class).getValue().toByteArray();
+ return (DoFnAndMainOutput)
+ SerializableUtils.deserializeFromByteArray(serializedFn, "Custom DoFn And Main Output tag");
+ }
+
+ private static Optional<RunnerApi.Parameter> toProto(Parameter parameter) {
+ return parameter.match(
+ new Cases.WithDefault<Optional<RunnerApi.Parameter>>() {
+ @Override
+ public Optional<RunnerApi.Parameter> dispatch(WindowParameter p) {
+ return Optional.of(RunnerApi.Parameter.newBuilder().setType(Type.WINDOW).build());
+ }
+
+ @Override
+ public Optional<RunnerApi.Parameter> dispatch(RestrictionTrackerParameter p) {
+ return Optional.of(
+ RunnerApi.Parameter.newBuilder().setType(Type.RESTRICTION_TRACKER).build());
+ }
+
+ @Override
+ protected Optional<RunnerApi.Parameter> dispatchDefault(Parameter p) {
+ return Optional.absent();
+ }
+ });
+ }
+
+ private static SideInput toProto(PCollectionView<?> view) {
+ Builder builder = SideInput.newBuilder();
+ builder.setAccessPattern(
+ FunctionSpec.newBuilder()
+ .setUrn(view.getViewFn().getMaterialization().getUrn())
+ .build());
+ builder.setViewFn(toProto(view.getViewFn()));
+ builder.setWindowMappingFn(toProto(view.getWindowMappingFn()));
+ return builder.build();
+ }
+
+ public static PCollectionView<?> fromProto(
+ SideInput sideInput, String id, RunnerApi.PTransform parDoTransform, Components components)
+ throws IOException {
+ TupleTag<?> tag = new TupleTag<>(id);
+ WindowMappingFn<?> windowMappingFn = windowMappingFnFromProto(sideInput.getWindowMappingFn());
+ ViewFn<?, ?> viewFn = viewFnFromProto(sideInput.getViewFn());
+
+ RunnerApi.PCollection inputCollection =
+ components.getPcollectionsOrThrow(parDoTransform.getInputsOrThrow(id));
+ WindowingStrategy<?, ?> windowingStrategy =
+ WindowingStrategies.fromProto(
+ components.getWindowingStrategiesOrThrow(inputCollection.getWindowingStrategyId()),
+ components);
+ Coder<?> elemCoder =
+ Coders.fromProto(components.getCodersOrThrow(inputCollection.getCoderId()), components);
+ Coder<Iterable<WindowedValue<?>>> coder =
+ (Coder)
+ IterableCoder.of(
+ FullWindowedValueCoder.of(
+ elemCoder, windowingStrategy.getWindowFn().windowCoder()));
+ checkArgument(
+ sideInput.getAccessPattern().getUrn().equals(Materializations.ITERABLE_MATERIALIZATION_URN),
+ "Unknown View Materialization URN %s",
+ sideInput.getAccessPattern().getUrn());
+
+ PCollectionView<?> view =
+ new RunnerPCollectionView<>(
+ (TupleTag<Iterable<WindowedValue<?>>>) tag,
+ (ViewFn<Iterable<WindowedValue<?>>, ?>) viewFn,
+ windowMappingFn,
+ windowingStrategy,
+ coder);
+ return view;
+ }
+
+ private static SdkFunctionSpec toProto(ViewFn<?, ?> viewFn) {
+ return SdkFunctionSpec.newBuilder()
+ .setSpec(
+ FunctionSpec.newBuilder()
+ .setUrn(CUSTOM_JAVA_VIEW_FN_URN)
+ .setParameter(
+ Any.pack(
+ BytesValue.newBuilder()
+ .setValue(
+ ByteString.copyFrom(SerializableUtils.serializeToByteArray(viewFn)))
+ .build())))
+ .build();
+ }
+
+ private static ViewFn<?, ?> viewFnFromProto(SdkFunctionSpec viewFn)
+ throws InvalidProtocolBufferException {
+ FunctionSpec spec = viewFn.getSpec();
+ checkArgument(
+ spec.getUrn().equals(CUSTOM_JAVA_VIEW_FN_URN),
+ "Can't deserialize unknown %s type %s",
+ ViewFn.class.getSimpleName(),
+ spec.getUrn());
+ return (ViewFn<?, ?>)
+ SerializableUtils.deserializeFromByteArray(
+ spec.getParameter().unpack(BytesValue.class).getValue().toByteArray(), "Custom ViewFn");
+ }
+
+ private static SdkFunctionSpec toProto(WindowMappingFn<?> windowMappingFn) {
+ return SdkFunctionSpec.newBuilder()
+ .setSpec(
+ FunctionSpec.newBuilder()
+ .setUrn(CUSTOM_JAVA_WINDOW_MAPPING_FN_URN)
+ .setParameter(
+ Any.pack(
+ BytesValue.newBuilder()
+ .setValue(
+ ByteString.copyFrom(
+ SerializableUtils.serializeToByteArray(windowMappingFn)))
+ .build())))
+ .build();
+ }
+
+ private static WindowMappingFn<?> windowMappingFnFromProto(SdkFunctionSpec windowMappingFn)
+ throws InvalidProtocolBufferException {
+ FunctionSpec spec = windowMappingFn.getSpec();
+ checkArgument(
+ spec.getUrn().equals(CUSTOM_JAVA_WINDOW_MAPPING_FN_URN),
+ "Can't deserialize unknown %s type %s",
+ WindowMappingFn.class.getSimpleName(),
+ spec.getUrn());
+ return (WindowMappingFn<?>)
+ SerializableUtils.deserializeFromByteArray(
+ spec.getParameter().unpack(BytesValue.class).getValue().toByteArray(),
+ "Custom WinodwMappingFn");
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/790e7fe6/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java
new file mode 100644
index 0000000..89e8784
--- /dev/null
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.SideInput;
+import org.apache.beam.sdk.transforms.ViewFn;
+import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PValueBase;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+
+/**
+ * A {@link PCollectionView} created from the components of a {@link SideInput}.
+ *
+ * <p>The view has no backing {@link PCollection}: {@link #getPCollection()} always throws. Two
+ * instances are equal when their tags are equal.
+ */
+class RunnerPCollectionView<T> extends PValueBase implements PCollectionView<T> {
+  private final TupleTag<Iterable<WindowedValue<?>>> tag;
+  private final ViewFn<Iterable<WindowedValue<?>>, T> viewFn;
+  private final WindowMappingFn<?> windowMappingFn;
+  private final WindowingStrategy<?, ?> windowingStrategy;
+  private final Coder<Iterable<WindowedValue<?>>> coder;
+
+  /**
+   * Create a new {@link RunnerPCollectionView} from the provided components.
+   */
+  RunnerPCollectionView(
+      TupleTag<Iterable<WindowedValue<?>>> tag,
+      ViewFn<Iterable<WindowedValue<?>>, T> viewFn,
+      WindowMappingFn<?> windowMappingFn,
+      @Nullable WindowingStrategy<?, ?> windowingStrategy,
+      @Nullable Coder<Iterable<WindowedValue<?>>> coder) {
+    this.tag = tag;
+    this.viewFn = viewFn;
+    this.windowMappingFn = windowMappingFn;
+    this.windowingStrategy = windowingStrategy;
+    this.coder = coder;
+  }
+
+  /**
+   * Always throws; a view rebuilt from its proto components has no {@link PCollection}.
+   *
+   * @throws IllegalStateException always
+   */
+  @Override
+  public PCollection<?> getPCollection() {
+    throw new IllegalStateException(
+        String.format("Cannot call getPCollection on a %s", getClass().getSimpleName()));
+  }
+
+  @Override
+  public TupleTag<Iterable<WindowedValue<?>>> getTagInternal() {
+    return tag;
+  }
+
+  @Override
+  public ViewFn<Iterable<WindowedValue<?>>, T> getViewFn() {
+    return viewFn;
+  }
+
+  @Override
+  public WindowMappingFn<?> getWindowMappingFn() {
+    return windowMappingFn;
+  }
+
+  @Override
+  public WindowingStrategy<?, ?> getWindowingStrategyInternal() {
+    return windowingStrategy;
+  }
+
+  @Override
+  public Coder<Iterable<WindowedValue<?>>> getCoderInternal() {
+    return coder;
+  }
+
+  /** Views restored for the same side input share a tag, so equality is defined by the tag. */
+  @Override
+  public boolean equals(Object other) {
+    if (!(other instanceof RunnerPCollectionView)) {
+      return false;
+    }
+    return tag.equals(((RunnerPCollectionView<?>) other).tag);
+  }
+
+  @Override
+  public int hashCode() {
+    return tag.hashCode();
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/790e7fe6/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDosTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDosTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDosTest.java
new file mode 100644
index 0000000..74edec1
--- /dev/null
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDosTest.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.ImmutableList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.ParDoPayload;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.SideInput;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.CombiningState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Combine.BinaryCombineLongFn;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.hamcrest.Matchers;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+/** Tests for {@link ParDos}. */
+@RunWith(Parameterized.class)
+public class ParDosTest {
+  public static TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
+  private static PCollectionView<Long> singletonSideInput =
+      p.apply("GenerateSingleton", GenerateSequence.from(0L).to(1L))
+          .apply(View.<Long>asSingleton());
+  private static PCollectionView<Map<Long, Iterable<String>>> multimapSideInput =
+      p.apply("CreateMultimap", Create.of(KV.of(1L, "foo"), KV.of(1L, "bar"), KV.of(2L, "spam")))
+          .setCoder(KvCoder.of(VarLongCoder.of(), StringUtf8Coder.of()))
+          .apply(View.<Long, String>asMultimap());
+
+  private static PCollection<KV<Long, String>> mainInput =
+      p.apply(
+          "CreateMainInput",
+          Create.empty(KvCoder.of(VarLongCoder.of(), StringUtf8Coder.of())));
+
+  @Parameters(name = "{index}: {0}")
+  public static Iterable<ParDo.MultiOutput<?, ?>> data() {
+    return ImmutableList.<ParDo.MultiOutput<?, ?>>of(
+        ParDo.of(new DropElementsFn()).withOutputTags(new TupleTag<Void>(), TupleTagList.empty()),
+        ParDo.of(new DropElementsFn())
+            .withOutputTags(new TupleTag<Void>(), TupleTagList.empty())
+            .withSideInputs(singletonSideInput, multimapSideInput),
+        ParDo.of(new DropElementsFn())
+            .withOutputTags(
+                new TupleTag<Void>(),
+                TupleTagList.of(new TupleTag<byte[]>() {}).and(new TupleTag<Integer>() {}))
+            .withSideInputs(singletonSideInput, multimapSideInput),
+        ParDo.of(new DropElementsFn())
+            .withOutputTags(
+                new TupleTag<Void>(),
+                TupleTagList.of(new TupleTag<byte[]>() {}).and(new TupleTag<Integer>() {})));
+  }
+
+  @Parameter(0)
+  public ParDo.MultiOutput<KV<Long, String>, Void> parDo;
+
+  /** The DoFn, main output tag and every side input survive translation to a payload. */
+  @Test
+  public void testToAndFromProto() throws Exception {
+    SdkComponents components = SdkComponents.create();
+    ParDoPayload payload = ParDos.toProto(parDo, components);
+
+    assertThat(ParDos.getDoFn(payload), Matchers.<DoFn<?, ?>>equalTo(parDo.getFn()));
+    assertThat(
+        ParDos.getMainOutputTag(payload), Matchers.<TupleTag<?>>equalTo(parDo.getMainOutputTag()));
+    assertThat(payload.getSideInputsCount(), equalTo(parDo.getSideInputs().size()));
+    for (PCollectionView<?> view : parDo.getSideInputs()) {
+      SideInput sideInput = payload.getSideInputsOrThrow(view.getTagInternal().getId());
+      assertThat(
+          sideInput.getAccessPattern().getUrn(),
+          equalTo(view.getViewFn().getMaterialization().getUrn()));
+    }
+  }
+
+  /** Side input views can be rebuilt from a registered transform proto and its components. */
+  @Test
+  public void toAndFromTransformProto() throws Exception {
+    Map<TupleTag<?>, PValue> inputs = new HashMap<>();
+    inputs.put(new TupleTag<KV<Long, String>>() {}, mainInput);
+    inputs.putAll(parDo.getAdditionalInputs());
+    PCollectionTuple output = mainInput.apply(parDo);
+
+    SdkComponents components = SdkComponents.create();
+    String transformId =
+        components.registerPTransform(
+            AppliedPTransform
+                .<PCollection<KV<Long, String>>, PCollectionTuple,
+                    MultiOutput<KV<Long, String>, Void>>of(
+                    "foo", inputs, output.expand(), parDo, p),
+            Collections.<AppliedPTransform<?, ?, ?>>emptyList());
+
+    Components protoComponents = components.toComponents();
+    RunnerApi.PTransform protoTransform =
+        protoComponents.getTransformsOrThrow(transformId);
+    ParDoPayload parDoPayload = protoTransform.getSpec().getParameter().unpack(ParDoPayload.class);
+    for (PCollectionView<?> view : parDo.getSideInputs()) {
+      SideInput sideInput = parDoPayload.getSideInputsOrThrow(view.getTagInternal().getId());
+      PCollectionView<?> restoredView =
+          ParDos.fromProto(
+              sideInput, view.getTagInternal().getId(), protoTransform, protoComponents);
+      assertThat(restoredView.getTagInternal(), equalTo(view.getTagInternal()));
+      assertThat(restoredView.getViewFn(), instanceOf(view.getViewFn().getClass()));
+      assertThat(
+          restoredView.getWindowMappingFn(), instanceOf(view.getWindowMappingFn().getClass()));
+      assertThat(
+          restoredView.getWindowingStrategyInternal(),
+          Matchers.<WindowingStrategy<?, ?>>equalTo(
+              view.getWindowingStrategyInternal().fixDefaults()));
+      assertThat(restoredView.getCoderInternal(), equalTo(view.getCoderInternal()));
+    }
+  }
+
+  private static class DropElementsFn extends DoFn<KV<Long, String>, Void> {
+    @ProcessElement
+    public void proc(ProcessContext context, BoundedWindow window) {
+      context.output(null);
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      return other instanceof DropElementsFn;
+    }
+
+    @Override
+    public int hashCode() {
+      return DropElementsFn.class.hashCode();
+    }
+  }
+
+  // Not yet part of data(): ParDos.toProto throws UnsupportedOperationException for any DoFn that
+  // declares state or timers.
+  @SuppressWarnings("unused")
+  private static class StateTimerDropElementsFn extends DoFn<KV<Long, String>, Void> {
+    private static final String BAG_STATE_ID = "bagState";
+    private static final String COMBINING_STATE_ID = "combiningState";
+    private static final String EVENT_TIMER_ID = "eventTimer";
+    private static final String PROCESSING_TIMER_ID = "processingTimer";
+
+    @StateId(BAG_STATE_ID)
+    private final StateSpec<BagState<String>> bagState = StateSpecs.bag(StringUtf8Coder.of());
+
+    @StateId(COMBINING_STATE_ID)
+    private final StateSpec<CombiningState<Long, long[], Long>> combiningState =
+        StateSpecs.combining(
+            new BinaryCombineLongFn() {
+              @Override
+              public long apply(long left, long right) {
+                return Math.max(left, right);
+              }
+
+              @Override
+              public long identity() {
+                return Long.MIN_VALUE;
+              }
+            });
+
+    @TimerId(EVENT_TIMER_ID)
+    private final TimerSpec eventTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+    @TimerId(PROCESSING_TIMER_ID)
+    private final TimerSpec processingTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+    @ProcessElement
+    public void dropInput(
+        ProcessContext context,
+        BoundedWindow window,
+        @StateId(BAG_STATE_ID) BagState<String> bagStateState,
+        @StateId(COMBINING_STATE_ID) CombiningState<Long, long[], Long> combiningStateState,
+        @TimerId(EVENT_TIMER_ID) Timer eventTimerTimer,
+        @TimerId(PROCESSING_TIMER_ID) Timer processingTimerTimer) {
+      context.output(null);
+    }
+
+    @OnTimer(EVENT_TIMER_ID)
+    public void onEventTime(OnTimerContext context) {}
+
+    @OnTimer(PROCESSING_TIMER_ID)
+    public void onProcessingTime(OnTimerContext context) {}
+
+    @Override
+    public boolean equals(Object other) {
+      return other instanceof StateTimerDropElementsFn;
+    }
+
+    @Override
+    public int hashCode() {
+      return StateTimerDropElementsFn.class.hashCode();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/790e7fe6/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
----------------------------------------------------------------------
diff --git a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
index bf4df2a..c8722e6 100644
--- a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
+++ b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
@@ -248,10 +248,20 @@ message Parameter {
message StateSpec {
// TODO: AST for state spec
+  // Identifier of the state cell; presumably the same string used as this spec's key in
+  // ParDoPayload's state specs map (the DoFn's @StateId) — TODO confirm.
+  string id = 1;
+  // The kind of state cell.
+  Type type = 2;
+
+  enum Type {
+    VALUE = 0;
+    BAG = 1;
+    MAP = 2;
+    SET = 3;
+    // Combining state (e.g. the Java SDK's StateSpecs.combining). Appended after the existing
+    // values so their wire numbers are unchanged.
+    COMBINING = 4;
+  }
}
message TimerSpec {
// TODO: AST for timer spec
+ // Identifier of the timer; presumably the DoFn's @TimerId — TODO confirm.
+ // NOTE(review): no time domain is captured yet, although Java SDK timers are declared with one
+ // (TimerSpecs.timer(TimeDomain.EVENT_TIME / PROCESSING_TIME)).
+ string id = 1;
}
enum IsBounded {
http://git-wip-us.apache.org/repos/asf/beam/blob/790e7fe6/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java
index 1103a24..d48d26b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.transforms.windowing;
+import com.google.auto.value.AutoValue;
import java.util.Collection;
import java.util.Collections;
import org.apache.beam.sdk.coders.Coder;
@@ -61,10 +62,11 @@ public class GlobalWindows extends NonMergingWindowFn<Object, GlobalWindow> {
+  /**
+   * Returns the default {@link WindowMappingFn} for {@link GlobalWindows}, which maps any
+   * main-input window to {@link GlobalWindow#INSTANCE}.
+   */
@Override
public WindowMappingFn<GlobalWindow> getDefaultWindowMappingFn() {
- return new GlobalWindowMappingFn();
+ // AutoValue-generated subclass: it has no properties, so all instances compare equal.
+ return new AutoValue_GlobalWindows_GlobalWindowMappingFn();
}
- static class GlobalWindowMappingFn extends WindowMappingFn<GlobalWindow> {
+ @AutoValue
+ abstract static class GlobalWindowMappingFn extends WindowMappingFn<GlobalWindow> {
@Override
public GlobalWindow getSideInputWindow(BoundedWindow mainWindow) {
return GlobalWindow.INSTANCE;