You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/03/24 03:47:47 UTC
[23/67] [partial] incubator-beam git commit: Directory reorganization
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Create.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Create.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Create.java
deleted file mode 100644
index a74e5bf..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Create.java
+++ /dev/null
@@ -1,426 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.CoderRegistry;
-import com.google.cloud.dataflow.sdk.coders.VoidCoder;
-import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
-import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollection.IsBounded;
-import com.google.cloud.dataflow.sdk.values.PInput;
-import com.google.cloud.dataflow.sdk.values.TimestampedValue;
-import com.google.cloud.dataflow.sdk.values.TimestampedValue.TimestampedValueCoder;
-import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterables;
-
-import org.joda.time.Instant;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-
-/**
- * {@code Create<T>} takes a collection of elements of type {@code T}
- * known when the pipeline is constructed and returns a
- * {@code PCollection<T>} containing the elements.
- *
- * <p>Example of use:
- * <pre> {@code
- * Pipeline p = ...;
- *
- * PCollection<Integer> pc = p.apply(Create.of(3, 4, 5).withCoder(BigEndianIntegerCoder.of()));
- *
- * Map<String, Integer> map = ...;
- * PCollection<KV<String, Integer>> pt =
- * p.apply(Create.of(map)
- * .withCoder(KvCoder.of(StringUtf8Coder.of(),
- * BigEndianIntegerCoder.of())));
- * } </pre>
- *
- * <p>{@code Create} can automatically determine the {@code Coder} to use
- * if all elements have the same run-time class, and a default coder is registered for that
- * class. See {@link CoderRegistry} for details on how defaults are determined.
- *
- * <p>If a coder can not be inferred, {@link Create.Values#withCoder} must be called
- * explicitly to set the encoding of the resulting
- * {@code PCollection}.
- *
- * <p>A good use for {@code Create} is when a {@code PCollection}
- * needs to be created without dependencies on files or other external
- * entities. This is especially useful during testing.
- *
- * <p>Caveat: {@code Create} only supports small in-memory datasets,
- * particularly when submitting jobs to the Google Cloud Dataflow
- * service.
- *
- * @param <T> the type of the elements of the resulting {@code PCollection}
- */
-public class Create<T> {
- /**
- * Returns a new {@code Create.Values} transform that produces a
- * {@link PCollection} containing elements of the provided
- * {@code Iterable}.
- *
- * <p>The argument should not be modified after this is called.
- *
- * <p>The elements of the output {@link PCollection} will have a timestamp of negative infinity,
- * see {@link Create#timestamped} for a way of creating a {@code PCollection} with timestamped
- * elements.
- *
- * <p>By default, {@code Create.Values} can automatically determine the {@code Coder} to use
- * if all elements have the same non-parameterized run-time class, and a default coder is
- * registered for that class. See {@link CoderRegistry} for details on how defaults are
- * determined.
- * Otherwise, use {@link Create.Values#withCoder} to set the coder explicitly.
- */
- public static <T> Values<T> of(Iterable<T> elems) {
- return new Values<>(elems, Optional.<Coder<T>>absent());
- }
-
- /**
- * Returns a new {@code Create.Values} transform that produces a
- * {@link PCollection} containing the specified elements.
- *
- * <p>The elements will have a timestamp of negative infinity, see
- * {@link Create#timestamped} for a way of creating a {@code PCollection}
- * with timestamped elements.
- *
- * <p>The arguments should not be modified after this is called.
- *
- * <p>By default, {@code Create.Values} can automatically determine the {@code Coder} to use
- * if all elements have the same non-parameterized run-time class, and a default coder is
- * registered for that class. See {@link CoderRegistry} for details on how defaults are
- * determined.
- * Otherwise, use {@link Create.Values#withCoder} to set the coder explicitly.
- */
- @SafeVarargs
- public static <T> Values<T> of(T... elems) {
- return of(Arrays.asList(elems));
- }
-
- /**
- * Returns a new {@code Create.Values} transform that produces a
- * {@link PCollection} of {@link KV}s corresponding to the keys and
- * values of the specified {@code Map}.
- *
- * <p>The elements will have a timestamp of negative infinity, see
- * {@link Create#timestamped} for a way of creating a {@code PCollection}
- * with timestamped elements.
- *
- * <p>By default, {@code Create.Values} can automatically determine the {@code Coder} to use
- * if all elements have the same non-parameterized run-time class, and a default coder is
- * registered for that class. See {@link CoderRegistry} for details on how defaults are
- * determined.
- * Otherwise, use {@link Create.Values#withCoder} to set the coder explicitly.
- */
- public static <K, V> Values<KV<K, V>> of(Map<K, V> elems) {
- List<KV<K, V>> kvs = new ArrayList<>(elems.size());
- for (Map.Entry<K, V> entry : elems.entrySet()) {
- kvs.add(KV.of(entry.getKey(), entry.getValue()));
- }
- return of(kvs);
- }
-
- /**
- * Returns a new {@link Create.TimestampedValues} transform that produces a
- * {@link PCollection} containing the elements of the provided {@code Iterable}
- * with the specified timestamps.
- *
- * <p>The argument should not be modified after this is called.
- *
- * <p>By default, {@code Create.TimestampedValues} can automatically determine the {@code Coder}
- * to use if all elements have the same non-parameterized run-time class, and a default coder is
- * registered for that class. See {@link CoderRegistry} for details on how defaults are
- * determined.
- * Otherwise, use {@link Create.TimestampedValues#withCoder} to set the coder explicitly.
- */
- public static <T> TimestampedValues<T> timestamped(Iterable<TimestampedValue<T>> elems) {
- return new TimestampedValues<>(elems, Optional.<Coder<T>>absent());
- }
-
- /**
- * Returns a new {@link Create.TimestampedValues} transform that produces a {@link PCollection}
- * containing the specified elements with the specified timestamps.
- *
- * <p>The arguments should not be modified after this is called.
- */
- @SafeVarargs
- public static <T> TimestampedValues<T> timestamped(
- @SuppressWarnings("unchecked") TimestampedValue<T>... elems) {
- return timestamped(Arrays.asList(elems));
- }
-
- /**
- * Returns a new root transform that produces a {@link PCollection} containing
- * the specified elements with the specified timestamps.
- *
- * <p>The arguments should not be modified after this is called.
- *
- * <p>By default, {@code Create.TimestampedValues} can automatically determine the {@code Coder}
- * to use if all elements have the same non-parameterized run-time class, and a default coder
- * is registered for that class. See {@link CoderRegistry} for details on how defaults are
- * determined.
- * Otherwise, use {@link Create.TimestampedValues#withCoder} to set the coder explicitly.
-
- * @throws IllegalArgumentException if there are a different number of values
- * and timestamps
- */
- public static <T> TimestampedValues<T> timestamped(
- Iterable<T> values, Iterable<Long> timestamps) {
- List<TimestampedValue<T>> elems = new ArrayList<>();
- Iterator<T> valueIter = values.iterator();
- Iterator<Long> timestampIter = timestamps.iterator();
- while (valueIter.hasNext() && timestampIter.hasNext()) {
- elems.add(TimestampedValue.of(valueIter.next(), new Instant(timestampIter.next())));
- }
- Preconditions.checkArgument(
- !valueIter.hasNext() && !timestampIter.hasNext(),
- "Expect sizes of values and timestamps are same.");
- return timestamped(elems);
- }
-
- /////////////////////////////////////////////////////////////////////////////
-
- /**
- * A {@code PTransform} that creates a {@code PCollection} from a set of in-memory objects.
- */
- public static class Values<T> extends PTransform<PInput, PCollection<T>> {
- /**
- * Returns a {@link Create.Values} PTransform like this one that uses the given
- * {@code Coder<T>} to decode each of the objects into a
- * value of type {@code T}.
- *
- * <p>By default, {@code Create.Values} can automatically determine the {@code Coder} to use
- * if all elements have the same non-parameterized run-time class, and a default coder is
- * registered for that class. See {@link CoderRegistry} for details on how defaults are
- * determined.
- *
- * <p>Note that for {@link Create.Values} with no elements, the {@link VoidCoder} is used.
- */
- public Values<T> withCoder(Coder<T> coder) {
- return new Values<>(elems, Optional.of(coder));
- }
-
- public Iterable<T> getElements() {
- return elems;
- }
-
- @Override
- public PCollection<T> apply(PInput input) {
- try {
- Coder<T> coder = getDefaultOutputCoder(input);
- return PCollection
- .<T>createPrimitiveOutputInternal(
- input.getPipeline(),
- WindowingStrategy.globalDefault(),
- IsBounded.BOUNDED)
- .setCoder(coder);
- } catch (CannotProvideCoderException e) {
- throw new IllegalArgumentException("Unable to infer a coder and no Coder was specified. "
- + "Please set a coder by invoking Create.withCoder() explicitly.", e);
- }
- }
-
- @Override
- public Coder<T> getDefaultOutputCoder(PInput input) throws CannotProvideCoderException {
- if (coder.isPresent()) {
- return coder.get();
- }
- // First try to deduce a coder using the types of the elements.
- Class<?> elementClazz = Void.class;
- for (T elem : elems) {
- if (elem == null) {
- continue;
- }
- Class<?> clazz = elem.getClass();
- if (elementClazz.equals(Void.class)) {
- elementClazz = clazz;
- } else if (!elementClazz.equals(clazz)) {
- // Elements are not the same type, require a user-specified coder.
- throw new CannotProvideCoderException(
- "Cannot provide coder for Create: The elements are not all of the same class.");
- }
- }
-
- if (elementClazz.getTypeParameters().length == 0) {
- try {
- @SuppressWarnings("unchecked") // elementClazz is a wildcard type
- Coder<T> coder = (Coder<T>) input.getPipeline().getCoderRegistry()
- .getDefaultCoder(TypeDescriptor.of(elementClazz));
- return coder;
- } catch (CannotProvideCoderException exc) {
- // let the next stage try
- }
- }
-
- // If that fails, try to deduce a coder using the elements themselves
- Optional<Coder<T>> coder = Optional.absent();
- for (T elem : elems) {
- Coder<T> c = input.getPipeline().getCoderRegistry().getDefaultCoder(elem);
- if (!coder.isPresent()) {
- coder = Optional.of(c);
- } else if (!Objects.equals(c, coder.get())) {
- throw new CannotProvideCoderException(
- "Cannot provide coder for elements of " + Create.class.getSimpleName() + ":"
- + " For their common class, no coder could be provided."
- + " Based on their values, they do not all default to the same Coder.");
- }
- }
-
- if (!coder.isPresent()) {
- throw new CannotProvideCoderException("Unable to infer a coder. Please register "
- + "a coder for ");
- }
- return coder.get();
- }
-
- /////////////////////////////////////////////////////////////////////////////
-
- /** The elements of the resulting PCollection. */
- private final transient Iterable<T> elems;
-
- /** The coder used to encode the values to and from a binary representation. */
- private final transient Optional<Coder<T>> coder;
-
- /**
- * Constructs a {@code Create.Values} transform that produces a
- * {@link PCollection} containing the specified elements.
- *
- * <p>The arguments should not be modified after this is called.
- */
- private Values(Iterable<T> elems, Optional<Coder<T>> coder) {
- this.elems = elems;
- this.coder = coder;
- }
- }
-
- /////////////////////////////////////////////////////////////////////////////
-
- /**
- * A {@code PTransform} that creates a {@code PCollection} whose elements have
- * associated timestamps.
- */
- public static class TimestampedValues<T> extends Values<T> {
- /**
- * Returns a {@link Create.TimestampedValues} PTransform like this one that uses the given
- * {@code Coder<T>} to decode each of the objects into a
- * value of type {@code T}.
- *
- * <p>By default, {@code Create.TimestampedValues} can automatically determine the
- * {@code Coder} to use if all elements have the same non-parameterized run-time class,
- * and a default coder is registered for that class. See {@link CoderRegistry} for details
- * on how defaults are determined.
- *
- * <p>Note that for {@link Create.TimestampedValues with no elements}, the {@link VoidCoder}
- * is used.
- */
- @Override
- public TimestampedValues<T> withCoder(Coder<T> coder) {
- return new TimestampedValues<>(elems, Optional.<Coder<T>>of(coder));
- }
-
- @Override
- public PCollection<T> apply(PInput input) {
- try {
- Coder<T> coder = getDefaultOutputCoder(input);
- PCollection<TimestampedValue<T>> intermediate = Pipeline.applyTransform(input,
- Create.of(elems).withCoder(TimestampedValueCoder.of(coder)));
-
- PCollection<T> output = intermediate.apply(ParDo.of(new ConvertTimestamps<T>()));
- output.setCoder(coder);
- return output;
- } catch (CannotProvideCoderException e) {
- throw new IllegalArgumentException("Unable to infer a coder and no Coder was specified. "
- + "Please set a coder by invoking CreateTimestamped.withCoder() explicitly.", e);
- }
- }
-
- /////////////////////////////////////////////////////////////////////////////
-
- /** The timestamped elements of the resulting PCollection. */
- private final transient Iterable<TimestampedValue<T>> elems;
-
- private TimestampedValues(Iterable<TimestampedValue<T>> elems,
- Optional<Coder<T>> coder) {
- super(
- Iterables.transform(elems, new Function<TimestampedValue<T>, T>() {
- @Override
- public T apply(TimestampedValue<T> input) {
- return input.getValue();
- }
- }), coder);
- this.elems = elems;
- }
-
- private static class ConvertTimestamps<T> extends DoFn<TimestampedValue<T>, T> {
- @Override
- public void processElement(ProcessContext c) {
- c.outputWithTimestamp(c.element().getValue(), c.element().getTimestamp());
- }
- }
- }
-
- /////////////////////////////////////////////////////////////////////////////
-
- static {
- registerDefaultTransformEvaluator();
- }
-
- @SuppressWarnings({"rawtypes", "unchecked"})
- private static void registerDefaultTransformEvaluator() {
- DirectPipelineRunner.registerDefaultTransformEvaluator(
- Create.Values.class,
- new DirectPipelineRunner.TransformEvaluator<Create.Values>() {
- @Override
- public void evaluate(
- Create.Values transform,
- DirectPipelineRunner.EvaluationContext context) {
- evaluateHelper(transform, context);
- }
- });
- }
-
- private static <T> void evaluateHelper(
- Create.Values<T> transform,
- DirectPipelineRunner.EvaluationContext context) {
- // Convert the Iterable of elems into a List of elems.
- List<T> listElems;
- if (transform.elems instanceof Collection) {
- Collection<T> collectionElems = (Collection<T>) transform.elems;
- listElems = new ArrayList<>(collectionElems.size());
- } else {
- listElems = new ArrayList<>();
- }
- for (T elem : transform.elems) {
- listElems.add(
- context.ensureElementEncodable(context.getOutput(transform), elem));
- }
- context.setPCollection(context.getOutput(transform), listElems);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/DoFn.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/DoFn.java
deleted file mode 100644
index 5ba9992..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/DoFn.java
+++ /dev/null
@@ -1,563 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.annotations.Experimental.Kind;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn;
-import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
-import com.google.cloud.dataflow.sdk.transforms.display.HasDisplayData;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
-import com.google.cloud.dataflow.sdk.util.WindowingInternals;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
-import com.google.common.base.MoreObjects;
-
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
-import java.util.UUID;
-
-/**
- * The argument to {@link ParDo} providing the code to use to process
- * elements of the input
- * {@link com.google.cloud.dataflow.sdk.values.PCollection}.
- *
- * <p>See {@link ParDo} for more explanation, examples of use, and
- * discussion of constraints on {@code DoFn}s, including their
- * serializability, lack of access to global shared mutable state,
- * requirements for failure tolerance, and benefits of optimization.
- *
- * <p>{@code DoFn}s can be tested in the context of a particular
- * {@code Pipeline} by running that {@code Pipeline} on sample input
- * and then checking its output. Unit testing of a {@code DoFn},
- * separately from any {@code ParDo} transform or {@code Pipeline},
- * can be done via the {@link DoFnTester} harness.
- *
- * <p>{@link DoFnWithContext} (currently experimental) offers an alternative
- * mechanism for accessing {@link ProcessContext#window()} without the need
- * to implement {@link RequiresWindowAccess}.
- *
- * <p>See also {@link #processElement} for details on implementing the transformation
- * from {@code InputT} to {@code OutputT}.
- *
- * @param <InputT> the type of the (main) input elements
- * @param <OutputT> the type of the (main) output elements
- */
-public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayData {
-
-  /**
-   * Base context handed to the {@code DoFn} lifecycle methods (the bundle callbacks receive it
-   * directly, and {@link ProcessContext} extends it). Its main job is emitting output.
-   */
-  public abstract class Context {
-
-    /**
-     * Gives access to the {@code PipelineOptions} configured on the
-     * {@link com.google.cloud.dataflow.sdk.runners.PipelineRunner} that runs this
-     * {@code DoFn}. Under {@link DoFnTester} these are the default options.
-     */
-    public abstract PipelineOptions getPipelineOptions();
-
-    /**
-     * Emits {@code output} to the main output {@code PCollection}.
-     *
-     * <p>After this call the element must be treated as immutable: the Dataflow runtime or
-     * downstream steps may cache it, keep a reference to it, or use it in other unspecified ways.
-     *
-     * <p>When called from {@link DoFn#processElement processElement}, the emitted element
-     * inherits the timestamp and windows of the element being processed.
-     *
-     * <p>When called from {@link #startBundle startBundle} or {@link #finishBundle finishBundle},
-     * the windows are computed with the input {@code PCollection}'s
-     * {@link com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn}; an exception is
-     * thrown if that {@code WindowFn} tries to read anything about the input element. The
-     * emitted element is timestamped at negative infinity.
-     */
-    public abstract void output(OutputT output);
-
-    /**
-     * Emits {@code output} to the main output {@code PCollection} at the given
-     * {@code timestamp}.
-     *
-     * <p>The element must not be modified after it has been handed to this method.
-     *
-     * <p>When called from {@link DoFn#processElement processElement}, {@code timestamp} may be
-     * no earlier than the input element's timestamp minus
-     * {@link DoFn#getAllowedTimestampSkew getAllowedTimestampSkew}; the emitted element shares
-     * the input element's windows.
-     *
-     * <p>When called from {@link #startBundle startBundle} or {@link #finishBundle finishBundle},
-     * the windows are computed with the input {@code PCollection}'s
-     * {@link com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn}; an exception is
-     * thrown if that {@code WindowFn} tries to read anything about the input element other
-     * than its timestamp.
-     */
-    public abstract void outputWithTimestamp(OutputT output, Instant timestamp);
-
-    /**
-     * Emits {@code output} to the side output {@code PCollection} identified by {@code tag}.
-     *
-     * <p>The element must not be modified after it has been handed to this method.
-     *
-     * <p>Callers of {@code ParDo} declare the side-output tags they consume through
-     * {@link ParDo#withOutputTags withOutputTags}. Side outputs that nobody consumes (for
-     * example ones used purely for monitoring) need not be declared.
-     *
-     * <p>The emitted element inherits the timestamp and windows of the element passed to
-     * {@link DoFn#processElement processElement}.
-     *
-     * <p>When called from {@link #startBundle startBundle} or {@link #finishBundle finishBundle},
-     * the windows are computed with the input {@code PCollection}'s
-     * {@link com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn}; an exception is
-     * thrown if that {@code WindowFn} tries to read anything about the input element. The
-     * emitted element is timestamped at negative infinity.
-     *
-     * @see ParDo#withOutputTags
-     */
-    public abstract <T> void sideOutput(TupleTag<T> tag, T output);
-
-    /**
-     * Emits {@code output} to the side output {@code PCollection} identified by {@code tag},
-     * at the given {@code timestamp}.
-     *
-     * <p>The element must not be modified after it has been handed to this method.
-     *
-     * <p>When called from {@link DoFn#processElement processElement}, {@code timestamp} may be
-     * no earlier than the input element's timestamp minus
-     * {@link DoFn#getAllowedTimestampSkew getAllowedTimestampSkew}; the emitted element shares
-     * the input element's windows.
-     *
-     * <p>When called from {@link #startBundle startBundle} or {@link #finishBundle finishBundle},
-     * the windows are computed with the input {@code PCollection}'s
-     * {@link com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn}; an exception is
-     * thrown if that {@code WindowFn} tries to read anything about the input element other
-     * than its timestamp.
-     *
-     * @see ParDo#withOutputTags
-     */
-    public abstract <T> void sideOutputWithTimestamp(
-        TupleTag<T> tag, T output, Instant timestamp);
-
-    /**
-     * Builds a context-specific {@link Aggregator} named {@code name} whose aggregation
-     * logic is supplied by {@code combiner}. Intended for internal use only.
-     *
-     * @param name the name of the aggregator
-     * @param combiner the {@link CombineFn} implementing the aggregation
-     * @return an aggregator bound to this context for the given name and {@link CombineFn}
-     */
-    @Experimental(Kind.AGGREGATOR)
-    protected abstract <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
-        createAggregatorInternal(String name, CombineFn<AggInputT, ?, AggOutputT> combiner);
-
-    /**
-     * Wires every {@link Aggregator} the {@link DoFn} has created to a delegate obtained from
-     * this context, so that the aggregators can be used here.
-     *
-     * <p>Runners are expected to call this before running {@link DoFn#startBundle}.
-     */
-    @Experimental(Kind.AGGREGATOR)
-    protected final void setupDelegateAggregators() {
-      for (DelegatingAggregator<?, ?> pending : aggregators.values()) {
-        setupDelegateAggregator(pending);
-      }
-      // From here on, aggregators are considered initialized; the flag guards against
-      // creating new ones afterwards.
-      aggregatorsAreFinal = true;
-    }
-
-    /** Points {@code aggregator} at a delegate created by {@link #createAggregatorInternal}. */
-    private final <AggInputT, AggOutputT> void setupDelegateAggregator(
-        DelegatingAggregator<AggInputT, AggOutputT> aggregator) {
-      aggregator.setDelegate(
-          createAggregatorInternal(aggregator.getName(), aggregator.getCombineFn()));
-    }
-  }
-
-  /**
-   * The richer context handed to {@link DoFn#processElement}: everything in {@link Context}
-   * plus access to the current element, its timestamp, window, pane and side inputs.
-   */
-  public abstract class ProcessContext extends Context {
-
-    /**
-     * Gives the element currently being processed.
-     *
-     * <p>Treat it as immutable. The Dataflow runtime never mutates it, so caching it is safe;
-     * conversely, no {@link DoFn} method should mutate it, because it may be cached elsewhere,
-     * held by the Dataflow runtime, or used in other unspecified ways.
-     */
-    public abstract InputT element();
-
-    /**
-     * Looks up the value of {@code view} for the side-input window that corresponds to the
-     * main input element's window.
-     *
-     * <p>How that corresponding window is chosen is described by
-     * {@link com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn#getSideInputWindow}.
-     *
-     * @throws IllegalArgumentException if {@code view} is not a side input
-     * @see ParDo#withSideInputs
-     */
-    public abstract <T> T sideInput(PCollectionView<T> view);
-
-    /**
-     * Gives the timestamp attached to the input element.
-     *
-     * <p>More background is in {@link com.google.cloud.dataflow.sdk.transforms.windowing.Window}.
-     */
-    public abstract Instant timestamp();
-
-    /**
-     * Gives the window the input element was assigned to.
-     *
-     * <p>More background is in {@link com.google.cloud.dataflow.sdk.transforms.windowing.Window}.
-     *
-     * @throws UnsupportedOperationException when this {@link DoFn} does not implement
-     *     {@link RequiresWindowAccess}
-     */
-    public abstract BoundedWindow window();
-
-    /**
-     * Describes the pane, within the element's window, that the input element was assigned to.
-     *
-     * <p>Unless custom triggering or late data has been explicitly requested, all data usually
-     * lives in a single, uninteresting pane. More background is in
-     * {@link com.google.cloud.dataflow.sdk.transforms.windowing.Window}.
-     */
-    public abstract PaneInfo pane();
-
-    /**
-     * Exposes the process context used to implement windowing.
-     */
-    @Experimental
-    public abstract WindowingInternals<InputT, OutputT> windowingInternals();
-  }
-
-  /**
-   * Reports how far back in time {@link DoFn.Context#outputWithTimestamp} may move an output
-   * timestamp relative to the input element's timestamp.
-   *
-   * <p>This implementation returns {@code Duration.ZERO}, meaning timestamps may only move
-   * forward, into the future. Override and return {@code Duration.millis(Long.MAX_VALUE)} to
-   * allow unlimited skew.
-   *
-   * <p>Be aware that emitting an element with a timestamp earlier than the current one can
-   * produce late data: a non-zero skew here does not influence the watermark calculations
-   * that decide when windows fire.
-   *
-   * @deprecated does not interact well with the watermark.
-   */
-  @Deprecated
-  public Duration getAllowedTimestampSkew() {
-    final Duration noSkew = Duration.ZERO;
-    return noSkew;
-  }
-
- /**
- * Interface for signaling that a {@link DoFn} needs to access the window the
- * element is being processed in, via {@link DoFn.ProcessContext#window}.
- */
- @Experimental
- public interface RequiresWindowAccess {}
-
-  /** Creates a {@code DoFn} whose aggregator registry starts out as an empty map. */
-  public DoFn() {
-    this(new HashMap<String, DelegatingAggregator<?, ?>>());
-  }
-
-  /** Creates a {@code DoFn} backed by the supplied aggregator map (stored as-is, not copied). */
-  DoFn(Map<String, DelegatingAggregator<?, ?>> aggregators) {
-    this.aggregators = aggregators;
-  }
-
- /////////////////////////////////////////////////////////////////////////////
-
- private final Map<String, DelegatingAggregator<?, ?>> aggregators;
-
- /**
- * Protects aggregators from being created after initialization.
- */
- private boolean aggregatorsAreFinal;
-
-  /**
-   * Hook invoked before this {@code DoFn} instance processes a bundle of elements.
-   *
-   * <p>The default implementation is a no-op; override it to set up per-bundle state.
-   */
-  public void startBundle(Context c) throws Exception {
-    // Intentionally empty.
-  }
-
-  /**
-   * Handles a single element of the input {@code PCollection}.
-   *
-   * <p>Use {@link ProcessContext#element() c.element()} to obtain the element. Treat it as
-   * immutable: the Dataflow runtime never mutates it, so caching it is safe, and no
-   * {@link DoFn} method should mutate it either, because it may be cached elsewhere, retained
-   * by the Dataflow runtime, or used in other unspecified ways.
-   *
-   * <p>Results go to the main output {@code PCollection} via {@link ProcessContext#output}.
-   * An element handed to {@code output} must likewise be treated as immutable from then on,
-   * since it may be cached elsewhere, retained by the Dataflow runtime, or used in other
-   * unspecified ways.
-   *
-   * @see ProcessContext
-   */
-  public abstract void processElement(ProcessContext c) throws Exception;
-
-  /**
-   * Hook invoked after this {@code DoFn} instance has processed the current bundle.
-   *
-   * <p>The default implementation is a no-op; override it to flush or release per-bundle state.
-   */
-  public void finishBundle(Context c) throws Exception {
-    // Intentionally empty.
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * <p>The base implementation contributes no display data; subclasses may override this
-   * method to register their own display metadata with {@code builder}.
-   */
-  @Override
-  public void populateDisplayData(DisplayData.Builder builder) {
-    // Nothing to register by default.
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * Returns a {@link TypeDescriptor} for this {@code DoFn}'s input type, capturing whatever is
-   * statically known about it for the most-derived class of this instance.
-   *
-   * <p>See {@link #getOutputTypeDescriptor} for more discussion.
-   */
-  protected TypeDescriptor<InputT> getInputTypeDescriptor() {
-    // The trailing {} (an anonymous subclass) is required: it is what records InputT as a type
-    // argument so that, presumably, TypeDescriptor can resolve it against getClass().
-    TypeDescriptor<InputT> inputDescriptor = new TypeDescriptor<InputT>(getClass()) {};
-    return inputDescriptor;
-  }
-
-  /**
-   * Returns a {@link TypeDescriptor} for this {@code DoFn}'s output type, capturing whatever is
-   * statically known about it for the most-derived class of this instance.
-   *
-   * <p>For the usual case of a concrete {@code DoFn} subclass with no type parameters of its own
-   * (anonymous inner classes included), the result is a complete, non-generic type. That makes
-   * it a good basis for choosing a default output {@code Coder<OutputT>} for the output
-   * {@code PCollection<OutputT>}.
-   */
-  protected TypeDescriptor<OutputT> getOutputTypeDescriptor() {
-    // As above, the anonymous subclass is what captures OutputT; do not remove the {}.
-    TypeDescriptor<OutputT> outputDescriptor = new TypeDescriptor<OutputT>(getClass()) {};
-    return outputDescriptor;
-  }
-
-  /**
-   * Creates and registers an {@link Aggregator} whose aggregation logic is given by a
-   * {@link CombineFn}. Names must be unique among the {@link Aggregator}s of this DoFn, and
-   * aggregators may only be created during pipeline construction.
-   *
-   * @param name the name of the aggregator
-   * @param combiner the {@link CombineFn} to use in the aggregator
-   * @return an aggregator for the provided name and combiner in the scope of
-   *     this DoFn
-   * @throws NullPointerException if the name or combiner is null
-   * @throws IllegalArgumentException if the given name collides with another
-   *     aggregator in this scope
-   * @throws IllegalStateException if called during pipeline processing.
-   */
-  protected final <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregator(
-      String name, CombineFn<? super AggInputT, ?, AggOutputT> combiner) {
-    checkNotNull(name, "name cannot be null");
-    checkNotNull(combiner, "combiner cannot be null");
-
-    boolean nameAlreadyUsed = aggregators.containsKey(name);
-    checkArgument(!nameAlreadyUsed,
-        "Cannot create aggregator with name %s."
-        + " An Aggregator with that name already exists within this scope.",
-        name);
-
-    // Once aggregatorsAreFinal is set, the registry is closed to new aggregators.
-    boolean registrationOpen = !aggregatorsAreFinal;
-    checkState(registrationOpen, "Cannot create an aggregator during DoFn processing."
-        + " Aggregators should be registered during pipeline construction.");
-
-    DelegatingAggregator<AggInputT, AggOutputT> created =
-        new DelegatingAggregator<>(name, combiner);
-    aggregators.put(name, created);
-    return created;
-  }
-
-  /**
-   * Creates and registers an {@link Aggregator} whose aggregation logic is given by a
-   * {@link SerializableFunction} over an {@code Iterable} of inputs. The function is adapted
-   * with {@code Combine.IterableCombineFn.of} and registered through the {@link CombineFn}
-   * overload, so the same naming and timing rules apply.
-   *
-   * @param name the name of the aggregator
-   * @param combiner the {@link SerializableFunction} to use in the aggregator
-   * @return an aggregator for the provided name and combiner in the scope of
-   *     this DoFn
-   * @throws NullPointerException if the name or combiner is null
-   * @throws IllegalArgumentException if the given name collides with another
-   *     aggregator in this scope
-   * @throws IllegalStateException if called during pipeline processing.
-   */
-  protected final <AggInputT> Aggregator<AggInputT, AggInputT> createAggregator(
-      String name, SerializableFunction<Iterable<AggInputT>, AggInputT> combiner) {
-    // The combiner is checked here, before it is wrapped; the name is checked by the overload.
-    checkNotNull(combiner, "combiner cannot be null.");
-    return createAggregator(name, Combine.IterableCombineFn.of(combiner));
-  }
-
-  /**
-   * Returns a read-only view of the {@link Aggregator Aggregators} held in this
-   * {@code DoFn}'s aggregator map.
-   */
-  Collection<Aggregator<?, ?>> getAggregators() {
-    Collection<? extends Aggregator<?, ?>> registered = aggregators.values();
-    return Collections.<Aggregator<?, ?>>unmodifiableCollection(registered);
-  }
-
-  /**
-   * An {@link Aggregator} that forwards {@link #addValue} to a delegate aggregator installed
-   * later through {@link #setDelegate}.
-   *
-   * @param <AggInputT> the type of input element
-   * @param <AggOutputT> the type of output element
-   */
-  static class DelegatingAggregator<AggInputT, AggOutputT> implements
-      Aggregator<AggInputT, AggOutputT>, Serializable {
-    /** Random identity chosen at construction; participates in equals/hashCode. */
-    private final UUID id;
-
-    private final String name;
-
-    private final CombineFn<AggInputT, ?, AggOutputT> combineFn;
-
-    /** Receiver of addValue calls; null until setDelegate is called. */
-    private Aggregator<AggInputT, ?> delegate;
-
-    public DelegatingAggregator(String name,
-        CombineFn<? super AggInputT, ?, AggOutputT> combiner) {
-      this.id = UUID.randomUUID();
-      this.name = checkNotNull(name, "name cannot be null");
-      // Narrowing "? super AggInputT" to AggInputT is safe: a combiner that accepts some
-      // supertype of AggInputT also accepts AggInputT.
-      @SuppressWarnings("unchecked")
-      CombineFn<AggInputT, ?, AggOutputT> narrowed =
-          (CombineFn<AggInputT, ?, AggOutputT>) checkNotNull(combiner, "combineFn cannot be null");
-      this.combineFn = narrowed;
-    }
-
-    @Override
-    public void addValue(AggInputT value) {
-      Aggregator<AggInputT, ?> target = delegate;
-      if (target != null) {
-        target.addValue(value);
-        return;
-      }
-      throw new IllegalStateException(
-          "addValue cannot be called on Aggregator outside of the execution of a DoFn.");
-    }
-
-    @Override
-    public String getName() {
-      return name;
-    }
-
-    @Override
-    public CombineFn<AggInputT, ?, AggOutputT> getCombineFn() {
-      return combineFn;
-    }
-
-    /**
-     * Installs the aggregator that subsequent {@link #addValue} calls are forwarded to.
-     *
-     * @param delegate the delegate to set in this aggregator
-     */
-    public void setDelegate(Aggregator<AggInputT, ?> delegate) {
-      this.delegate = delegate;
-    }
-
-    @Override
-    public String toString() {
-      MoreObjects.ToStringHelper helper = MoreObjects.toStringHelper(getClass());
-      helper.add("name", name);
-      helper.add("combineFn", combineFn);
-      return helper.toString();
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(id, name, combineFn.getClass());
-    }
-
-    /**
-     * Two {@code DelegatingAggregator}s are equal when they have the same ID and name, and their
-     * CombineFns are instances of the same class.
-     */
-    @Override
-    public boolean equals(Object o) {
-      if (o == this) {
-        return true;
-      }
-      if (!(o instanceof DelegatingAggregator)) {
-        // Also covers o == null.
-        return false;
-      }
-      DelegatingAggregator<?, ?> other = (DelegatingAggregator<?, ?>) o;
-      boolean sameIdentity = Objects.equals(this.id, other.id);
-      boolean sameName = Objects.equals(this.name, other.name);
-      boolean sameCombinerClass =
-          Objects.equals(this.combineFn.getClass(), other.combineFn.getClass());
-      return sameIdentity && sameName && sameCombinerClass;
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/DoFnReflector.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/DoFnReflector.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/DoFnReflector.java
deleted file mode 100644
index 1c46541..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/DoFnReflector.java
+++ /dev/null
@@ -1,668 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms;
-
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.transforms.DoFnWithContext.ExtraContextFactory;
-import com.google.cloud.dataflow.sdk.transforms.DoFnWithContext.FinishBundle;
-import com.google.cloud.dataflow.sdk.transforms.DoFnWithContext.ProcessElement;
-import com.google.cloud.dataflow.sdk.transforms.DoFnWithContext.StartBundle;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
-import com.google.cloud.dataflow.sdk.util.UserCodeException;
-import com.google.cloud.dataflow.sdk.util.WindowingInternals;
-import com.google.cloud.dataflow.sdk.util.common.ReflectHelpers;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Throwables;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.reflect.TypeParameter;
-import com.google.common.reflect.TypeToken;
-
-import org.joda.time.Instant;
-
-import java.io.IOException;
-import java.lang.annotation.Annotation;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Modifier;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.Map;
-
-import javax.annotation.Nullable;
-
-/**
- * Utility implementing the necessary reflection for working with {@link DoFnWithContext}s.
- *
- * <p>A reflector locates the {@link ProcessElement}, {@link StartBundle} and {@link FinishBundle}
- * methods of a {@link DoFnWithContext} class and validates their signatures. It then invokes them
- * with the appropriate context arguments, and can wrap a {@link DoFnWithContext} as a
- * {@link DoFn}.
- */
-public abstract class DoFnReflector {
-
-  /**
-   * Describes one kind of extra context argument that an annotated method may declare after its
-   * first (context) argument.
-   */
-  private interface ExtraContextInfo {
-    /**
-     * Create the argument value for this kind of extra context using the given factory.
-     */
-    <InputT, OutputT> Object createInstance(
-        DoFnWithContext.ExtraContextFactory<InputT, OutputT> factory);
-
-    /**
-     * Create the type token for this kind of extra context, filling in the generics from the
-     * given input and output types.
-     */
-    <InputT, OutputT> TypeToken<?> tokenFor(TypeToken<InputT> in, TypeToken<OutputT> out);
-  }
-
-  /** Extra context arguments accepted by {@link StartBundle} and {@link FinishBundle} methods. */
-  private static final Map<Class<?>, ExtraContextInfo> EXTRA_CONTEXTS = Collections.emptyMap();
-
-  /**
-   * Extra context arguments accepted by {@link ProcessElement} methods: everything in
-   * {@link #EXTRA_CONTEXTS}, plus {@link BoundedWindow} and {@link WindowingInternals}.
-   */
-  private static final Map<Class<?>, ExtraContextInfo> EXTRA_PROCESS_CONTEXTS =
-      ImmutableMap.<Class<?>, ExtraContextInfo>builder()
-          .putAll(EXTRA_CONTEXTS)
-          .put(BoundedWindow.class, new ExtraContextInfo() {
-            @Override
-            public <InputT, OutputT> Object
-                createInstance(ExtraContextFactory<InputT, OutputT> factory) {
-              return factory.window();
-            }
-
-            @Override
-            public <InputT, OutputT> TypeToken<?>
-                tokenFor(TypeToken<InputT> in, TypeToken<OutputT> out) {
-              return TypeToken.of(BoundedWindow.class);
-            }
-          })
-          .put(WindowingInternals.class, new ExtraContextInfo() {
-            @Override
-            public <InputT, OutputT> Object
-                createInstance(ExtraContextFactory<InputT, OutputT> factory) {
-              return factory.windowingInternals();
-            }
-
-            @Override
-            public <InputT, OutputT> TypeToken<?>
-                tokenFor(TypeToken<InputT> in, TypeToken<OutputT> out) {
-              return new TypeToken<WindowingInternals<InputT, OutputT>>() {
-                  }
-                  .where(new TypeParameter<InputT>() {}, in)
-                  .where(new TypeParameter<OutputT>() {}, out);
-            }
-          })
-          .build();
-
-  /**
-   * @return true if the reflected {@link DoFnWithContext} needs access to the single window of
-   *     the element being processed. In the generic reflector, this is the case when its
-   *     {@link ProcessElement} method declares a {@link BoundedWindow} argument.
-   */
-  public abstract boolean usesSingleWindow();
-
-  /**
-   * Invoke the reflected {@link ProcessElement} method on the given instance.
-   *
-   * @param fn an instance of the {@link DoFnWithContext} to invoke {@link ProcessElement} on.
-   * @param c the {@link com.google.cloud.dataflow.sdk.transforms.DoFnWithContext.ProcessContext}
-   *     to pass to {@link ProcessElement}.
-   * @param extra factory for any extra context arguments the method declares.
-   */
-  abstract <InputT, OutputT> void invokeProcessElement(
-      DoFnWithContext<InputT, OutputT> fn,
-      DoFnWithContext<InputT, OutputT>.ProcessContext c,
-      ExtraContextFactory<InputT, OutputT> extra);
-
-  /**
-   * Invoke the reflected {@link StartBundle} method on the given instance.
-   *
-   * <p>This base implementation only calls {@code fn.prepareForProcessing()}. Overriding
-   * implementations call {@code super} so that this still happens first.
-   *
-   * @param fn an instance of the {@link DoFnWithContext} to invoke {@link StartBundle} on.
-   * @param c the {@link com.google.cloud.dataflow.sdk.transforms.DoFnWithContext.Context}
-   *     to pass to {@link StartBundle}.
-   * @param extra factory for any extra context arguments the method declares.
-   */
-  <InputT, OutputT> void invokeStartBundle(
-      DoFnWithContext<InputT, OutputT> fn,
-      DoFnWithContext<InputT, OutputT>.Context c,
-      ExtraContextFactory<InputT, OutputT> extra) {
-    fn.prepareForProcessing();
-  }
-
-  /**
-   * Invoke the reflected {@link FinishBundle} method on the given instance.
-   *
-   * @param fn an instance of the {@link DoFnWithContext} to invoke {@link FinishBundle} on.
-   * @param c the {@link com.google.cloud.dataflow.sdk.transforms.DoFnWithContext.Context}
-   *     to pass to {@link FinishBundle}.
-   * @param extra factory for any extra context arguments the method declares.
-   */
-  abstract <InputT, OutputT> void invokeFinishBundle(
-      DoFnWithContext<InputT, OutputT> fn,
-      DoFnWithContext<InputT, OutputT>.Context c,
-      ExtraContextFactory<InputT, OutputT> extra);
-
-  /**
-   * Reflectors already built, keyed by {@link DoFnWithContext} class. This is a plain,
-   * unsynchronized map; every access goes through {@link #of}, which is synchronized.
-   */
-  private static final Map<Class<?>, DoFnReflector> REFLECTOR_CACHE =
-      new LinkedHashMap<Class<?>, DoFnReflector>();
-
-  /**
-   * Returns the {@link DoFnReflector} for the given {@link DoFnWithContext} class. The reflector
-   * is built and cached on first request.
-   *
-   * <p>The lookup-then-insert on {@link #REFLECTOR_CACHE} is a check-then-act on a
-   * non-thread-safe map. This method is public and static, and is also reached from
-   * {@code SimpleDoFnAdapter.readObject}, so it is synchronized to keep concurrent callers from
-   * racing on the map.
-   *
-   * @throws IllegalStateException if the class's annotated methods are missing or invalid
-   */
-  public static synchronized DoFnReflector of(
-      @SuppressWarnings("rawtypes") Class<? extends DoFnWithContext> fn) {
-    DoFnReflector reflector = REFLECTOR_CACHE.get(fn);
-    if (reflector != null) {
-      return reflector;
-    }
-
-    reflector = new GenericDoFnReflector(fn);
-    REFLECTOR_CACHE.put(fn, reflector);
-    return reflector;
-  }
-
-  /**
-   * Create a {@link DoFn} that adapts the given {@link DoFnWithContext}, delegating each call
-   * to the methods reflected by this {@code DoFnReflector}.
-   *
-   * <p>If {@link #usesSingleWindow()} is true, the returned {@link DoFn} also implements
-   * {@link DoFn.RequiresWindowAccess}.
-   */
-  public <InputT, OutputT> DoFn<InputT, OutputT> toDoFn(DoFnWithContext<InputT, OutputT> fn) {
-    if (usesSingleWindow()) {
-      return new WindowDoFnAdapter<InputT, OutputT>(this, fn);
-    } else {
-      return new SimpleDoFnAdapter<InputT, OutputT>(this, fn);
-    }
-  }
-
-  /** Formats a type token for use in error messages. */
-  private static String formatType(TypeToken<?> t) {
-    return ReflectHelpers.TYPE_SIMPLE_DESCRIPTION.apply(t.getType());
-  }
-
-  /** Formats a method (class and method name) for use in error messages. */
-  private static String format(Method m) {
-    return ReflectHelpers.CLASS_AND_METHOD_FORMATTER.apply(m);
-  }
-
-  /**
-   * Describes every type in {@code extraProcessContexts}, with generics filled in from
-   * {@code in} and {@code out}, sorted case-insensitively. Used in error messages.
-   */
-  private static Collection<String> describeSupportedTypes(
-      Map<Class<?>, ExtraContextInfo> extraProcessContexts,
-      final TypeToken<?> in, final TypeToken<?> out) {
-    return FluentIterable
-        .from(extraProcessContexts.values())
-        .transform(new Function<ExtraContextInfo, String>() {
-          @Override
-          @Nullable
-          public String apply(@Nullable ExtraContextInfo input) {
-            if (input == null) {
-              return null;
-            } else {
-              return formatType(input.tokenFor(in, out));
-            }
-          }
-        })
-        .toSortedSet(String.CASE_INSENSITIVE_ORDER);
-  }
-
-  /**
-   * Verifies a {@link ProcessElement} method. Its first argument must be a
-   * {@code ProcessContext}, and any further arguments must come from
-   * {@link #EXTRA_PROCESS_CONTEXTS}.
-   */
-  @VisibleForTesting
-  static <InputT, OutputT> ExtraContextInfo[] verifyProcessMethodArguments(Method m) {
-    return verifyMethodArguments(m,
-        EXTRA_PROCESS_CONTEXTS,
-        new TypeToken<DoFnWithContext<InputT, OutputT>.ProcessContext>() {
-          },
-        new TypeParameter<InputT>() {},
-        new TypeParameter<OutputT>() {});
-  }
-
-  /**
-   * Verifies a {@link StartBundle} or {@link FinishBundle} method. Its first argument must be a
-   * {@code Context}, and any further arguments must come from {@link #EXTRA_CONTEXTS}.
-   */
-  @VisibleForTesting
-  static <InputT, OutputT> ExtraContextInfo[] verifyBundleMethodArguments(Method m) {
-    return verifyMethodArguments(m,
-        EXTRA_CONTEXTS,
-        new TypeToken<DoFnWithContext<InputT, OutputT>.Context>() {
-          },
-        new TypeParameter<InputT>() {},
-        new TypeParameter<OutputT>() {});
-  }
-
-  /**
-   * Verify the method arguments for a given {@link DoFnWithContext} method.
-   *
-   * <p>The requirements for a method to be valid are:
-   * <ol>
-   * <li>The method returns {@code void} and does not take varargs.
-   * <li>The method has at least one argument.
-   * <li>The first argument is of type firstContextArg, written with its type arguments (e.g.
-   *     {@code DoFnWithContext<InputT, OutputT>.ProcessContext}, not the raw type).
-   * <li>The remaining arguments have raw types that appear in {@code contexts}.
-   * <li>Any generics on the extra context arguments match what is expected. For example,
-   *     {@code WindowingInternals<InputT, OutputT>} either matches the
-   *     {@code InputT} and {@code OutputT} parameters of the
-   *     {@code DoFn<InputT, OutputT>.ProcessContext}, or it uses a wildcard.
-   * </ol>
-   *
-   * @param m the method to verify
-   * @param contexts mapping from raw classes to the {@link ExtraContextInfo} used
-   *     to create new instances.
-   * @param firstContextArg the expected type of the first context argument
-   * @param iParam TypeParameter representing the input type
-   * @param oParam TypeParameter representing the output type
-   * @return one {@link ExtraContextInfo} per argument after the first, in argument order
-   * @throws IllegalStateException if any requirement is violated
-   */
-  @VisibleForTesting static <InputT, OutputT> ExtraContextInfo[] verifyMethodArguments(Method m,
-      Map<Class<?>, ExtraContextInfo> contexts,
-      TypeToken<?> firstContextArg, TypeParameter<InputT> iParam, TypeParameter<OutputT> oParam) {
-
-    if (!void.class.equals(m.getReturnType())) {
-      throw new IllegalStateException(String.format(
-          "%s must have a void return type", format(m)));
-    }
-    if (m.isVarArgs()) {
-      throw new IllegalStateException(String.format(
-          "%s must not have var args", format(m)));
-    }
-
-    // The first parameter must be present, and must be the specified type
-    Type[] params = m.getGenericParameterTypes();
-    TypeToken<?> contextToken = null;
-    if (params.length > 0) {
-      contextToken = TypeToken.of(params[0]);
-    }
-    if (contextToken == null
-        || !contextToken.getRawType().equals(firstContextArg.getRawType())) {
-      throw new IllegalStateException(String.format(
-          "%s must take a %s as its first argument",
-          format(m), firstContextArg.getRawType().getSimpleName()));
-    }
-    ExtraContextInfo[] contextInfos = new ExtraContextInfo[params.length - 1];
-
-    // The input and output types used to fill in the extra context generics come from the type
-    // arguments of the Context/ProcessContext parameter. Those arguments sit on its owner type,
-    // because Context and ProcessContext are inner classes of DoFnWithContext. A raw first
-    // parameter carries no type arguments, so reject it with a clear message here instead of
-    // failing below with a ClassCastException.
-    Type ownerType = null;
-    if (contextToken.getType() instanceof ParameterizedType) {
-      ownerType = ((ParameterizedType) contextToken.getType()).getOwnerType();
-    }
-    if (!(ownerType instanceof ParameterizedType)) {
-      throw new IllegalStateException(String.format(
-          "%s must declare the type arguments of its %s argument",
-          format(m), firstContextArg.getRawType().getSimpleName()));
-    }
-    ParameterizedType pt = (ParameterizedType) ownerType;
-    @SuppressWarnings("unchecked")
-    TypeToken<InputT> iActual = (TypeToken<InputT>) TypeToken.of(pt.getActualTypeArguments()[0]);
-    @SuppressWarnings("unchecked")
-    TypeToken<OutputT> oActual = (TypeToken<OutputT>) TypeToken.of(pt.getActualTypeArguments()[1]);
-
-    // Every remaining parameter must have a raw type listed in contexts, with generics
-    // compatible with the input and output types found above.
-    for (int i = 1; i < params.length; i++) {
-      TypeToken<?> param = TypeToken.of(params[i]);
-
-      ExtraContextInfo info = contexts.get(param.getRawType());
-      if (info == null) {
-        throw new IllegalStateException(String.format(
-            "%s is not a valid context parameter for method %s. Should be one of %s",
-            formatType(param), format(m),
-            describeSupportedTypes(contexts, iActual, oActual)));
-      }
-
-      // If we get here, the class matches, but maybe the generics don't:
-      TypeToken<?> expected = info.tokenFor(iActual, oActual);
-      if (!expected.isSubtypeOf(param)) {
-        throw new IllegalStateException(String.format(
-            "Incompatible generics in context parameter %s for method %s. Should be %s",
-            formatType(param), format(m), formatType(expected)));
-      }
-
-      // Register the (now validated) context info
-      contextInfos[i - 1] = info;
-    }
-    return contextInfos;
-  }
-
-  /**
-   * Implementation of {@link DoFnReflector} for the arbitrary {@link DoFnWithContext}.
-   */
-  private static class GenericDoFnReflector extends DoFnReflector {
-
-    private final Method startBundle;
-    private final Method processElement;
-    private final Method finishBundle;
-    private final ExtraContextInfo[] processElementArgs;
-    /** Null when there is no {@link StartBundle} method. */
-    private final ExtraContextInfo[] startBundleArgs;
-    /** Null when there is no {@link FinishBundle} method. */
-    private final ExtraContextInfo[] finishBundleArgs;
-
-    private GenericDoFnReflector(Class<?> fn) {
-      // Locate the annotated methods; only @ProcessElement is mandatory.
-      this.processElement = findAnnotatedMethod(ProcessElement.class, fn, true);
-      this.startBundle = findAnnotatedMethod(StartBundle.class, fn, false);
-      this.finishBundle = findAnnotatedMethod(FinishBundle.class, fn, false);
-
-      // Verify that their method arguments satisfy our conditions.
-      this.processElementArgs = verifyProcessMethodArguments(processElement);
-      this.startBundleArgs =
-          startBundle == null ? null : verifyBundleMethodArguments(startBundle);
-      this.finishBundleArgs =
-          finishBundle == null ? null : verifyBundleMethodArguments(finishBundle);
-    }
-
-    /**
-     * Collects the methods annotated with {@code anno} that are declared on {@code startClass}
-     * and its superclasses (stopping before {@code stopClass}), then on the interfaces those
-     * classes implement, in that order.
-     */
-    private static Collection<Method> declaredMethodsWithAnnotation(
-        Class<? extends Annotation> anno,
-        Class<?> startClass, Class<?> stopClass) {
-      Collection<Method> matches = new ArrayList<>();
-
-      Class<?> clazz = startClass;
-      LinkedHashSet<Class<?>> interfaces = new LinkedHashSet<>();
-
-      // First, find all declared methods on the startClass and parents (up to stopClass)
-      while (clazz != null && !clazz.equals(stopClass)) {
-        for (Method method : clazz.getDeclaredMethods()) {
-          if (method.isAnnotationPresent(anno)) {
-            matches.add(method);
-          }
-        }
-
-        Collections.addAll(interfaces, clazz.getInterfaces());
-
-        clazz = clazz.getSuperclass();
-      }
-
-      // Now, iterate over all the discovered interfaces
-      for (Method method : ReflectHelpers.getClosureOfMethodsOnInterfaces(interfaces)) {
-        if (method.isAnnotationPresent(anno)) {
-          matches.add(method);
-        }
-      }
-      return matches;
-    }
-
-    /**
-     * Finds the method annotated with {@code anno} on {@code fnClazz} (see
-     * {@link #declaredMethodsWithAnnotation}). Several matches are allowed only if they all have
-     * the same name and parameter types. In that case the first one found is returned: the
-     * class itself is searched before its superclasses and interfaces.
-     *
-     * @return the method (made accessible), or {@code null} if there is none and
-     *     {@code required} is false
-     * @throws IllegalStateException if a required method is missing, the matches disagree, or
-     *     the method is not public or is static
-     */
-    private static Method findAnnotatedMethod(
-        Class<? extends Annotation> anno, Class<?> fnClazz, boolean required) {
-      Collection<Method> matches = declaredMethodsWithAnnotation(
-          anno, fnClazz, DoFnWithContext.class);
-
-      if (matches.isEmpty()) {
-        if (required) {
-          throw new IllegalStateException(String.format(
-              "No method annotated with @%s found in %s",
-              anno.getSimpleName(), fnClazz.getName()));
-        } else {
-          return null;
-        }
-      }
-
-      // If we have at least one match, then either it should be the only match
-      // or it should be an extension of the other matches (which came from parent
-      // classes).
-      Method first = matches.iterator().next();
-      for (Method other : matches) {
-        if (!first.getName().equals(other.getName())
-            || !Arrays.equals(first.getParameterTypes(), other.getParameterTypes())) {
-          throw new IllegalStateException(String.format(
-              "Found multiple methods annotated with @%s. [%s] and [%s]",
-              anno.getSimpleName(), format(first), format(other)));
-        }
-      }
-
-      // We need to be able to call it. We require it is public.
-      if ((first.getModifiers() & Modifier.PUBLIC) == 0) {
-        throw new IllegalStateException(format(first) + " must be public");
-      }
-
-      // And make sure its not static.
-      if ((first.getModifiers() & Modifier.STATIC) != 0) {
-        throw new IllegalStateException(format(first) + " must not be static");
-      }
-
-      first.setAccessible(true);
-      return first;
-    }
-
-    @Override
-    public boolean usesSingleWindow() {
-      return usesContext(BoundedWindow.class);
-    }
-
-    /** Returns whether the {@link ProcessElement} method has an argument of exactly this class. */
-    private boolean usesContext(Class<?> context) {
-      for (Class<?> clazz : processElement.getParameterTypes()) {
-        if (clazz.equals(context)) {
-          return true;
-        }
-      }
-      return false;
-    }
-
-    @Override
-    <InputT, OutputT> void invokeProcessElement(
-        DoFnWithContext<InputT, OutputT> fn,
-        DoFnWithContext<InputT, OutputT>.ProcessContext c,
-        ExtraContextFactory<InputT, OutputT> extra) {
-      invoke(processElement, fn, c, extra, processElementArgs);
-    }
-
-    @Override
-    <InputT, OutputT> void invokeStartBundle(
-        DoFnWithContext<InputT, OutputT> fn,
-        DoFnWithContext<InputT, OutputT>.Context c,
-        ExtraContextFactory<InputT, OutputT> extra) {
-      super.invokeStartBundle(fn, c, extra);
-      if (startBundle != null) {
-        invoke(startBundle, fn, c, extra, startBundleArgs);
-      }
-    }
-
-    @Override
-    <InputT, OutputT> void invokeFinishBundle(
-        DoFnWithContext<InputT, OutputT> fn,
-        DoFnWithContext<InputT, OutputT>.Context c,
-        ExtraContextFactory<InputT, OutputT> extra) {
-      if (finishBundle != null) {
-        invoke(finishBundle, fn, c, extra, finishBundleArgs);
-      }
-    }
-
-    /**
-     * Calls {@code m} on {@code on}. The arguments are {@code contextArg} followed by one value
-     * per entry of {@code extraArgs}, each created from {@code extraArgFactory}.
-     *
-     * <p>Exceptions thrown by the user's method are rethrown via {@link UserCodeException#wrap}.
-     * Reflection failures are propagated unchecked.
-     */
-    private <InputT, OutputT> void invoke(Method m,
-        DoFnWithContext<InputT, OutputT> on,
-        DoFnWithContext<InputT, OutputT>.Context contextArg,
-        ExtraContextFactory<InputT, OutputT> extraArgFactory,
-        ExtraContextInfo[] extraArgs) {
-
-      Class<?>[] parameterTypes = m.getParameterTypes();
-      Object[] args = new Object[parameterTypes.length];
-      args[0] = contextArg;
-      for (int i = 1; i < args.length; i++) {
-        args[i] = extraArgs[i - 1].createInstance(extraArgFactory);
-      }
-
-      try {
-        m.invoke(on, args);
-      } catch (InvocationTargetException e) {
-        // Exception in user code.
-        throw UserCodeException.wrap(e.getCause());
-      } catch (IllegalAccessException | IllegalArgumentException e) {
-        // Exception in our code.
-        throw Throwables.propagate(e);
-      }
-    }
-  }
-
-  /**
-   * Presents a {@link DoFn} {@code Context} as a {@link DoFnWithContext} {@code Context}. It
-   * forwards options and outputs, and supplies no extra context values.
-   */
-  private static class ContextAdapter<InputT, OutputT>
-      extends DoFnWithContext<InputT, OutputT>.Context
-      implements DoFnWithContext.ExtraContextFactory<InputT, OutputT> {
-
-    private DoFn<InputT, OutputT>.Context context;
-
-    private ContextAdapter(
-        DoFnWithContext<InputT, OutputT> fn, DoFn<InputT, OutputT>.Context context) {
-      fn.super();
-      this.context = context;
-    }
-
-    @Override
-    public PipelineOptions getPipelineOptions() {
-      return context.getPipelineOptions();
-    }
-
-    @Override
-    public void output(OutputT output) {
-      context.output(output);
-    }
-
-    @Override
-    public void outputWithTimestamp(OutputT output, Instant timestamp) {
-      context.outputWithTimestamp(output, timestamp);
-    }
-
-    @Override
-    public <T> void sideOutput(TupleTag<T> tag, T output) {
-      context.sideOutput(tag, output);
-    }
-
-    @Override
-    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
-      context.sideOutputWithTimestamp(tag, output, timestamp);
-    }
-
-    @Override
-    public BoundedWindow window() {
-      // The DoFnWithContext doesn't allow us to ask for these outside ProcessElements, so this
-      // should be unreachable.
-      throw new UnsupportedOperationException("Can only get the window in ProcessElements");
-    }
-
-    @Override
-    public WindowingInternals<InputT, OutputT> windowingInternals() {
-      // The DoFnWithContext doesn't allow us to ask for these outside ProcessElements, so this
-      // should be unreachable.
-      throw new UnsupportedOperationException(
-          "Can only get the windowingInternals in ProcessElements");
-    }
-  }
-
-  /**
-   * Presents a {@link DoFn} {@code ProcessContext} as a {@link DoFnWithContext}
-   * {@code ProcessContext}, and serves its window and windowing internals as extra context.
-   */
-  private static class ProcessContextAdapter<InputT, OutputT>
-      extends DoFnWithContext<InputT, OutputT>.ProcessContext
-      implements DoFnWithContext.ExtraContextFactory<InputT, OutputT> {
-
-    private DoFn<InputT, OutputT>.ProcessContext context;
-
-    private ProcessContextAdapter(
-        DoFnWithContext<InputT, OutputT> fn,
-        DoFn<InputT, OutputT>.ProcessContext context) {
-      fn.super();
-      this.context = context;
-    }
-
-    @Override
-    public PipelineOptions getPipelineOptions() {
-      return context.getPipelineOptions();
-    }
-
-    @Override
-    public <T> T sideInput(PCollectionView<T> view) {
-      return context.sideInput(view);
-    }
-
-    @Override
-    public void output(OutputT output) {
-      context.output(output);
-    }
-
-    @Override
-    public void outputWithTimestamp(OutputT output, Instant timestamp) {
-      context.outputWithTimestamp(output, timestamp);
-    }
-
-    @Override
-    public <T> void sideOutput(TupleTag<T> tag, T output) {
-      context.sideOutput(tag, output);
-    }
-
-    @Override
-    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
-      context.sideOutputWithTimestamp(tag, output, timestamp);
-    }
-
-    @Override
-    public InputT element() {
-      return context.element();
-    }
-
-    @Override
-    public Instant timestamp() {
-      return context.timestamp();
-    }
-
-    @Override
-    public PaneInfo pane() {
-      return context.pane();
-    }
-
-    @Override
-    public BoundedWindow window() {
-      return context.window();
-    }
-
-    @Override
-    public WindowingInternals<InputT, OutputT> windowingInternals() {
-      return context.windowingInternals();
-    }
-  }
-
-  /**
-   * Returns the class of the wrapped {@link DoFnWithContext} if {@code fn} is an adapter created
-   * by {@link #toDoFn}; otherwise returns {@code fn}'s own class.
-   */
-  public static Class<?> getDoFnClass(DoFn<?, ?> fn) {
-    if (fn instanceof SimpleDoFnAdapter) {
-      return ((SimpleDoFnAdapter<?, ?>) fn).fn.getClass();
-    } else {
-      return fn.getClass();
-    }
-  }
-
-  /**
-   * A {@link DoFn} that runs a {@link DoFnWithContext} by adapting each context and invoking
-   * the reflected annotated methods. It shares the wrapped function's aggregators.
-   */
-  private static class SimpleDoFnAdapter<InputT, OutputT> extends DoFn<InputT, OutputT> {
-
-    private transient DoFnReflector reflector;
-    private DoFnWithContext<InputT, OutputT> fn;
-
-    private SimpleDoFnAdapter(DoFnReflector reflector, DoFnWithContext<InputT, OutputT> fn) {
-      super(fn.aggregators);
-      this.reflector = reflector;
-      this.fn = fn;
-    }
-
-    @Override
-    public void startBundle(DoFn<InputT, OutputT>.Context c) throws Exception {
-      ContextAdapter<InputT, OutputT> adapter = new ContextAdapter<>(fn, c);
-      reflector.invokeStartBundle(fn, (DoFnWithContext<InputT, OutputT>.Context) adapter, adapter);
-    }
-
-    @Override
-    public void finishBundle(DoFn<InputT, OutputT>.Context c) throws Exception {
-      ContextAdapter<InputT, OutputT> adapter = new ContextAdapter<>(fn, c);
-      reflector.invokeFinishBundle(fn, (DoFnWithContext<InputT, OutputT>.Context) adapter, adapter);
-    }
-
-    @Override
-    public void processElement(DoFn<InputT, OutputT>.ProcessContext c) throws Exception {
-      ProcessContextAdapter<InputT, OutputT> adapter = new ProcessContextAdapter<>(fn, c);
-      reflector.invokeProcessElement(
-          fn, (DoFnWithContext<InputT, OutputT>.ProcessContext) adapter, adapter);
-    }
-
-    @Override
-    protected TypeDescriptor<InputT> getInputTypeDescriptor() {
-      return fn.getInputTypeDescriptor();
-    }
-
-    @Override
-    protected TypeDescriptor<OutputT> getOutputTypeDescriptor() {
-      return fn.getOutputTypeDescriptor();
-    }
-
-    private void readObject(java.io.ObjectInputStream in)
-        throws IOException, ClassNotFoundException {
-      in.defaultReadObject();
-      // The reflector is transient; look it up again for the deserialized fn's class.
-      reflector = DoFnReflector.of(fn.getClass());
-    }
-  }
-
-  /**
-   * A {@link SimpleDoFnAdapter} that also implements {@link DoFn.RequiresWindowAccess}. It is
-   * used when the {@link ProcessElement} method asks for the element's window.
-   */
-  private static class WindowDoFnAdapter<InputT, OutputT>
-      extends SimpleDoFnAdapter<InputT, OutputT> implements DoFn.RequiresWindowAccess {
-
-    private WindowDoFnAdapter(DoFnReflector reflector, DoFnWithContext<InputT, OutputT> fn) {
-      super(reflector, fn);
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/DoFnTester.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/DoFnTester.java
deleted file mode 100644
index 5447664..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/DoFnTester.java
+++ /dev/null
@@ -1,495 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms;
-
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.util.DirectModeExecutionContext;
-import com.google.cloud.dataflow.sdk.util.DirectSideInputReader;
-import com.google.cloud.dataflow.sdk.util.DoFnRunner;
-import com.google.cloud.dataflow.sdk.util.DoFnRunnerBase;
-import com.google.cloud.dataflow.sdk.util.DoFnRunners;
-import com.google.cloud.dataflow.sdk.util.PTuple;
-import com.google.cloud.dataflow.sdk.util.SerializableUtils;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
-import com.google.cloud.dataflow.sdk.util.common.Counter;
-import com.google.cloud.dataflow.sdk.util.common.CounterSet;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-import com.google.cloud.dataflow.sdk.values.TupleTagList;
-import com.google.common.base.Function;
-import com.google.common.base.Objects;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-
-import org.joda.time.Instant;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * A harness for unit-testing a {@link DoFn}.
- *
- * <p>For example:
- *
- * <pre> {@code
- * DoFn<InputT, OutputT> fn = ...;
- *
- * DoFnTester<InputT, OutputT> fnTester = DoFnTester.of(fn);
- *
- * // Set arguments shared across all batches:
- * fnTester.setSideInputs(...); // If fn takes side inputs.
- * fnTester.setSideOutputTags(...); // If fn writes to side outputs.
- *
- * // Process a batch containing a single input element:
- * Input testInput = ...;
- * List<OutputT> testOutputs = fnTester.processBatch(testInput);
- * Assert.assertThat(testOutputs,
- * JUnitMatchers.hasItems(...));
- *
- * // Process a bigger batch:
- * Assert.assertThat(fnTester.processBatch(i1, i2, ...),
- * JUnitMatchers.hasItems(...));
- * } </pre>
- *
- * @param <InputT> the type of the {@code DoFn}'s (main) input elements
- * @param <OutputT> the type of the {@code DoFn}'s (main) output elements
- */
-public class DoFnTester<InputT, OutputT> {
- /**
- * Returns a {@code DoFnTester} supporting unit-testing of the given
- * {@link DoFn}.
- */
- @SuppressWarnings("unchecked")
- public static <InputT, OutputT> DoFnTester<InputT, OutputT> of(DoFn<InputT, OutputT> fn) {
- return new DoFnTester<InputT, OutputT>(fn);
- }
-
- /**
- * Returns a {@code DoFnTester} supporting unit-testing of the given
- * {@link DoFn}.
- */
- @SuppressWarnings("unchecked")
- public static <InputT, OutputT> DoFnTester<InputT, OutputT>
- of(DoFnWithContext<InputT, OutputT> fn) {
- return new DoFnTester<InputT, OutputT>(DoFnReflector.of(fn.getClass()).toDoFn(fn));
- }
-
- /**
- * Registers the tuple of values of the side input {@link PCollectionView}s to
- * pass to the {@link DoFn} under test.
- *
- * <p>If needed, first creates a fresh instance of the {@link DoFn}
- * under test.
- *
- * <p>If this isn't called, {@code DoFnTester} assumes the
- * {@link DoFn} takes no side inputs.
- */
- public void setSideInputs(Map<PCollectionView<?>, Iterable<WindowedValue<?>>> sideInputs) {
- this.sideInputs = sideInputs;
- resetState();
- }
-
- /**
- * Registers the values of a side input {@link PCollectionView} to
- * pass to the {@link DoFn} under test.
- *
- * <p>If needed, first creates a fresh instance of the {@code DoFn}
- * under test.
- *
- * <p>If this isn't called, {@code DoFnTester} assumes the
- * {@code DoFn} takes no side inputs.
- */
- public void setSideInput(PCollectionView<?> sideInput, Iterable<WindowedValue<?>> value) {
- sideInputs.put(sideInput, value);
- }
-
- /**
- * Registers the values for a side input {@link PCollectionView} to
- * pass to the {@link DoFn} under test. All values are placed
- * in the global window.
- */
- public void setSideInputInGlobalWindow(
- PCollectionView<?> sideInput,
- Iterable<?> value) {
- sideInputs.put(
- sideInput,
- Iterables.transform(value, new Function<Object, WindowedValue<?>>() {
- @Override
- public WindowedValue<?> apply(Object input) {
- return WindowedValue.valueInGlobalWindow(input);
- }
- }));
- }
-
-
- /**
- * Registers the list of {@code TupleTag}s that can be used by the
- * {@code DoFn} under test to output to side output
- * {@code PCollection}s.
- *
- * <p>If needed, first creates a fresh instance of the DoFn under test.
- *
- * <p>If this isn't called, {@code DoFnTester} assumes the
- * {@code DoFn} doesn't emit to any side outputs.
- */
- public void setSideOutputTags(TupleTagList sideOutputTags) {
- this.sideOutputTags = sideOutputTags.getAll();
- resetState();
- }
-
- /**
- * A convenience operation that first calls {@link #startBundle},
- * then calls {@link #processElement} on each of the input elements, then
- * calls {@link #finishBundle}, then returns the result of
- * {@link #takeOutputElements}.
- */
- public List<OutputT> processBatch(Iterable <? extends InputT> inputElements) {
- startBundle();
- for (InputT inputElement : inputElements) {
- processElement(inputElement);
- }
- finishBundle();
- return takeOutputElements();
- }
-
- /**
- * A convenience method for testing {@link DoFn DoFns} with bundles of elements.
- * Logic proceeds as follows:
- *
- * <ol>
- * <li>Calls {@link #startBundle}.</li>
- * <li>Calls {@link #processElement} on each of the arguments.<li>
- * <li>Calls {@link #finishBundle}.</li>
- * <li>Returns the result of {@link #takeOutputElements}.</li>
- * </ol>
- */
- @SafeVarargs
- public final List<OutputT> processBatch(InputT... inputElements) {
- return processBatch(Arrays.asList(inputElements));
- }
-
- /**
- * Calls {@link DoFn#startBundle} on the {@code DoFn} under test.
- *
- * <p>If needed, first creates a fresh instance of the DoFn under test.
- */
- public void startBundle() {
- resetState();
- initializeState();
- fnRunner.startBundle();
- state = State.STARTED;
- }
-
- /**
- * Calls {@link DoFn#processElement} on the {@code DoFn} under test, in a
- * context where {@link DoFn.ProcessContext#element} returns the
- * given element.
- *
- * <p>Will call {@link #startBundle} automatically, if it hasn't
- * already been called.
- *
- * @throws IllegalStateException if the {@code DoFn} under test has already
- * been finished
- */
- public void processElement(InputT element) {
- if (state == State.FINISHED) {
- throw new IllegalStateException("finishBundle() has already been called");
- }
- if (state == State.UNSTARTED) {
- startBundle();
- }
- fnRunner.processElement(WindowedValue.valueInGlobalWindow(element));
- }
-
- /**
- * Calls {@link DoFn#finishBundle} of the {@code DoFn} under test.
- *
- * <p>Will call {@link #startBundle} automatically, if it hasn't
- * already been called.
- *
- * @throws IllegalStateException if the {@code DoFn} under test has already
- * been finished
- */
- public void finishBundle() {
- if (state == State.FINISHED) {
- throw new IllegalStateException("finishBundle() has already been called");
- }
- if (state == State.UNSTARTED) {
- startBundle();
- }
- fnRunner.finishBundle();
- state = State.FINISHED;
- }
-
- /**
- * Returns the elements output so far to the main output. Does not
- * clear them, so subsequent calls will continue to include these
- * elements.
- *
- * @see #takeOutputElements
- * @see #clearOutputElements
- *
- */
- public List<OutputT> peekOutputElements() {
- // TODO: Should we return an unmodifiable list?
- return Lists.transform(
- peekOutputElementsWithTimestamp(),
- new Function<OutputElementWithTimestamp<OutputT>, OutputT>() {
- @Override
- @SuppressWarnings("unchecked")
- public OutputT apply(OutputElementWithTimestamp<OutputT> input) {
- return input.getValue();
- }
- });
- }
-
- /**
- * Returns the elements output so far to the main output with associated timestamps. Does not
- * clear them, so subsequent calls will continue to include these.
- * elements.
- *
- * @see #takeOutputElementsWithTimestamp
- * @see #clearOutputElements
- */
- @Experimental
- public List<OutputElementWithTimestamp<OutputT>> peekOutputElementsWithTimestamp() {
- // TODO: Should we return an unmodifiable list?
- return Lists.transform(
- outputManager.getOutput(mainOutputTag),
- new Function<Object, OutputElementWithTimestamp<OutputT>>() {
- @Override
- @SuppressWarnings("unchecked")
- public OutputElementWithTimestamp<OutputT> apply(Object input) {
- return new OutputElementWithTimestamp<OutputT>(
- ((WindowedValue<OutputT>) input).getValue(),
- ((WindowedValue<OutputT>) input).getTimestamp());
- }
- });
- }
-
- /**
- * Clears the record of the elements output so far to the main output.
- *
- * @see #peekOutputElements
- */
- public void clearOutputElements() {
- peekOutputElements().clear();
- }
-
- /**
- * Returns the elements output so far to the main output.
- * Clears the list so these elements don't appear in future calls.
- *
- * @see #peekOutputElements
- */
- public List<OutputT> takeOutputElements() {
- List<OutputT> resultElems = new ArrayList<>(peekOutputElements());
- clearOutputElements();
- return resultElems;
- }
-
- /**
- * Returns the elements output so far to the main output with associated timestamps.
- * Clears the list so these elements don't appear in future calls.
- *
- * @see #peekOutputElementsWithTimestamp
- * @see #takeOutputElements
- * @see #clearOutputElements
- */
- @Experimental
- public List<OutputElementWithTimestamp<OutputT>> takeOutputElementsWithTimestamp() {
- List<OutputElementWithTimestamp<OutputT>> resultElems =
- new ArrayList<>(peekOutputElementsWithTimestamp());
- clearOutputElements();
- return resultElems;
- }
-
- /**
- * Returns the elements output so far to the side output with the
- * given tag. Does not clear them, so subsequent calls will
- * continue to include these elements.
- *
- * @see #takeSideOutputElements
- * @see #clearSideOutputElements
- */
- public <T> List<T> peekSideOutputElements(TupleTag<T> tag) {
- // TODO: Should we return an unmodifiable list?
- return Lists.transform(
- outputManager.getOutput(tag),
- new Function<WindowedValue<T>, T>() {
- @SuppressWarnings("unchecked")
- @Override
- public T apply(WindowedValue<T> input) {
- return input.getValue();
- }});
- }
-
- /**
- * Clears the record of the elements output so far to the side
- * output with the given tag.
- *
- * @see #peekSideOutputElements
- */
- public <T> void clearSideOutputElements(TupleTag<T> tag) {
- peekSideOutputElements(tag).clear();
- }
-
- /**
- * Returns the elements output so far to the side output with the given tag.
- * Clears the list so these elements don't appear in future calls.
- *
- * @see #peekSideOutputElements
- */
- public <T> List<T> takeSideOutputElements(TupleTag<T> tag) {
- List<T> resultElems = new ArrayList<>(peekSideOutputElements(tag));
- clearSideOutputElements(tag);
- return resultElems;
- }
-
- /**
- * Returns the value of the provided {@link Aggregator}.
- */
- public <AggregateT> AggregateT getAggregatorValue(Aggregator<?, AggregateT> agg) {
- @SuppressWarnings("unchecked")
- Counter<AggregateT> counter =
- (Counter<AggregateT>)
- counterSet.getExistingCounter("user-" + STEP_NAME + "-" + agg.getName());
- return counter.getAggregate();
- }
-
- /**
- * Holder for an OutputElement along with its associated timestamp.
- */
- @Experimental
- public static class OutputElementWithTimestamp<OutputT> {
- private final OutputT value;
- private final Instant timestamp;
-
- OutputElementWithTimestamp(OutputT value, Instant timestamp) {
- this.value = value;
- this.timestamp = timestamp;
- }
-
- OutputT getValue() {
- return value;
- }
-
- Instant getTimestamp() {
- return timestamp;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (!(obj instanceof OutputElementWithTimestamp)) {
- return false;
- }
- OutputElementWithTimestamp<?> other = (OutputElementWithTimestamp<?>) obj;
- return Objects.equal(other.value, value) && Objects.equal(other.timestamp, timestamp);
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(value, timestamp);
- }
- }
-
- /////////////////////////////////////////////////////////////////////////////
-
- /** The possible states of processing a DoFn. */
- enum State {
- UNSTARTED,
- STARTED,
- FINISHED
- }
-
- /** The name of the step of a DoFnTester. */
- static final String STEP_NAME = "stepName";
- /** The name of the enclosing DoFn PTransform for a DoFnTester. */
- static final String TRANSFORM_NAME = "transformName";
-
- final PipelineOptions options = PipelineOptionsFactory.create();
-
- /** The original DoFn under test. */
- final DoFn<InputT, OutputT> origFn;
-
- /** The side input values to provide to the DoFn under test. */
- private Map<PCollectionView<?>, Iterable<WindowedValue<?>>> sideInputs =
- new HashMap<>();
-
- /** The output tags used by the DoFn under test. */
- TupleTag<OutputT> mainOutputTag = new TupleTag<>();
- List<TupleTag<?>> sideOutputTags = new ArrayList<>();
-
- /** The original DoFn under test, if started. */
- DoFn<InputT, OutputT> fn;
-
- /** The ListOutputManager to examine the outputs. */
- DoFnRunnerBase.ListOutputManager outputManager;
-
- /** The DoFnRunner if processing is in progress. */
- DoFnRunner<InputT, OutputT> fnRunner;
-
- /** Counters for user-defined Aggregators if processing is in progress. */
- CounterSet counterSet;
-
- /** The state of processing of the DoFn under test. */
- State state;
-
- DoFnTester(DoFn<InputT, OutputT> origFn) {
- this.origFn = origFn;
- resetState();
- }
-
- void resetState() {
- fn = null;
- outputManager = null;
- fnRunner = null;
- counterSet = null;
- state = State.UNSTARTED;
- }
-
- @SuppressWarnings("unchecked")
- void initializeState() {
- fn = (DoFn<InputT, OutputT>)
- SerializableUtils.deserializeFromByteArray(
- SerializableUtils.serializeToByteArray(origFn),
- origFn.toString());
- counterSet = new CounterSet();
- PTuple runnerSideInputs = PTuple.empty();
- for (Map.Entry<PCollectionView<?>, Iterable<WindowedValue<?>>> entry
- : sideInputs.entrySet()) {
- runnerSideInputs = runnerSideInputs.and(entry.getKey().getTagInternal(), entry.getValue());
- }
- outputManager = new DoFnRunnerBase.ListOutputManager();
- fnRunner = DoFnRunners.createDefault(
- options,
- fn,
- DirectSideInputReader.of(runnerSideInputs),
- outputManager,
- mainOutputTag,
- sideOutputTags,
- DirectModeExecutionContext.create().getOrCreateStepContext(STEP_NAME, TRANSFORM_NAME, null),
- counterSet.getAddCounterMutator(),
- WindowingStrategy.globalDefault());
- }
-}