You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/05/24 20:14:12 UTC

[3/9] beam git commit: Rename ParDos to ParDoTranslation

Rename ParDos to ParDoTranslation


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/44609383
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/44609383
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/44609383

Branch: refs/heads/master
Commit: 446093836016dabf021d34ca0a858e313f493e2f
Parents: 9b6728e
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue May 23 15:28:49 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue May 23 15:53:41 2017 -0700

----------------------------------------------------------------------
 .../core/construction/ParDoTranslation.java     | 348 +++++++++++++++++++
 .../beam/runners/core/construction/ParDos.java  | 348 -------------------
 .../core/construction/ParDoTranslationTest.java | 234 +++++++++++++
 .../runners/core/construction/ParDosTest.java   | 233 -------------
 4 files changed, 582 insertions(+), 581 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/44609383/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
new file mode 100644
index 0000000..baed246
--- /dev/null
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.service.AutoService;
+import com.google.auto.value.AutoValue;
+import com.google.common.base.Optional;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import com.google.protobuf.Any;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.BytesValue;
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.ParDoPayload;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.Parameter.Type;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.SideInput;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.SideInput.Builder;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.StateSpec;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.TimerSpec;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Materializations;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
+import org.apache.beam.sdk.transforms.ViewFn;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.Cases;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.RestrictionTrackerParameter;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WindowParameter;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature.StateDeclaration;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature.TimerDeclaration;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
+import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+
+/**
+ * Utilities for translating {@link ParDo} transforms to and from {@link ParDoPayload} protos.
+ */
+public class ParDoTranslation {
+  /**
+   * The URN identifying a {@link FunctionSpec} whose parameter is a packed {@link ParDoPayload}.
+   */
+  public static final String PAR_DO_PAYLOAD_URN = "urn:beam:pardo:v1";
+  /**
+   * The URN for a custom Java {@link DoFn}, serialized to bytes with its main output tag.
+   */
+  public static final String CUSTOM_JAVA_DO_FN_URN = "urn:beam:dofn:javasdk:0.1";
+  /**
+   * The URN for a custom Java {@link ViewFn}, serialized to bytes.
+   */
+  public static final String CUSTOM_JAVA_VIEW_FN_URN = "urn:beam:viewfn:javasdk:0.1";
+  /**
+   * The URN for a custom Java {@link WindowMappingFn}, serialized to bytes.
+   */
+  public static final String CUSTOM_JAVA_WINDOW_MAPPING_FN_URN =
+      "urn:beam:windowmappingfn:javasdk:0.1";
+
+  /**
+   * A {@link TransformPayloadTranslator} for {@link ParDo.MultiOutput} transforms.
+   */
+  public static class ParDoPayloadTranslator
+      implements PTransformTranslation.TransformPayloadTranslator<ParDo.MultiOutput<?, ?>> {
+    public static TransformPayloadTranslator create() {
+      return new ParDoPayloadTranslator();
+    }
+
+    private ParDoPayloadTranslator() {}
+
+    @Override
+    public FunctionSpec translate(
+        AppliedPTransform<?, ?, MultiOutput<?, ?>> transform, SdkComponents components) {
+      ParDoPayload payload = toProto(transform.getTransform(), components);
+      return RunnerApi.FunctionSpec.newBuilder()
+          .setUrn(PAR_DO_PAYLOAD_URN)
+          .setParameter(Any.pack(payload))
+          .build();
+    }
+
+    /**
+     * Registers {@link ParDoPayloadTranslator} as the translator for {@link ParDo.MultiOutput}.
+     */
+    @AutoService(TransformPayloadTranslatorRegistrar.class)
+    public static class Registrar implements TransformPayloadTranslatorRegistrar {
+      @Override
+      public Map<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator>
+          getTransformPayloadTranslators() {
+        return Collections.singletonMap(ParDo.MultiOutput.class, new ParDoPayloadTranslator());
+      }
+    }
+  }
+
+  public static ParDoPayload toProto(ParDo.MultiOutput<?, ?> parDo, SdkComponents components) {
+    DoFnSignature signature = DoFnSignatures.getSignature(parDo.getFn().getClass());
+    Map<String, StateDeclaration> states = signature.stateDeclarations();
+    Map<String, TimerDeclaration> timers = signature.timerDeclarations();
+    List<Parameter> parameters = signature.processElement().extraParameters();
+
+    ParDoPayload.Builder builder = ParDoPayload.newBuilder();
+    builder.setDoFn(toProto(parDo.getFn(), parDo.getMainOutputTag()));
+    for (PCollectionView<?> sideInput : parDo.getSideInputs()) {
+      builder.putSideInputs(sideInput.getTagInternal().getId(), toProto(sideInput));
+    }
+    for (Parameter parameter : parameters) {
+      Optional<RunnerApi.Parameter> protoParameter = toProto(parameter);
+      if (protoParameter.isPresent()) {
+        builder.addParameters(protoParameter.get());
+      }
+    }
+    for (Map.Entry<String, StateDeclaration> state : states.entrySet()) {
+      StateSpec spec = toProto(state.getValue());
+      builder.putStateSpecs(state.getKey(), spec);
+    }
+    for (Map.Entry<String, TimerDeclaration> timer : timers.entrySet()) {
+      TimerSpec spec = toProto(timer.getValue());
+      builder.putTimerSpecs(timer.getKey(), spec);
+    }
+    return builder.build();
+  }
+
+  public static DoFn<?, ?> getDoFn(ParDoPayload payload) throws InvalidProtocolBufferException {
+    return doFnAndMainOutputTagFromProto(payload.getDoFn()).getDoFn();
+  }
+
+  public static TupleTag<?> getMainOutputTag(ParDoPayload payload)
+      throws InvalidProtocolBufferException {
+    return doFnAndMainOutputTagFromProto(payload.getDoFn()).getMainOutputTag();
+  }
+
+  public static RunnerApi.PCollection getMainInput(
+      RunnerApi.PTransform ptransform, Components components) throws IOException {
+    checkArgument(
+        ptransform.getSpec().getUrn().equals(PAR_DO_PAYLOAD_URN),
+        "Unexpected payload type %s",
+        ptransform.getSpec().getUrn());
+    ParDoPayload payload = ptransform.getSpec().getParameter().unpack(ParDoPayload.class);
+    String mainInputId =
+        Iterables.getOnlyElement(
+            Sets.difference(
+                ptransform.getInputsMap().keySet(), payload.getSideInputsMap().keySet()));
+    return components.getPcollectionsOrThrow(ptransform.getInputsOrThrow(mainInputId));
+  }
+
+  // TODO: Implement. Until then, translating a DoFn that declares state throws.
+  private static StateSpec toProto(StateDeclaration state) {
+    throw new UnsupportedOperationException("Not yet supported");
+  }
+
+  // TODO: Implement. Until then, translating a DoFn that declares timers throws.
+  private static TimerSpec toProto(TimerDeclaration timer) {
+    throw new UnsupportedOperationException("Not yet supported");
+  }
+
+  @AutoValue
+  abstract static class DoFnAndMainOutput implements Serializable {
+    public static DoFnAndMainOutput of(
+        DoFn<?, ?> fn, TupleTag<?> tag) {
+      return new AutoValue_ParDoTranslation_DoFnAndMainOutput(fn, tag);
+    }
+
+    abstract DoFn<?, ?> getDoFn();
+    abstract TupleTag<?> getMainOutputTag();
+  }
+
+  private static SdkFunctionSpec toProto(DoFn<?, ?> fn, TupleTag<?> tag) {
+    return SdkFunctionSpec.newBuilder()
+        .setSpec(
+            FunctionSpec.newBuilder()
+                .setUrn(CUSTOM_JAVA_DO_FN_URN)
+                .setParameter(
+                    Any.pack(
+                        BytesValue.newBuilder()
+                            .setValue(
+                                ByteString.copyFrom(
+                                    SerializableUtils.serializeToByteArray(
+                                        DoFnAndMainOutput.of(fn, tag))))
+                            .build())))
+        .build();
+  }
+
+  private static DoFnAndMainOutput doFnAndMainOutputTagFromProto(SdkFunctionSpec fnSpec)
+      throws InvalidProtocolBufferException {
+    checkArgument(fnSpec.getSpec().getUrn().equals(CUSTOM_JAVA_DO_FN_URN));
+    byte[] serializedFn =
+        fnSpec.getSpec().getParameter().unpack(BytesValue.class).getValue().toByteArray();
+    return (DoFnAndMainOutput)
+        SerializableUtils.deserializeFromByteArray(serializedFn, "Custom DoFn And Main Output tag");
+  }
+
+  private static Optional<RunnerApi.Parameter> toProto(Parameter parameter) {
+    return parameter.match(
+        new Cases.WithDefault<Optional<RunnerApi.Parameter>>() {
+          @Override
+          public Optional<RunnerApi.Parameter> dispatch(WindowParameter p) {
+            return Optional.of(RunnerApi.Parameter.newBuilder().setType(Type.WINDOW).build());
+          }
+
+          @Override
+          public Optional<RunnerApi.Parameter> dispatch(RestrictionTrackerParameter p) {
+            return Optional.of(
+                RunnerApi.Parameter.newBuilder().setType(Type.RESTRICTION_TRACKER).build());
+          }
+
+          @Override
+          protected Optional<RunnerApi.Parameter> dispatchDefault(Parameter p) {
+            return Optional.absent();
+          }
+        });
+  }
+
+  private static SideInput toProto(PCollectionView<?> view) {
+    Builder builder = SideInput.newBuilder();
+    builder.setAccessPattern(
+        FunctionSpec.newBuilder()
+            .setUrn(view.getViewFn().getMaterialization().getUrn())
+            .build());
+    builder.setViewFn(toProto(view.getViewFn()));
+    builder.setWindowMappingFn(toProto(view.getWindowMappingFn()));
+    return builder.build();
+  }
+
+  public static PCollectionView<?> fromProto(
+      SideInput sideInput, String id, RunnerApi.PTransform parDoTransform, Components components)
+      throws IOException {
+    TupleTag<?> tag = new TupleTag<>(id);
+    WindowMappingFn<?> windowMappingFn = windowMappingFnFromProto(sideInput.getWindowMappingFn());
+    ViewFn<?, ?> viewFn = viewFnFromProto(sideInput.getViewFn());
+
+    RunnerApi.PCollection inputCollection =
+        components.getPcollectionsOrThrow(parDoTransform.getInputsOrThrow(id));
+    WindowingStrategy<?, ?> windowingStrategy =
+        WindowingStrategies.fromProto(
+            components.getWindowingStrategiesOrThrow(inputCollection.getWindowingStrategyId()),
+            components);
+    Coder<?> elemCoder =
+        Coders.fromProto(components.getCodersOrThrow(inputCollection.getCoderId()), components);
+    Coder<Iterable<WindowedValue<?>>> coder =
+        (Coder)
+            IterableCoder.of(
+                FullWindowedValueCoder.of(
+                    elemCoder, windowingStrategy.getWindowFn().windowCoder()));
+    checkArgument(
+        sideInput.getAccessPattern().getUrn().equals(Materializations.ITERABLE_MATERIALIZATION_URN),
+        "Unknown View Materialization URN %s",
+        sideInput.getAccessPattern().getUrn());
+
+    PCollectionView<?> view =
+        new RunnerPCollectionView<>(
+            (TupleTag<Iterable<WindowedValue<?>>>) tag,
+            (ViewFn<Iterable<WindowedValue<?>>, ?>) viewFn,
+            windowMappingFn,
+            windowingStrategy,
+            coder);
+    return view;
+  }
+
+  private static SdkFunctionSpec toProto(ViewFn<?, ?> viewFn) {
+    return SdkFunctionSpec.newBuilder()
+        .setSpec(
+            FunctionSpec.newBuilder()
+                .setUrn(CUSTOM_JAVA_VIEW_FN_URN)
+                .setParameter(
+                    Any.pack(
+                        BytesValue.newBuilder()
+                            .setValue(
+                                ByteString.copyFrom(SerializableUtils.serializeToByteArray(viewFn)))
+                            .build())))
+        .build();
+  }
+
+  private static ViewFn<?, ?> viewFnFromProto(SdkFunctionSpec viewFn)
+      throws InvalidProtocolBufferException {
+    FunctionSpec spec = viewFn.getSpec();
+    checkArgument(
+        spec.getUrn().equals(CUSTOM_JAVA_VIEW_FN_URN),
+        "Can't deserialize unknown %s type %s",
+        ViewFn.class.getSimpleName(),
+        spec.getUrn());
+    return (ViewFn<?, ?>)
+        SerializableUtils.deserializeFromByteArray(
+            spec.getParameter().unpack(BytesValue.class).getValue().toByteArray(), "Custom ViewFn");
+  }
+
+  private static SdkFunctionSpec toProto(WindowMappingFn<?> windowMappingFn) {
+    return SdkFunctionSpec.newBuilder()
+        .setSpec(
+            FunctionSpec.newBuilder()
+                .setUrn(CUSTOM_JAVA_WINDOW_MAPPING_FN_URN)
+                .setParameter(
+                    Any.pack(
+                        BytesValue.newBuilder()
+                            .setValue(
+                                ByteString.copyFrom(
+                                    SerializableUtils.serializeToByteArray(windowMappingFn)))
+                            .build())))
+        .build();
+  }
+
+  private static WindowMappingFn<?> windowMappingFnFromProto(SdkFunctionSpec windowMappingFn)
+      throws InvalidProtocolBufferException {
+    FunctionSpec spec = windowMappingFn.getSpec();
+    checkArgument(
+        spec.getUrn().equals(CUSTOM_JAVA_WINDOW_MAPPING_FN_URN),
+        "Can't deserialize unknown %s type %s",
+        WindowMappingFn.class.getSimpleName(),
+        spec.getUrn());
+    return (WindowMappingFn<?>)
+        SerializableUtils.deserializeFromByteArray(
+            spec.getParameter().unpack(BytesValue.class).getValue().toByteArray(),
+            "Custom WinodwMappingFn");
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/44609383/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java
deleted file mode 100644
index 12f2969..0000000
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java
+++ /dev/null
@@ -1,348 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.core.construction;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.auto.service.AutoService;
-import com.google.auto.value.AutoValue;
-import com.google.common.base.Optional;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Sets;
-import com.google.protobuf.Any;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.BytesValue;
-import com.google.protobuf.InvalidProtocolBufferException;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.IterableCoder;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.ParDoPayload;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.Parameter.Type;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.SideInput;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.SideInput.Builder;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.StateSpec;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.TimerSpec;
-import org.apache.beam.sdk.runners.AppliedPTransform;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Materializations;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
-import org.apache.beam.sdk.transforms.ViewFn;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.Cases;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.RestrictionTrackerParameter;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WindowParameter;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature.StateDeclaration;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature.TimerDeclaration;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
-import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
-import org.apache.beam.sdk.util.SerializableUtils;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.WindowingStrategy;
-
-/**
- * Utilities for interacting with {@link ParDo} instances and {@link ParDoPayload} protos.
- */
-public class ParDos {
-  /**
-   * The URN for a {@link ParDoPayload}.
-   */
-  public static final String PAR_DO_PAYLOAD_URN = "urn:beam:pardo:v1";
-  /**
-   * The URN for an unknown Java {@link DoFn}.
-   */
-  public static final String CUSTOM_JAVA_DO_FN_URN = "urn:beam:dofn:javasdk:0.1";
-  /**
-   * The URN for an unknown Java {@link ViewFn}.
-   */
-  public static final String CUSTOM_JAVA_VIEW_FN_URN = "urn:beam:viewfn:javasdk:0.1";
-  /**
-   * The URN for an unknown Java {@link WindowMappingFn}.
-   */
-  public static final String CUSTOM_JAVA_WINDOW_MAPPING_FN_URN =
-      "urn:beam:windowmappingfn:javasdk:0.1";
-
-  /**
-   * A {@link TransformPayloadTranslator} for {@link ParDo}.
-   */
-  public static class ParDoPayloadTranslator
-      implements PTransformTranslation.TransformPayloadTranslator<ParDo.MultiOutput<?, ?>> {
-    public static TransformPayloadTranslator create() {
-      return new ParDoPayloadTranslator();
-    }
-
-    private ParDoPayloadTranslator() {}
-
-    @Override
-    public FunctionSpec translate(
-        AppliedPTransform<?, ?, MultiOutput<?, ?>> transform, SdkComponents components) {
-      ParDoPayload payload = toProto(transform.getTransform(), components);
-      return RunnerApi.FunctionSpec.newBuilder()
-          .setUrn(PAR_DO_PAYLOAD_URN)
-          .setParameter(Any.pack(payload))
-          .build();
-    }
-
-    /**
-     * Registers {@link ParDoPayloadTranslator}.
-     */
-    @AutoService(TransformPayloadTranslatorRegistrar.class)
-    public static class Registrar implements TransformPayloadTranslatorRegistrar {
-      @Override
-      public Map<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator>
-          getTransformPayloadTranslators() {
-        return Collections.singletonMap(ParDo.MultiOutput.class, new ParDoPayloadTranslator());
-      }
-    }
-  }
-
-  public static ParDoPayload toProto(ParDo.MultiOutput<?, ?> parDo, SdkComponents components) {
-    DoFnSignature signature = DoFnSignatures.getSignature(parDo.getFn().getClass());
-    Map<String, StateDeclaration> states = signature.stateDeclarations();
-    Map<String, TimerDeclaration> timers = signature.timerDeclarations();
-    List<Parameter> parameters = signature.processElement().extraParameters();
-
-    ParDoPayload.Builder builder = ParDoPayload.newBuilder();
-    builder.setDoFn(toProto(parDo.getFn(), parDo.getMainOutputTag()));
-    for (PCollectionView<?> sideInput : parDo.getSideInputs()) {
-      builder.putSideInputs(sideInput.getTagInternal().getId(), toProto(sideInput));
-    }
-    for (Parameter parameter : parameters) {
-      Optional<RunnerApi.Parameter> protoParameter = toProto(parameter);
-      if (protoParameter.isPresent()) {
-        builder.addParameters(protoParameter.get());
-      }
-    }
-    for (Map.Entry<String, StateDeclaration> state : states.entrySet()) {
-      StateSpec spec = toProto(state.getValue());
-      builder.putStateSpecs(state.getKey(), spec);
-    }
-    for (Map.Entry<String, TimerDeclaration> timer : timers.entrySet()) {
-      TimerSpec spec = toProto(timer.getValue());
-      builder.putTimerSpecs(timer.getKey(), spec);
-    }
-    return builder.build();
-  }
-
-  public static DoFn<?, ?> getDoFn(ParDoPayload payload) throws InvalidProtocolBufferException {
-    return doFnAndMainOutputTagFromProto(payload.getDoFn()).getDoFn();
-  }
-
-  public static TupleTag<?> getMainOutputTag(ParDoPayload payload)
-      throws InvalidProtocolBufferException {
-    return doFnAndMainOutputTagFromProto(payload.getDoFn()).getMainOutputTag();
-  }
-
-  public static RunnerApi.PCollection getMainInput(
-      RunnerApi.PTransform ptransform, Components components) throws IOException {
-    checkArgument(
-        ptransform.getSpec().getUrn().equals(PAR_DO_PAYLOAD_URN),
-        "Unexpected payload type %s",
-        ptransform.getSpec().getUrn());
-    ParDoPayload payload = ptransform.getSpec().getParameter().unpack(ParDoPayload.class);
-    String mainInputId =
-        Iterables.getOnlyElement(
-            Sets.difference(
-                ptransform.getInputsMap().keySet(), payload.getSideInputsMap().keySet()));
-    return components.getPcollectionsOrThrow(ptransform.getInputsOrThrow(mainInputId));
-  }
-
-  // TODO: Implement
-  private static StateSpec toProto(StateDeclaration state) {
-    throw new UnsupportedOperationException("Not yet supported");
-  }
-
-  // TODO: Implement
-  private static TimerSpec toProto(TimerDeclaration timer) {
-    throw new UnsupportedOperationException("Not yet supported");
-  }
-
-  @AutoValue
-  abstract static class DoFnAndMainOutput implements Serializable {
-    public static DoFnAndMainOutput of(
-        DoFn<?, ?> fn, TupleTag<?> tag) {
-      return new AutoValue_ParDos_DoFnAndMainOutput(fn, tag);
-    }
-
-    abstract DoFn<?, ?> getDoFn();
-    abstract TupleTag<?> getMainOutputTag();
-  }
-
-  private static SdkFunctionSpec toProto(DoFn<?, ?> fn, TupleTag<?> tag) {
-    return SdkFunctionSpec.newBuilder()
-        .setSpec(
-            FunctionSpec.newBuilder()
-                .setUrn(CUSTOM_JAVA_DO_FN_URN)
-                .setParameter(
-                    Any.pack(
-                        BytesValue.newBuilder()
-                            .setValue(
-                                ByteString.copyFrom(
-                                    SerializableUtils.serializeToByteArray(
-                                        DoFnAndMainOutput.of(fn, tag))))
-                            .build())))
-        .build();
-  }
-
-  private static DoFnAndMainOutput doFnAndMainOutputTagFromProto(SdkFunctionSpec fnSpec)
-      throws InvalidProtocolBufferException {
-    checkArgument(fnSpec.getSpec().getUrn().equals(CUSTOM_JAVA_DO_FN_URN));
-    byte[] serializedFn =
-        fnSpec.getSpec().getParameter().unpack(BytesValue.class).getValue().toByteArray();
-    return (DoFnAndMainOutput)
-        SerializableUtils.deserializeFromByteArray(serializedFn, "Custom DoFn And Main Output tag");
-  }
-
-  private static Optional<RunnerApi.Parameter> toProto(Parameter parameter) {
-    return parameter.match(
-        new Cases.WithDefault<Optional<RunnerApi.Parameter>>() {
-          @Override
-          public Optional<RunnerApi.Parameter> dispatch(WindowParameter p) {
-            return Optional.of(RunnerApi.Parameter.newBuilder().setType(Type.WINDOW).build());
-          }
-
-          @Override
-          public Optional<RunnerApi.Parameter> dispatch(RestrictionTrackerParameter p) {
-            return Optional.of(
-                RunnerApi.Parameter.newBuilder().setType(Type.RESTRICTION_TRACKER).build());
-          }
-
-          @Override
-          protected Optional<RunnerApi.Parameter> dispatchDefault(Parameter p) {
-            return Optional.absent();
-          }
-        });
-  }
-
-  private static SideInput toProto(PCollectionView<?> view) {
-    Builder builder = SideInput.newBuilder();
-    builder.setAccessPattern(
-        FunctionSpec.newBuilder()
-            .setUrn(view.getViewFn().getMaterialization().getUrn())
-            .build());
-    builder.setViewFn(toProto(view.getViewFn()));
-    builder.setWindowMappingFn(toProto(view.getWindowMappingFn()));
-    return builder.build();
-  }
-
-  public static PCollectionView<?> fromProto(
-      SideInput sideInput, String id, RunnerApi.PTransform parDoTransform, Components components)
-      throws IOException {
-    TupleTag<?> tag = new TupleTag<>(id);
-    WindowMappingFn<?> windowMappingFn = windowMappingFnFromProto(sideInput.getWindowMappingFn());
-    ViewFn<?, ?> viewFn = viewFnFromProto(sideInput.getViewFn());
-
-    RunnerApi.PCollection inputCollection =
-        components.getPcollectionsOrThrow(parDoTransform.getInputsOrThrow(id));
-    WindowingStrategy<?, ?> windowingStrategy =
-        WindowingStrategies.fromProto(
-            components.getWindowingStrategiesOrThrow(inputCollection.getWindowingStrategyId()),
-            components);
-    Coder<?> elemCoder =
-        Coders.fromProto(components.getCodersOrThrow(inputCollection.getCoderId()), components);
-    Coder<Iterable<WindowedValue<?>>> coder =
-        (Coder)
-            IterableCoder.of(
-                FullWindowedValueCoder.of(
-                    elemCoder, windowingStrategy.getWindowFn().windowCoder()));
-    checkArgument(
-        sideInput.getAccessPattern().getUrn().equals(Materializations.ITERABLE_MATERIALIZATION_URN),
-        "Unknown View Materialization URN %s",
-        sideInput.getAccessPattern().getUrn());
-
-    PCollectionView<?> view =
-        new RunnerPCollectionView<>(
-            (TupleTag<Iterable<WindowedValue<?>>>) tag,
-            (ViewFn<Iterable<WindowedValue<?>>, ?>) viewFn,
-            windowMappingFn,
-            windowingStrategy,
-            coder);
-    return view;
-  }
-
-  private static SdkFunctionSpec toProto(ViewFn<?, ?> viewFn) {
-    return SdkFunctionSpec.newBuilder()
-        .setSpec(
-            FunctionSpec.newBuilder()
-                .setUrn(CUSTOM_JAVA_VIEW_FN_URN)
-                .setParameter(
-                    Any.pack(
-                        BytesValue.newBuilder()
-                            .setValue(
-                                ByteString.copyFrom(SerializableUtils.serializeToByteArray(viewFn)))
-                            .build())))
-        .build();
-  }
-
-  private static ViewFn<?, ?> viewFnFromProto(SdkFunctionSpec viewFn)
-      throws InvalidProtocolBufferException {
-    FunctionSpec spec = viewFn.getSpec();
-    checkArgument(
-        spec.getUrn().equals(CUSTOM_JAVA_VIEW_FN_URN),
-        "Can't deserialize unknown %s type %s",
-        ViewFn.class.getSimpleName(),
-        spec.getUrn());
-    return (ViewFn<?, ?>)
-        SerializableUtils.deserializeFromByteArray(
-            spec.getParameter().unpack(BytesValue.class).getValue().toByteArray(), "Custom ViewFn");
-  }
-
-  private static SdkFunctionSpec toProto(WindowMappingFn<?> windowMappingFn) {
-    return SdkFunctionSpec.newBuilder()
-        .setSpec(
-            FunctionSpec.newBuilder()
-                .setUrn(CUSTOM_JAVA_WINDOW_MAPPING_FN_URN)
-                .setParameter(
-                    Any.pack(
-                        BytesValue.newBuilder()
-                            .setValue(
-                                ByteString.copyFrom(
-                                    SerializableUtils.serializeToByteArray(windowMappingFn)))
-                            .build())))
-        .build();
-  }
-
-  private static WindowMappingFn<?> windowMappingFnFromProto(SdkFunctionSpec windowMappingFn)
-      throws InvalidProtocolBufferException {
-    FunctionSpec spec = windowMappingFn.getSpec();
-    checkArgument(
-        spec.getUrn().equals(CUSTOM_JAVA_WINDOW_MAPPING_FN_URN),
-        "Can't deserialize unknown %s type %s",
-        WindowMappingFn.class.getSimpleName(),
-        spec.getUrn());
-    return (WindowMappingFn<?>)
-        SerializableUtils.deserializeFromByteArray(
-            spec.getParameter().unpack(BytesValue.class).getValue().toByteArray(),
-            "Custom WinodwMappingFn");
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/44609383/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
new file mode 100644
index 0000000..ec27957
--- /dev/null
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.ImmutableList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.ParDoPayload;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.SideInput;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.CombiningState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Combine.BinaryCombineLongFn;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.hamcrest.Matchers;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+/** Tests for {@link ParDoTranslation}. */
+@RunWith(Parameterized.class)
+public class ParDoTranslationTest {
+  public static TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
+  private static PCollectionView<Long> singletonSideInput =
+      p.apply("GenerateSingleton", GenerateSequence.from(0L).to(1L))
+          .apply(View.<Long>asSingleton());
+  private static PCollectionView<Map<Long, Iterable<String>>> multimapSideInput =
+      p.apply("CreateMultimap", Create.of(KV.of(1L, "foo"), KV.of(1L, "bar"), KV.of(2L, "spam")))
+          .setCoder(KvCoder.of(VarLongCoder.of(), StringUtf8Coder.of()))
+          .apply(View.<Long, String>asMultimap());
+
+  private static PCollection<KV<Long, String>> mainInput =
+      p.apply("CreateMainInput", Create.empty(KvCoder.of(VarLongCoder.of(), StringUtf8Coder.of())));
+
+  @Parameters(name = "{index}: {0}") // each returned transform becomes one parameterized run
+  public static Iterable<ParDo.MultiOutput<?, ?>> data() {
+    return ImmutableList.<ParDo.MultiOutput<?, ?>>of( // 1st: main output only, no side inputs
+        ParDo.of(new DropElementsFn()).withOutputTags(new TupleTag<Void>(), TupleTagList.empty()),
+        ParDo.of(new DropElementsFn()) // main output only, both side inputs
+            .withOutputTags(new TupleTag<Void>(), TupleTagList.empty())
+            .withSideInputs(singletonSideInput, multimapSideInput),
+        ParDo.of(new DropElementsFn()) // two additional outputs, both side inputs
+            .withOutputTags(
+                new TupleTag<Void>(),
+                TupleTagList.of(new TupleTag<byte[]>() {}).and(new TupleTag<Integer>() {}))
+            .withSideInputs(singletonSideInput, multimapSideInput),
+        ParDo.of(new DropElementsFn()) // two additional outputs, no side inputs
+            .withOutputTags(
+                new TupleTag<Void>(),
+                TupleTagList.of(new TupleTag<byte[]>() {}).and(new TupleTag<Integer>() {})));
+  }
+
+  @Parameter(0)
+  public ParDo.MultiOutput<KV<Long, String>, Void> parDo;
+
+  @Test // translates parDo to a ParDoPayload, then reads the DoFn and main output tag back
+  public void testToAndFromProto() throws Exception {
+    SdkComponents components = SdkComponents.create();
+    ParDoPayload payload = ParDoTranslation.toProto(parDo, components);
+
+    assertThat(ParDoTranslation.getDoFn(payload), Matchers.<DoFn<?, ?>>equalTo(parDo.getFn()));
+    assertThat(
+        ParDoTranslation.getMainOutputTag(payload),
+        Matchers.<TupleTag<?>>equalTo(parDo.getMainOutputTag()));
+    for (PCollectionView<?> view : parDo.getSideInputs()) {
+      payload.getSideInputsOrThrow(view.getTagInternal().getId()); // presence check only
+    }
+  }
+
+  @Test // registers the applied ParDo, then restores each side input from the proto
+  public void toAndFromTransformProto() throws Exception {
+    Map<TupleTag<?>, PValue> inputs = new HashMap<>();
+    inputs.put(new TupleTag<KV<Long, String>>() {}, mainInput);
+    inputs.putAll(parDo.getAdditionalInputs());
+    PCollectionTuple output = mainInput.apply(parDo);
+
+    SdkComponents components = SdkComponents.create();
+    String transformId =
+        components.registerPTransform(
+            AppliedPTransform.<PCollection<KV<Long, String>>, PCollection<Void>, MultiOutput>of(
+                "foo", inputs, output.expand(), parDo, p),
+            Collections.<AppliedPTransform<?, ?, ?>>emptyList());
+
+    Components protoComponents = components.toComponents();
+    RunnerApi.PTransform protoTransform =
+        protoComponents.getTransformsOrThrow(transformId);
+    ParDoPayload parDoPayload = protoTransform.getSpec().getParameter().unpack(ParDoPayload.class);
+    for (PCollectionView<?> view : parDo.getSideInputs()) {
+      SideInput sideInput = parDoPayload.getSideInputsOrThrow(view.getTagInternal().getId());
+      PCollectionView<?> restoredView =
+          ParDoTranslation.fromProto(
+              sideInput, view.getTagInternal().getId(), protoTransform, protoComponents);
+      assertThat(restoredView.getTagInternal(), equalTo(view.getTagInternal()));
+      assertThat(restoredView.getViewFn(), instanceOf(view.getViewFn().getClass()));
+      assertThat( // as with the ViewFn, only the class is compared, not equals()
+          restoredView.getWindowMappingFn(), instanceOf(view.getWindowMappingFn().getClass()));
+      assertThat(
+          restoredView.getWindowingStrategyInternal(),
+          Matchers.<WindowingStrategy<?, ?>>equalTo(
+              view.getWindowingStrategyInternal().fixDefaults()));
+      assertThat(restoredView.getCoderInternal(), equalTo(view.getCoderInternal()));
+    }
+    String mainInputId = components.registerPCollection(mainInput); // presumably the existing id
+    assertThat(
+        ParDoTranslation.getMainInput(protoTransform, protoComponents),
+        equalTo(protoComponents.getPcollectionsOrThrow(mainInputId)));
+  }
+
+  private static class DropElementsFn extends DoFn<KV<Long, String>, Void> {
+    @ProcessElement
+    public void proc(ProcessContext context, BoundedWindow window) {
+      context.output(null); // output type is Void, so null is the only value to emit
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      return other instanceof DropElementsFn; // every instance is equal to every other
+    }
+
+    @Override
+    public int hashCode() {
+      return DropElementsFn.class.hashCode(); // constant, consistent with equals
+    }
+  }
+
+  @SuppressWarnings("unused") // not used by data(); fields and parameters are never read directly
+  private static class StateTimerDropElementsFn extends DoFn<KV<Long, String>, Void> {
+    private static final String BAG_STATE_ID = "bagState";
+    private static final String COMBINING_STATE_ID = "combiningState";
+    private static final String EVENT_TIMER_ID = "eventTimer";
+    private static final String PROCESSING_TIMER_ID = "processingTimer";
+
+    @StateId(BAG_STATE_ID)
+    private final StateSpec<BagState<String>> bagState = StateSpecs.bag(StringUtf8Coder.of());
+
+    @StateId(COMBINING_STATE_ID)
+    private final StateSpec<CombiningState<Long, long[], Long>> combiningState =
+        StateSpecs.combining(
+            new BinaryCombineLongFn() {
+              @Override
+              public long apply(long left, long right) {
+                return Math.max(left, right); // keeps the larger value
+              }
+
+              @Override
+              public long identity() {
+                return Long.MIN_VALUE; // neutral element for max
+              }
+            });
+
+    @TimerId(EVENT_TIMER_ID)
+    private final TimerSpec eventTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+    @TimerId(PROCESSING_TIMER_ID)
+    private final TimerSpec processingTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+    @ProcessElement
+    public void dropInput(
+        ProcessContext context,
+        BoundedWindow window,
+        @StateId(BAG_STATE_ID) BagState<String> bagStateState,
+        @StateId(COMBINING_STATE_ID) CombiningState<Long, long[], Long> combiningStateState,
+        @TimerId(EVENT_TIMER_ID) Timer eventTimerTimer,
+        @TimerId(PROCESSING_TIMER_ID) Timer processingTimerTimer) {
+      context.output(null); // state and timer arguments are ignored
+    }
+
+    @OnTimer(EVENT_TIMER_ID)
+    public void onEventTime(OnTimerContext context) {} // no-op
+
+    @OnTimer(PROCESSING_TIMER_ID)
+    public void onProcessingTime(OnTimerContext context) {} // no-op
+
+    @Override
+    public boolean equals(Object other) {
+      return other instanceof StateTimerDropElementsFn; // all instances are equal
+    }
+
+    @Override
+    public int hashCode() {
+      return StateTimerDropElementsFn.class.hashCode(); // constant, consistent with equals
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/44609383/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDosTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDosTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDosTest.java
deleted file mode 100644
index b6f0b7d..0000000
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDosTest.java
+++ /dev/null
@@ -1,233 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.core.construction;
-
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.instanceOf;
-import static org.junit.Assert.assertThat;
-
-import com.google.common.collect.ImmutableList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VarLongCoder;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.ParDoPayload;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.SideInput;
-import org.apache.beam.sdk.io.GenerateSequence;
-import org.apache.beam.sdk.runners.AppliedPTransform;
-import org.apache.beam.sdk.state.BagState;
-import org.apache.beam.sdk.state.CombiningState;
-import org.apache.beam.sdk.state.StateSpec;
-import org.apache.beam.sdk.state.StateSpecs;
-import org.apache.beam.sdk.state.TimeDomain;
-import org.apache.beam.sdk.state.Timer;
-import org.apache.beam.sdk.state.TimerSpec;
-import org.apache.beam.sdk.state.TimerSpecs;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Combine.BinaryCombineLongFn;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
-import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TupleTagList;
-import org.apache.beam.sdk.values.WindowingStrategy;
-import org.hamcrest.Matchers;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameter;
-import org.junit.runners.Parameterized.Parameters;
-
-/** Tests for {@link ParDos}. */
-@RunWith(Parameterized.class)
-public class ParDosTest {
-  public static TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
-
-  private static PCollectionView<Long> singletonSideInput =
-      p.apply("GenerateSingleton", GenerateSequence.from(0L).to(1L))
-          .apply(View.<Long>asSingleton());
-  private static PCollectionView<Map<Long, Iterable<String>>> multimapSideInput =
-      p.apply("CreateMultimap", Create.of(KV.of(1L, "foo"), KV.of(1L, "bar"), KV.of(2L, "spam")))
-          .setCoder(KvCoder.of(VarLongCoder.of(), StringUtf8Coder.of()))
-          .apply(View.<Long, String>asMultimap());
-
-  private static PCollection<KV<Long, String>> mainInput =
-      p.apply("CreateMainInput", Create.empty(KvCoder.of(VarLongCoder.of(), StringUtf8Coder.of())));
-
-  @Parameters(name = "{index}: {0}")
-  public static Iterable<ParDo.MultiOutput<?, ?>> data() {
-    return ImmutableList.<ParDo.MultiOutput<?, ?>>of(
-        ParDo.of(new DropElementsFn()).withOutputTags(new TupleTag<Void>(), TupleTagList.empty()),
-        ParDo.of(new DropElementsFn())
-            .withOutputTags(new TupleTag<Void>(), TupleTagList.empty())
-            .withSideInputs(singletonSideInput, multimapSideInput),
-        ParDo.of(new DropElementsFn())
-            .withOutputTags(
-                new TupleTag<Void>(),
-                TupleTagList.of(new TupleTag<byte[]>() {}).and(new TupleTag<Integer>() {}))
-            .withSideInputs(singletonSideInput, multimapSideInput),
-        ParDo.of(new DropElementsFn())
-            .withOutputTags(
-                new TupleTag<Void>(),
-                TupleTagList.of(new TupleTag<byte[]>() {}).and(new TupleTag<Integer>() {})));
-  }
-
-  @Parameter(0)
-  public ParDo.MultiOutput<KV<Long, String>, Void> parDo;
-
-  @Test
-  public void testToAndFromProto() throws Exception {
-    SdkComponents components = SdkComponents.create();
-    ParDoPayload payload = ParDos.toProto(parDo, components);
-
-    assertThat(ParDos.getDoFn(payload), Matchers.<DoFn<?, ?>>equalTo(parDo.getFn()));
-    assertThat(
-        ParDos.getMainOutputTag(payload), Matchers.<TupleTag<?>>equalTo(parDo.getMainOutputTag()));
-    for (PCollectionView<?> view : parDo.getSideInputs()) {
-      payload.getSideInputsOrThrow(view.getTagInternal().getId());
-    }
-  }
-
-  @Test
-  public void toAndFromTransformProto() throws Exception {
-    Map<TupleTag<?>, PValue> inputs = new HashMap<>();
-    inputs.put(new TupleTag<KV<Long, String>>() {}, mainInput);
-    inputs.putAll(parDo.getAdditionalInputs());
-    PCollectionTuple output = mainInput.apply(parDo);
-
-    SdkComponents components = SdkComponents.create();
-    String transformId =
-        components.registerPTransform(
-            AppliedPTransform.<PCollection<KV<Long, String>>, PCollection<Void>, MultiOutput>of(
-                "foo", inputs, output.expand(), parDo, p),
-            Collections.<AppliedPTransform<?, ?, ?>>emptyList());
-
-    Components protoComponents = components.toComponents();
-    RunnerApi.PTransform protoTransform =
-        protoComponents.getTransformsOrThrow(transformId);
-    ParDoPayload parDoPayload = protoTransform.getSpec().getParameter().unpack(ParDoPayload.class);
-    for (PCollectionView<?> view : parDo.getSideInputs()) {
-      SideInput sideInput = parDoPayload.getSideInputsOrThrow(view.getTagInternal().getId());
-      PCollectionView<?> restoredView =
-          ParDos.fromProto(
-              sideInput, view.getTagInternal().getId(), protoTransform, protoComponents);
-      assertThat(restoredView.getTagInternal(), equalTo(view.getTagInternal()));
-      assertThat(restoredView.getViewFn(), instanceOf(view.getViewFn().getClass()));
-      assertThat(
-          restoredView.getWindowMappingFn(), instanceOf(view.getWindowMappingFn().getClass()));
-      assertThat(
-          restoredView.getWindowingStrategyInternal(),
-          Matchers.<WindowingStrategy<?, ?>>equalTo(
-              view.getWindowingStrategyInternal().fixDefaults()));
-      assertThat(restoredView.getCoderInternal(), equalTo(view.getCoderInternal()));
-    }
-    String mainInputId = components.registerPCollection(mainInput);
-    assertThat(
-        ParDos.getMainInput(protoTransform, protoComponents),
-        equalTo(protoComponents.getPcollectionsOrThrow(mainInputId)));
-  }
-
-  private static class DropElementsFn extends DoFn<KV<Long, String>, Void> {
-    @ProcessElement
-    public void proc(ProcessContext context, BoundedWindow window) {
-      context.output(null);
-    }
-
-    @Override
-    public boolean equals(Object other) {
-      return other instanceof DropElementsFn;
-    }
-
-    @Override
-    public int hashCode() {
-      return DropElementsFn.class.hashCode();
-    }
-  }
-
-  @SuppressWarnings("unused")
-  private static class StateTimerDropElementsFn extends DoFn<KV<Long, String>, Void> {
-    private static final String BAG_STATE_ID = "bagState";
-    private static final String COMBINING_STATE_ID = "combiningState";
-    private static final String EVENT_TIMER_ID = "eventTimer";
-    private static final String PROCESSING_TIMER_ID = "processingTimer";
-
-    @StateId(BAG_STATE_ID)
-    private final StateSpec<BagState<String>> bagState = StateSpecs.bag(StringUtf8Coder.of());
-
-    @StateId(COMBINING_STATE_ID)
-    private final StateSpec<CombiningState<Long, long[], Long>> combiningState =
-        StateSpecs.combining(
-            new BinaryCombineLongFn() {
-              @Override
-              public long apply(long left, long right) {
-                return Math.max(left, right);
-              }
-
-              @Override
-              public long identity() {
-                return Long.MIN_VALUE;
-              }
-            });
-
-    @TimerId(EVENT_TIMER_ID)
-    private final TimerSpec eventTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
-
-    @TimerId(PROCESSING_TIMER_ID)
-    private final TimerSpec processingTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
-
-    @ProcessElement
-    public void dropInput(
-        ProcessContext context,
-        BoundedWindow window,
-        @StateId(BAG_STATE_ID) BagState<String> bagStateState,
-        @StateId(COMBINING_STATE_ID) CombiningState<Long, long[], Long> combiningStateState,
-        @TimerId(EVENT_TIMER_ID) Timer eventTimerTimer,
-        @TimerId(PROCESSING_TIMER_ID) Timer processingTimerTimer) {
-      context.output(null);
-    }
-
-    @OnTimer(EVENT_TIMER_ID)
-    public void onEventTime(OnTimerContext context) {}
-
-    @OnTimer(PROCESSING_TIMER_ID)
-    public void onProcessingTime(OnTimerContext context) {}
-
-    @Override
-    public boolean equals(Object other) {
-      return other instanceof StateTimerDropElementsFn;
-    }
-
-    @Override
-    public int hashCode() {
-      return StateTimerDropElementsFn.class.hashCode();
-    }
-  }
-}