You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/08/04 01:26:11 UTC
[03/19] incubator-beam git commit: Rename DoFnWithContext to DoFn
Rename DoFnWithContext to DoFn
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3bcb6f46
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3bcb6f46
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3bcb6f46
Branch: refs/heads/master
Commit: 3bcb6f46ad0ae483d1d8785edc2d9d5846c71a73
Parents: e160966
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Jul 22 14:10:01 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Aug 3 18:25:52 2016 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/transforms/DoFn.java | 429 +++++++++++++++++++
.../beam/sdk/transforms/DoFnReflector.java | 84 ++--
.../apache/beam/sdk/transforms/DoFnTester.java | 2 +-
.../beam/sdk/transforms/DoFnWithContext.java | 429 -------------------
.../org/apache/beam/sdk/transforms/OldDoFn.java | 2 +-
.../org/apache/beam/sdk/transforms/ParDo.java | 16 +-
.../beam/sdk/transforms/DoFnReflectorTest.java | 86 ++--
.../apache/beam/sdk/transforms/DoFnTest.java | 237 ++++++++++
.../sdk/transforms/DoFnWithContextTest.java | 237 ----------
.../apache/beam/sdk/transforms/ParDoTest.java | 12 +-
.../dofnreflector/DoFnReflectorTestHelper.java | 26 +-
.../transforms/DoFnReflectorBenchmark.java | 30 +-
12 files changed, 795 insertions(+), 795 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3bcb6f46/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
new file mode 100644
index 0000000..eb6753c
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.OldDoFn.DelegatingAggregator;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowingInternals;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+import java.io.Serializable;
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The argument to {@link ParDo} providing the code to use to process
+ * elements of the input
+ * {@link org.apache.beam.sdk.values.PCollection}.
+ *
+ * <p>See {@link ParDo} for more explanation, examples of use, and
+ * discussion of constraints on {@code DoFn}s, including their
+ * serializability, lack of access to global shared mutable state,
+ * requirements for failure tolerance, and benefits of optimization.
+ *
+ * <p>{@code DoFn}s can be tested in a particular
+ * {@code Pipeline} by running that {@code Pipeline} on sample input
+ * and then checking its output. Unit testing of a {@code DoFn},
+ * separately from any {@code ParDo} transform or {@code Pipeline},
+ * can be done via the {@link DoFnTester} harness.
+ *
+ * <p>Implementations must define a method annotated with {@link ProcessElement}
+ * that satisfies the requirements described there. See the {@link ProcessElement}
+ * for details.
+ *
+ * <p>This functionality is experimental and likely to change.
+ *
+ * <p>Example usage:
+ *
+ * <pre> {@code
+ * PCollection<String> lines = ... ;
+ * PCollection<String> words =
+ * lines.apply(ParDo.of(new DoFn<String, String>() {
+ * @ProcessElement
+ * public void processElement(ProcessContext c, BoundedWindow window) {
+ *
+ * }}));
+ * } </pre>
+ *
+ * @param <InputT> the type of the (main) input elements
+ * @param <OutputT> the type of the (main) output elements
+ */
+@Experimental
+public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayData {
+
+ /** Information accessible to all methods in this {@code DoFn}. */
+ public abstract class Context {
+
+ /**
+ * Returns the {@code PipelineOptions} specified with the
+ * {@link org.apache.beam.sdk.runners.PipelineRunner}
+ * invoking this {@code DoFn}. The {@code PipelineOptions} will
+ * be the default running via {@link DoFnTester}.
+ */
+ public abstract PipelineOptions getPipelineOptions();
+
+ /**
+ * Adds the given element to the main output {@code PCollection}.
+ *
+ * <p>Once passed to {@code output} the element should not be modified in
+ * any way.
+ *
+ * <p>If invoked from {@link ProcessElement}, the output
+ * element will have the same timestamp and be in the same windows
+ * as the input element passed to the method annotated with
+ * {@code @ProcessElement}.
+ *
+ * <p>If invoked from {@link StartBundle} or {@link FinishBundle},
+ * this will attempt to use the
+ * {@link org.apache.beam.sdk.transforms.windowing.WindowFn}
+ * of the input {@code PCollection} to determine what windows the element
+ * should be in, throwing an exception if the {@code WindowFn} attempts
+ * to access any information about the input element. The output element
+ * will have a timestamp of negative infinity.
+ */
+ public abstract void output(OutputT output);
+
+ /**
+ * Adds the given element to the main output {@code PCollection},
+ * with the given timestamp.
+ *
+ * <p>Once passed to {@code outputWithTimestamp} the element should not be
+ * modified in any way.
+ *
+ * <p>If invoked from {@link ProcessElement}), the timestamp
+ * must not be older than the input element's timestamp minus
+ * {@link OldDoFn#getAllowedTimestampSkew}. The output element will
+ * be in the same windows as the input element.
+ *
+ * <p>If invoked from {@link StartBundle} or {@link FinishBundle},
+ * this will attempt to use the
+ * {@link org.apache.beam.sdk.transforms.windowing.WindowFn}
+ * of the input {@code PCollection} to determine what windows the element
+ * should be in, throwing an exception if the {@code WindowFn} attempts
+ * to access any information about the input element except for the
+ * timestamp.
+ */
+ public abstract void outputWithTimestamp(OutputT output, Instant timestamp);
+
+ /**
+ * Adds the given element to the side output {@code PCollection} with the
+ * given tag.
+ *
+ * <p>Once passed to {@code sideOutput} the element should not be modified
+ * in any way.
+ *
+ * <p>The caller of {@code ParDo} uses {@link ParDo#withOutputTags} to
+ * specify the tags of side outputs that it consumes. Non-consumed side
+ * outputs, e.g., outputs for monitoring purposes only, don't necessarily
+ * need to be specified.
+ *
+ * <p>The output element will have the same timestamp and be in the same
+ * windows as the input element passed to {@link ProcessElement}).
+ *
+ * <p>If invoked from {@link StartBundle} or {@link FinishBundle},
+ * this will attempt to use the
+ * {@link org.apache.beam.sdk.transforms.windowing.WindowFn}
+ * of the input {@code PCollection} to determine what windows the element
+ * should be in, throwing an exception if the {@code WindowFn} attempts
+ * to access any information about the input element. The output element
+ * will have a timestamp of negative infinity.
+ *
+ * @see ParDo#withOutputTags
+ */
+ public abstract <T> void sideOutput(TupleTag<T> tag, T output);
+
+ /**
+ * Adds the given element to the specified side output {@code PCollection},
+ * with the given timestamp.
+ *
+ * <p>Once passed to {@code sideOutputWithTimestamp} the element should not be
+ * modified in any way.
+ *
+ * <p>If invoked from {@link ProcessElement}), the timestamp
+ * must not be older than the input element's timestamp minus
+ * {@link OldDoFn#getAllowedTimestampSkew}. The output element will
+ * be in the same windows as the input element.
+ *
+ * <p>If invoked from {@link StartBundle} or {@link FinishBundle},
+ * this will attempt to use the
+ * {@link org.apache.beam.sdk.transforms.windowing.WindowFn}
+ * of the input {@code PCollection} to determine what windows the element
+ * should be in, throwing an exception if the {@code WindowFn} attempts
+ * to access any information about the input element except for the
+ * timestamp.
+ *
+ * @see ParDo#withOutputTags
+ */
+ public abstract <T> void sideOutputWithTimestamp(
+ TupleTag<T> tag, T output, Instant timestamp);
+ }
+
+ /**
+ * Information accessible when running {@link OldDoFn#processElement}.
+ */
+ public abstract class ProcessContext extends Context {
+
+ /**
+ * Returns the input element to be processed.
+ *
+ * <p>The element will not be changed -- it is safe to cache, etc.
+ * without copying.
+ */
+ public abstract InputT element();
+
+
+ /**
+ * Returns the value of the side input.
+ *
+ * @throws IllegalArgumentException if this is not a side input
+ * @see ParDo#withSideInputs
+ */
+ public abstract <T> T sideInput(PCollectionView<T> view);
+
+ /**
+ * Returns the timestamp of the input element.
+ *
+ * <p>See {@link org.apache.beam.sdk.transforms.windowing.Window}
+ * for more information.
+ */
+ public abstract Instant timestamp();
+
+ /**
+ * Returns information about the pane within this window into which the
+ * input element has been assigned.
+ *
+ * <p>Generally all data is in a single, uninteresting pane unless custom
+ * triggering and/or late data has been explicitly requested.
+ * See {@link org.apache.beam.sdk.transforms.windowing.Window}
+ * for more information.
+ */
+ public abstract PaneInfo pane();
+ }
+
+ /**
+ * Returns the allowed timestamp skew duration, which is the maximum
+ * duration that timestamps can be shifted backward in
+ * {@link DoFn.Context#outputWithTimestamp}.
+ *
+ * <p>The default value is {@code Duration.ZERO}, in which case
+ * timestamps can only be shifted forward to future. For infinite
+ * skew, return {@code Duration.millis(Long.MAX_VALUE)}.
+ */
+ public Duration getAllowedTimestampSkew() {
+ return Duration.ZERO;
+ }
+
+ /////////////////////////////////////////////////////////////////////////////
+
+ Map<String, DelegatingAggregator<?, ?>> aggregators = new HashMap<>();
+
+ /**
+ * Protects aggregators from being created after initialization.
+ */
+ private boolean aggregatorsAreFinal;
+
+ /**
+ * Returns a {@link TypeDescriptor} capturing what is known statically
+ * about the input type of this {@code DoFn} instance's most-derived
+ * class.
+ *
+ * <p>See {@link #getOutputTypeDescriptor} for more discussion.
+ */
+ protected TypeDescriptor<InputT> getInputTypeDescriptor() {
+ return new TypeDescriptor<InputT>(getClass()) {};
+ }
+
+ /**
+ * Returns a {@link TypeDescriptor} capturing what is known statically
+ * about the output type of this {@code DoFn} instance's
+ * most-derived class.
+ *
+ * <p>In the normal case of a concrete {@code DoFn} subclass with
+ * no generic type parameters of its own (including anonymous inner
+ * classes), this will be a complete non-generic type, which is good
+ * for choosing a default output {@code Coder<O>} for the output
+ * {@code PCollection<O>}.
+ */
+ protected TypeDescriptor<OutputT> getOutputTypeDescriptor() {
+ return new TypeDescriptor<OutputT>(getClass()) {};
+ }
+
+ /**
+ * Interface for runner implementors to provide implementations of extra context information.
+ *
+ * <p>The methods on this interface are called by {@link DoFnReflector} before invoking an
+ * annotated {@link StartBundle}, {@link ProcessElement} or {@link FinishBundle} method that
+ * has indicated it needs the given extra context.
+ *
+ * <p>In the case of {@link ProcessElement} it is called once per invocation of
+ * {@link ProcessElement}.
+ */
+ public interface ExtraContextFactory<InputT, OutputT> {
+ /**
+ * Construct the {@link BoundedWindow} to use within a {@link DoFn} that
+ * needs it. This is called if the {@link ProcessElement} method has a parameter of type
+ * {@link BoundedWindow}.
+ *
+ * @return {@link BoundedWindow} of the element currently being processed.
+ */
+ BoundedWindow window();
+
+ /**
+ * Construct the {@link WindowingInternals} to use within a {@link DoFn} that
+ * needs it. This is called if the {@link ProcessElement} method has a parameter of type
+ * {@link WindowingInternals}.
+ */
+ WindowingInternals<InputT, OutputT> windowingInternals();
+ }
+
+ /////////////////////////////////////////////////////////////////////////////
+
+ /**
+ * Annotation for the method to use to prepare an instance for processing a batch of elements.
+ * The method annotated with this must satisfy the following constraints:
+ * <ul>
+ * <li>It must have at least one argument.
+ * <li>Its first (and only) argument must be a {@link DoFn.Context}.
+ * </ul>
+ */
+ @Documented
+ @Retention(RetentionPolicy.RUNTIME)
+ @Target(ElementType.METHOD)
+ public @interface StartBundle {}
+
+ /**
+ * Annotation for the method to use for processing elements. A subclass of
+ * {@link DoFn} must have a method with this annotation satisfying
+ * the following constraints in order for it to be executable:
+ * <ul>
+ * <li>It must have at least one argument.
+ * <li>Its first argument must be a {@link DoFn.ProcessContext}.
+ * <li>Its remaining arguments must be {@link BoundedWindow}, or
+ * {@link WindowingInternals WindowingInternals<InputT, OutputT>}.
+ * </ul>
+ */
+ @Documented
+ @Retention(RetentionPolicy.RUNTIME)
+ @Target(ElementType.METHOD)
+ public @interface ProcessElement {}
+
+ /**
+ * Annotation for the method to use to prepare an instance for processing a batch of elements.
+ * The method annotated with this must satisfy the following constraints:
+ * <ul>
+ * <li>It must have at least one argument.
+ * <li>Its first (and only) argument must be a {@link DoFn.Context}.
+ * </ul>
+ */
+ @Documented
+ @Retention(RetentionPolicy.RUNTIME)
+ @Target(ElementType.METHOD)
+ public @interface FinishBundle {}
+
+ /**
+ * Returns an {@link Aggregator} with aggregation logic specified by the
+ * {@link CombineFn} argument. The name provided must be unique across
+ * {@link Aggregator}s created within the OldDoFn. Aggregators can only be created
+ * during pipeline construction.
+ *
+ * @param name the name of the aggregator
+ * @param combiner the {@link CombineFn} to use in the aggregator
+ * @return an aggregator for the provided name and combiner in the scope of
+ * this OldDoFn
+ * @throws NullPointerException if the name or combiner is null
+ * @throws IllegalArgumentException if the given name collides with another
+ * aggregator in this scope
+ * @throws IllegalStateException if called during pipeline execution.
+ */
+ public final <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
+ createAggregator(String name, Combine.CombineFn<? super AggInputT, ?, AggOutputT> combiner) {
+ checkNotNull(name, "name cannot be null");
+ checkNotNull(combiner, "combiner cannot be null");
+ checkArgument(!aggregators.containsKey(name),
+ "Cannot create aggregator with name %s."
+ + " An Aggregator with that name already exists within this scope.",
+ name);
+ checkState(!aggregatorsAreFinal,
+ "Cannot create an aggregator during pipeline execution."
+ + " Aggregators should be registered during pipeline construction.");
+
+ DelegatingAggregator<AggInputT, AggOutputT> aggregator =
+ new DelegatingAggregator<>(name, combiner);
+ aggregators.put(name, aggregator);
+ return aggregator;
+ }
+
+ /**
+ * Returns an {@link Aggregator} with the aggregation logic specified by the
+ * {@link SerializableFunction} argument. The name provided must be unique
+ * across {@link Aggregator}s created within the OldDoFn. Aggregators can only be
+ * created during pipeline construction.
+ *
+ * @param name the name of the aggregator
+ * @param combiner the {@link SerializableFunction} to use in the aggregator
+ * @return an aggregator for the provided name and combiner in the scope of
+ * this OldDoFn
+ * @throws NullPointerException if the name or combiner is null
+ * @throws IllegalArgumentException if the given name collides with another
+ * aggregator in this scope
+ * @throws IllegalStateException if called during pipeline execution.
+ */
+ public final <AggInputT> Aggregator<AggInputT, AggInputT> createAggregator(
+ String name, SerializableFunction<Iterable<AggInputT>, AggInputT> combiner) {
+ checkNotNull(combiner, "combiner cannot be null.");
+ return createAggregator(name, Combine.IterableCombineFn.of(combiner));
+ }
+
+ /**
+ * Finalize the {@link DoFn} construction to prepare for processing.
+ * This method should be called by runners before any processing methods.
+ */
+ public void prepareForProcessing() {
+ aggregatorsAreFinal = true;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * <p>By default, does not register any display data. Implementors may override this method
+ * to provide their own display data.
+ */
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3bcb6f46/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java
index d8d4181..b504cb4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java
@@ -18,10 +18,10 @@
package org.apache.beam.sdk.transforms;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.DoFnWithContext.ExtraContextFactory;
-import org.apache.beam.sdk.transforms.DoFnWithContext.FinishBundle;
-import org.apache.beam.sdk.transforms.DoFnWithContext.ProcessElement;
-import org.apache.beam.sdk.transforms.DoFnWithContext.StartBundle;
+import org.apache.beam.sdk.transforms.DoFn.ExtraContextFactory;
+import org.apache.beam.sdk.transforms.DoFn.FinishBundle;
+import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
+import org.apache.beam.sdk.transforms.DoFn.StartBundle;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -94,7 +94,7 @@ import javax.annotation.Nullable;
/**
- * Utility implementing the necessary reflection for working with {@link DoFnWithContext}s.
+ * Utility implementing the necessary reflection for working with {@link DoFn}s.
*/
public abstract class DoFnReflector {
@@ -109,7 +109,7 @@ public abstract class DoFnReflector {
/**
* Enumeration of the parameters available from the {@link ExtraContextFactory} to use as
- * additional parameters for {@link DoFnWithContext} methods.
+ * additional parameters for {@link DoFn} methods.
* <p>
* We don't rely on looking for properly annotated methods within {@link ExtraContextFactory}
* because erasure would make it impossible to completely fill in the type token for context
@@ -139,7 +139,7 @@ public abstract class DoFnReflector {
/**
* Create a type token representing the given parameter. May use the type token associated
- * with the input and output types of the {@link DoFnWithContext}, depending on the extra
+ * with the input and output types of the {@link DoFn}, depending on the extra
* context.
*/
abstract <InputT, OutputT> TypeToken<?> tokenFor(
@@ -190,22 +190,22 @@ public abstract class DoFnReflector {
}
/**
- * @return true if the reflected {@link DoFnWithContext} uses a Single Window.
+ * @return true if the reflected {@link DoFn} uses a Single Window.
*/
public abstract boolean usesSingleWindow();
/** Create an {@link DoFnInvoker} bound to the given {@link OldDoFn}. */
public abstract <InputT, OutputT> DoFnInvoker<InputT, OutputT> bindInvoker(
- DoFnWithContext<InputT, OutputT> fn);
+ DoFn<InputT, OutputT> fn);
private static final Map<Class<?>, DoFnReflector> REFLECTOR_CACHE =
new LinkedHashMap<Class<?>, DoFnReflector>();
/**
- * @return the {@link DoFnReflector} for the given {@link DoFnWithContext}.
+ * @return the {@link DoFnReflector} for the given {@link DoFn}.
*/
public static DoFnReflector of(
- @SuppressWarnings("rawtypes") Class<? extends DoFnWithContext> fn) {
+ @SuppressWarnings("rawtypes") Class<? extends DoFn> fn) {
DoFnReflector reflector = REFLECTOR_CACHE.get(fn);
if (reflector != null) {
return reflector;
@@ -217,9 +217,9 @@ public abstract class DoFnReflector {
}
/**
- * Create a {@link OldDoFn} that the {@link DoFnWithContext}.
+ * Create a {@link OldDoFn} that the {@link DoFn}.
*/
- public <InputT, OutputT> OldDoFn<InputT, OutputT> toDoFn(DoFnWithContext<InputT, OutputT> fn) {
+ public <InputT, OutputT> OldDoFn<InputT, OutputT> toDoFn(DoFn<InputT, OutputT> fn) {
if (usesSingleWindow()) {
return new WindowDoFnAdapter<InputT, OutputT>(this, fn);
} else {
@@ -259,7 +259,7 @@ public abstract class DoFnReflector {
static <InputT, OutputT> List<AdditionalParameter> verifyProcessMethodArguments(Method m) {
return verifyMethodArguments(m,
EXTRA_PROCESS_CONTEXTS,
- new TypeToken<DoFnWithContext<InputT, OutputT>.ProcessContext>() {},
+ new TypeToken<DoFn<InputT, OutputT>.ProcessContext>() {},
new TypeParameter<InputT>() {},
new TypeParameter<OutputT>() {});
}
@@ -271,13 +271,13 @@ public abstract class DoFnReflector {
}
return verifyMethodArguments(m,
EXTRA_CONTEXTS,
- new TypeToken<DoFnWithContext<InputT, OutputT>.Context>() {},
+ new TypeToken<DoFn<InputT, OutputT>.Context>() {},
new TypeParameter<InputT>() {},
new TypeParameter<OutputT>() {});
}
/**
- * Verify the method arguments for a given {@link DoFnWithContext} method.
+ * Verify the method arguments for a given {@link DoFn} method.
*
* <p>The requirements for a method to be valid, are:
* <ol>
@@ -330,7 +330,7 @@ public abstract class DoFnReflector {
// Fill in the generics in the allExtraContextArgs interface from the types in the
// Context or ProcessContext OldDoFn.
ParameterizedType pt = (ParameterizedType) contextToken.getType();
- // We actually want the owner, since ProcessContext and Context are owned by DoFnWithContext.
+ // We actually want the owner, since ProcessContext and Context are owned by DoFn.
pt = (ParameterizedType) pt.getOwnerType();
@SuppressWarnings("unchecked")
TypeToken<InputT> iActual = (TypeToken<InputT>) TypeToken.of(pt.getActualTypeArguments()[0]);
@@ -368,21 +368,21 @@ public abstract class DoFnReflector {
public interface DoFnInvoker<InputT, OutputT> {
/** Invoke {@link OldDoFn#startBundle} on the bound {@code OldDoFn}. */
void invokeStartBundle(
- DoFnWithContext<InputT, OutputT>.Context c,
+ DoFn<InputT, OutputT>.Context c,
ExtraContextFactory<InputT, OutputT> extra);
/** Invoke {@link OldDoFn#finishBundle} on the bound {@code OldDoFn}. */
void invokeFinishBundle(
- DoFnWithContext<InputT, OutputT>.Context c,
+ DoFn<InputT, OutputT>.Context c,
ExtraContextFactory<InputT, OutputT> extra);
/** Invoke {@link OldDoFn#processElement} on the bound {@code OldDoFn}. */
public void invokeProcessElement(
- DoFnWithContext<InputT, OutputT>.ProcessContext c,
+ DoFn<InputT, OutputT>.ProcessContext c,
ExtraContextFactory<InputT, OutputT> extra);
}
/**
- * Implementation of {@link DoFnReflector} for the arbitrary {@link DoFnWithContext}.
+ * Implementation of {@link DoFnReflector} for the arbitrary {@link DoFn}.
*/
private static class GenericDoFnReflector extends DoFnReflector {
@@ -395,7 +395,7 @@ public abstract class DoFnReflector {
private final Constructor<?> constructor;
private GenericDoFnReflector(
- @SuppressWarnings("rawtypes") Class<? extends DoFnWithContext> fn) {
+ @SuppressWarnings("rawtypes") Class<? extends DoFn> fn) {
// Locate the annotated methods
this.processElement = findAnnotatedMethod(ProcessElement.class, fn, true);
this.startBundle = findAnnotatedMethod(StartBundle.class, fn, false);
@@ -442,7 +442,7 @@ public abstract class DoFnReflector {
private static Method findAnnotatedMethod(
Class<? extends Annotation> anno, Class<?> fnClazz, boolean required) {
Collection<Method> matches = declaredMethodsWithAnnotation(
- anno, fnClazz, DoFnWithContext.class);
+ anno, fnClazz, DoFn.class);
if (matches.size() == 0) {
if (required == true) {
@@ -493,12 +493,12 @@ public abstract class DoFnReflector {
/**
* Use ByteBuddy to generate the code for a {@link DoFnInvoker} that invokes the given
- * {@link DoFnWithContext}.
+ * {@link DoFn}.
* @param clazz
* @return
*/
private Constructor<? extends DoFnInvoker<?, ?>> createInvokerConstructor(
- @SuppressWarnings("rawtypes") Class<? extends DoFnWithContext> clazz) {
+ @SuppressWarnings("rawtypes") Class<? extends DoFn> clazz) {
final TypeDescription clazzDescription = new TypeDescription.ForLoadedType(clazz);
@@ -545,7 +545,7 @@ public abstract class DoFnReflector {
@Override
public <InputT, OutputT> DoFnInvoker<InputT, OutputT> bindInvoker(
- DoFnWithContext<InputT, OutputT> fn) {
+ DoFn<InputT, OutputT> fn) {
try {
@SuppressWarnings("unchecked")
DoFnInvoker<InputT, OutputT> invoker =
@@ -562,13 +562,13 @@ public abstract class DoFnReflector {
}
private static class ContextAdapter<InputT, OutputT>
- extends DoFnWithContext<InputT, OutputT>.Context
- implements DoFnWithContext.ExtraContextFactory<InputT, OutputT> {
+ extends DoFn<InputT, OutputT>.Context
+ implements DoFn.ExtraContextFactory<InputT, OutputT> {
private OldDoFn<InputT, OutputT>.Context context;
private ContextAdapter(
- DoFnWithContext<InputT, OutputT> fn, OldDoFn<InputT, OutputT>.Context context) {
+ DoFn<InputT, OutputT> fn, OldDoFn<InputT, OutputT>.Context context) {
fn.super();
this.context = context;
}
@@ -600,14 +600,14 @@ public abstract class DoFnReflector {
@Override
public BoundedWindow window() {
- // The DoFnWithContext doesn't allow us to ask for these outside ProcessElements, so this
+ // The DoFn doesn't allow us to ask for these outside ProcessElements, so this
// should be unreachable.
throw new UnsupportedOperationException("Can only get the window in ProcessElements");
}
@Override
public WindowingInternals<InputT, OutputT> windowingInternals() {
- // The DoFnWithContext doesn't allow us to ask for these outside ProcessElements, so this
+ // The DoFn doesn't allow us to ask for these outside ProcessElements, so this
// should be unreachable.
throw new UnsupportedOperationException(
"Can only get the windowingInternals in ProcessElements");
@@ -615,13 +615,13 @@ public abstract class DoFnReflector {
}
private static class ProcessContextAdapter<InputT, OutputT>
- extends DoFnWithContext<InputT, OutputT>.ProcessContext
- implements DoFnWithContext.ExtraContextFactory<InputT, OutputT> {
+ extends DoFn<InputT, OutputT>.ProcessContext
+ implements DoFn.ExtraContextFactory<InputT, OutputT> {
private OldDoFn<InputT, OutputT>.ProcessContext context;
private ProcessContextAdapter(
- DoFnWithContext<InputT, OutputT> fn,
+ DoFn<InputT, OutputT> fn,
OldDoFn<InputT, OutputT>.ProcessContext context) {
fn.super();
this.context = context;
@@ -693,10 +693,10 @@ public abstract class DoFnReflector {
private static class SimpleDoFnAdapter<InputT, OutputT> extends OldDoFn<InputT, OutputT> {
- private final DoFnWithContext<InputT, OutputT> fn;
+ private final DoFn<InputT, OutputT> fn;
private transient DoFnInvoker<InputT, OutputT> invoker;
- private SimpleDoFnAdapter(DoFnReflector reflector, DoFnWithContext<InputT, OutputT> fn) {
+ private SimpleDoFnAdapter(DoFnReflector reflector, DoFn<InputT, OutputT> fn) {
super(fn.aggregators);
this.fn = fn;
this.invoker = reflector.bindInvoker(fn);
@@ -745,7 +745,7 @@ public abstract class DoFnReflector {
private static class WindowDoFnAdapter<InputT, OutputT>
extends SimpleDoFnAdapter<InputT, OutputT> implements OldDoFn.RequiresWindowAccess {
- private WindowDoFnAdapter(DoFnReflector reflector, DoFnWithContext<InputT, OutputT> fn) {
+ private WindowDoFnAdapter(DoFnReflector reflector, DoFn<InputT, OutputT> fn) {
super(reflector, fn);
}
}
@@ -770,7 +770,7 @@ public abstract class DoFnReflector {
try {
prepareMethod = new MethodLocator.ForExplicitMethod(
new MethodDescription.ForLoadedMethod(
- DoFnWithContext.class.getDeclaredMethod("prepareForProcessing")))
+ DoFn.class.getDeclaredMethod("prepareForProcessing")))
.resolve(instrumentedMethod);
} catch (NoSuchMethodException | SecurityException e) {
throw new RuntimeException("Unable to locate prepareForProcessing method", e);
@@ -817,7 +817,7 @@ public abstract class DoFnReflector {
/**
* A byte-buddy {@link Implementation} that delegates a call that receives
- * {@link AdditionalParameter} to the given {@link DoFnWithContext} method.
+ * {@link AdditionalParameter} to the given {@link DoFn} method.
*/
private static final class InvokerDelegation implements Implementation {
@Nullable
@@ -845,7 +845,7 @@ public abstract class DoFnReflector {
/**
* Generate the {@link Implementation} of one of the life-cycle methods of a
- * {@link DoFnWithContext}.
+ * {@link DoFn}.
*/
private static Implementation create(
@Nullable final Method target, BeforeDelegation before, List<AdditionalParameter> args) {
@@ -869,7 +869,7 @@ public abstract class DoFnReflector {
}
/**
- * Stack manipulation to push the {@link DoFnWithContext} reference stored in the
+ * Stack manipulation to push the {@link DoFn} reference stored in the
* delegate field of the invoker on to the top of the stack.
*
* <p>This implementation is derived from the code for
@@ -1018,7 +1018,7 @@ public abstract class DoFnReflector {
/**
* A constructor {@link Implementation} for a {@link DoFnInvoker class}. Produces the byte code
* for a constructor that takes a single argument and assigns it to the delegate field.
- * {@link AdditionalParameter} to the given {@link DoFnWithContext} method.
+ * {@link AdditionalParameter} to the given {@link DoFn} method.
*/
private static final class InvokerConstructor implements Implementation {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3bcb6f46/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index 9336e4c..f44a9ae 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -90,7 +90,7 @@ public class DoFnTester<InputT, OutputT> {
*/
@SuppressWarnings("unchecked")
public static <InputT, OutputT> DoFnTester<InputT, OutputT>
- of(DoFnWithContext<InputT, OutputT> fn) {
+ of(DoFn<InputT, OutputT> fn) {
return new DoFnTester<InputT, OutputT>(DoFnReflector.of(fn.getClass()).toDoFn(fn));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3bcb6f46/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnWithContext.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnWithContext.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnWithContext.java
deleted file mode 100644
index b27163a..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnWithContext.java
+++ /dev/null
@@ -1,429 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.OldDoFn.DelegatingAggregator;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.display.HasDisplayData;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.WindowingInternals;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TypeDescriptor;
-
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-import java.io.Serializable;
-import java.lang.annotation.Documented;
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * The argument to {@link ParDo} providing the code to use to process
- * elements of the input
- * {@link org.apache.beam.sdk.values.PCollection}.
- *
- * <p>See {@link ParDo} for more explanation, examples of use, and
- * discussion of constraints on {@code DoFnWithContext}s, including their
- * serializability, lack of access to global shared mutable state,
- * requirements for failure tolerance, and benefits of optimization.
- *
- * <p>{@code DoFnWithContext}s can be tested in a particular
- * {@code Pipeline} by running that {@code Pipeline} on sample input
- * and then checking its output. Unit testing of a {@code DoFnWithContext},
- * separately from any {@code ParDo} transform or {@code Pipeline},
- * can be done via the {@link DoFnTester} harness.
- *
- * <p>Implementations must define a method annotated with {@link ProcessElement}
- * that satisfies the requirements described there. See the {@link ProcessElement}
- * for details.
- *
- * <p>This functionality is experimental and likely to change.
- *
- * <p>Example usage:
- *
- * <pre> {@code
- * PCollection<String> lines = ... ;
- * PCollection<String> words =
- * lines.apply(ParDo.of(new DoFnWithContext<String, String>() {
- * @ProcessElement
- * public void processElement(ProcessContext c, BoundedWindow window) {
- *
- * }}));
- * } </pre>
- *
- * @param <InputT> the type of the (main) input elements
- * @param <OutputT> the type of the (main) output elements
- */
-@Experimental
-public abstract class DoFnWithContext<InputT, OutputT> implements Serializable, HasDisplayData {
-
- /** Information accessible to all methods in this {@code DoFnWithContext}. */
- public abstract class Context {
-
- /**
- * Returns the {@code PipelineOptions} specified with the
- * {@link org.apache.beam.sdk.runners.PipelineRunner}
- * invoking this {@code DoFnWithContext}. The {@code PipelineOptions} will
- * be the default running via {@link DoFnTester}.
- */
- public abstract PipelineOptions getPipelineOptions();
-
- /**
- * Adds the given element to the main output {@code PCollection}.
- *
- * <p>Once passed to {@code output} the element should not be modified in
- * any way.
- *
- * <p>If invoked from {@link ProcessElement}, the output
- * element will have the same timestamp and be in the same windows
- * as the input element passed to the method annotated with
- * {@code @ProcessElement}.
- *
- * <p>If invoked from {@link StartBundle} or {@link FinishBundle},
- * this will attempt to use the
- * {@link org.apache.beam.sdk.transforms.windowing.WindowFn}
- * of the input {@code PCollection} to determine what windows the element
- * should be in, throwing an exception if the {@code WindowFn} attempts
- * to access any information about the input element. The output element
- * will have a timestamp of negative infinity.
- */
- public abstract void output(OutputT output);
-
- /**
- * Adds the given element to the main output {@code PCollection},
- * with the given timestamp.
- *
- * <p>Once passed to {@code outputWithTimestamp} the element should not be
- * modified in any way.
- *
- * <p>If invoked from {@link ProcessElement}), the timestamp
- * must not be older than the input element's timestamp minus
- * {@link OldDoFn#getAllowedTimestampSkew}. The output element will
- * be in the same windows as the input element.
- *
- * <p>If invoked from {@link StartBundle} or {@link FinishBundle},
- * this will attempt to use the
- * {@link org.apache.beam.sdk.transforms.windowing.WindowFn}
- * of the input {@code PCollection} to determine what windows the element
- * should be in, throwing an exception if the {@code WindowFn} attempts
- * to access any information about the input element except for the
- * timestamp.
- */
- public abstract void outputWithTimestamp(OutputT output, Instant timestamp);
-
- /**
- * Adds the given element to the side output {@code PCollection} with the
- * given tag.
- *
- * <p>Once passed to {@code sideOutput} the element should not be modified
- * in any way.
- *
- * <p>The caller of {@code ParDo} uses {@link ParDo#withOutputTags} to
- * specify the tags of side outputs that it consumes. Non-consumed side
- * outputs, e.g., outputs for monitoring purposes only, don't necessarily
- * need to be specified.
- *
- * <p>The output element will have the same timestamp and be in the same
- * windows as the input element passed to {@link ProcessElement}).
- *
- * <p>If invoked from {@link StartBundle} or {@link FinishBundle},
- * this will attempt to use the
- * {@link org.apache.beam.sdk.transforms.windowing.WindowFn}
- * of the input {@code PCollection} to determine what windows the element
- * should be in, throwing an exception if the {@code WindowFn} attempts
- * to access any information about the input element. The output element
- * will have a timestamp of negative infinity.
- *
- * @see ParDo#withOutputTags
- */
- public abstract <T> void sideOutput(TupleTag<T> tag, T output);
-
- /**
- * Adds the given element to the specified side output {@code PCollection},
- * with the given timestamp.
- *
- * <p>Once passed to {@code sideOutputWithTimestamp} the element should not be
- * modified in any way.
- *
- * <p>If invoked from {@link ProcessElement}), the timestamp
- * must not be older than the input element's timestamp minus
- * {@link OldDoFn#getAllowedTimestampSkew}. The output element will
- * be in the same windows as the input element.
- *
- * <p>If invoked from {@link StartBundle} or {@link FinishBundle},
- * this will attempt to use the
- * {@link org.apache.beam.sdk.transforms.windowing.WindowFn}
- * of the input {@code PCollection} to determine what windows the element
- * should be in, throwing an exception if the {@code WindowFn} attempts
- * to access any information about the input element except for the
- * timestamp.
- *
- * @see ParDo#withOutputTags
- */
- public abstract <T> void sideOutputWithTimestamp(
- TupleTag<T> tag, T output, Instant timestamp);
- }
-
- /**
- * Information accessible when running {@link OldDoFn#processElement}.
- */
- public abstract class ProcessContext extends Context {
-
- /**
- * Returns the input element to be processed.
- *
- * <p>The element will not be changed -- it is safe to cache, etc.
- * without copying.
- */
- public abstract InputT element();
-
-
- /**
- * Returns the value of the side input.
- *
- * @throws IllegalArgumentException if this is not a side input
- * @see ParDo#withSideInputs
- */
- public abstract <T> T sideInput(PCollectionView<T> view);
-
- /**
- * Returns the timestamp of the input element.
- *
- * <p>See {@link org.apache.beam.sdk.transforms.windowing.Window}
- * for more information.
- */
- public abstract Instant timestamp();
-
- /**
- * Returns information about the pane within this window into which the
- * input element has been assigned.
- *
- * <p>Generally all data is in a single, uninteresting pane unless custom
- * triggering and/or late data has been explicitly requested.
- * See {@link org.apache.beam.sdk.transforms.windowing.Window}
- * for more information.
- */
- public abstract PaneInfo pane();
- }
-
- /**
- * Returns the allowed timestamp skew duration, which is the maximum
- * duration that timestamps can be shifted backward in
- * {@link DoFnWithContext.Context#outputWithTimestamp}.
- *
- * <p>The default value is {@code Duration.ZERO}, in which case
- * timestamps can only be shifted forward to future. For infinite
- * skew, return {@code Duration.millis(Long.MAX_VALUE)}.
- */
- public Duration getAllowedTimestampSkew() {
- return Duration.ZERO;
- }
-
- /////////////////////////////////////////////////////////////////////////////
-
- Map<String, DelegatingAggregator<?, ?>> aggregators = new HashMap<>();
-
- /**
- * Protects aggregators from being created after initialization.
- */
- private boolean aggregatorsAreFinal;
-
- /**
- * Returns a {@link TypeDescriptor} capturing what is known statically
- * about the input type of this {@code DoFnWithContext} instance's most-derived
- * class.
- *
- * <p>See {@link #getOutputTypeDescriptor} for more discussion.
- */
- protected TypeDescriptor<InputT> getInputTypeDescriptor() {
- return new TypeDescriptor<InputT>(getClass()) {};
- }
-
- /**
- * Returns a {@link TypeDescriptor} capturing what is known statically
- * about the output type of this {@code DoFnWithContext} instance's
- * most-derived class.
- *
- * <p>In the normal case of a concrete {@code DoFnWithContext} subclass with
- * no generic type parameters of its own (including anonymous inner
- * classes), this will be a complete non-generic type, which is good
- * for choosing a default output {@code Coder<O>} for the output
- * {@code PCollection<O>}.
- */
- protected TypeDescriptor<OutputT> getOutputTypeDescriptor() {
- return new TypeDescriptor<OutputT>(getClass()) {};
- }
-
- /**
- * Interface for runner implementors to provide implementations of extra context information.
- *
- * <p>The methods on this interface are called by {@link DoFnReflector} before invoking an
- * annotated {@link StartBundle}, {@link ProcessElement} or {@link FinishBundle} method that
- * has indicated it needs the given extra context.
- *
- * <p>In the case of {@link ProcessElement} it is called once per invocation of
- * {@link ProcessElement}.
- */
- public interface ExtraContextFactory<InputT, OutputT> {
- /**
- * Construct the {@link BoundedWindow} to use within a {@link DoFnWithContext} that
- * needs it. This is called if the {@link ProcessElement} method has a parameter of type
- * {@link BoundedWindow}.
- *
- * @return {@link BoundedWindow} of the element currently being processed.
- */
- BoundedWindow window();
-
- /**
- * Construct the {@link WindowingInternals} to use within a {@link DoFnWithContext} that
- * needs it. This is called if the {@link ProcessElement} method has a parameter of type
- * {@link WindowingInternals}.
- */
- WindowingInternals<InputT, OutputT> windowingInternals();
- }
-
- /////////////////////////////////////////////////////////////////////////////
-
- /**
- * Annotation for the method to use to prepare an instance for processing a batch of elements.
- * The method annotated with this must satisfy the following constraints:
- * <ul>
- * <li>It must have at least one argument.
- * <li>Its first (and only) argument must be a {@link DoFnWithContext.Context}.
- * </ul>
- */
- @Documented
- @Retention(RetentionPolicy.RUNTIME)
- @Target(ElementType.METHOD)
- public @interface StartBundle {}
-
- /**
- * Annotation for the method to use for processing elements. A subclass of
- * {@link DoFnWithContext} must have a method with this annotation satisfying
- * the following constraints in order for it to be executable:
- * <ul>
- * <li>It must have at least one argument.
- * <li>Its first argument must be a {@link DoFnWithContext.ProcessContext}.
- * <li>Its remaining arguments must be {@link BoundedWindow}, or
- * {@link WindowingInternals WindowingInternals<InputT, OutputT>}.
- * </ul>
- */
- @Documented
- @Retention(RetentionPolicy.RUNTIME)
- @Target(ElementType.METHOD)
- public @interface ProcessElement {}
-
- /**
- * Annotation for the method to use to prepare an instance for processing a batch of elements.
- * The method annotated with this must satisfy the following constraints:
- * <ul>
- * <li>It must have at least one argument.
- * <li>Its first (and only) argument must be a {@link DoFnWithContext.Context}.
- * </ul>
- */
- @Documented
- @Retention(RetentionPolicy.RUNTIME)
- @Target(ElementType.METHOD)
- public @interface FinishBundle {}
-
- /**
- * Returns an {@link Aggregator} with aggregation logic specified by the
- * {@link CombineFn} argument. The name provided must be unique across
- * {@link Aggregator}s created within the OldDoFn. Aggregators can only be created
- * during pipeline construction.
- *
- * @param name the name of the aggregator
- * @param combiner the {@link CombineFn} to use in the aggregator
- * @return an aggregator for the provided name and combiner in the scope of
- * this OldDoFn
- * @throws NullPointerException if the name or combiner is null
- * @throws IllegalArgumentException if the given name collides with another
- * aggregator in this scope
- * @throws IllegalStateException if called during pipeline execution.
- */
- public final <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
- createAggregator(String name, Combine.CombineFn<? super AggInputT, ?, AggOutputT> combiner) {
- checkNotNull(name, "name cannot be null");
- checkNotNull(combiner, "combiner cannot be null");
- checkArgument(!aggregators.containsKey(name),
- "Cannot create aggregator with name %s."
- + " An Aggregator with that name already exists within this scope.",
- name);
- checkState(!aggregatorsAreFinal,
- "Cannot create an aggregator during pipeline execution."
- + " Aggregators should be registered during pipeline construction.");
-
- DelegatingAggregator<AggInputT, AggOutputT> aggregator =
- new DelegatingAggregator<>(name, combiner);
- aggregators.put(name, aggregator);
- return aggregator;
- }
-
- /**
- * Returns an {@link Aggregator} with the aggregation logic specified by the
- * {@link SerializableFunction} argument. The name provided must be unique
- * across {@link Aggregator}s created within the OldDoFn. Aggregators can only be
- * created during pipeline construction.
- *
- * @param name the name of the aggregator
- * @param combiner the {@link SerializableFunction} to use in the aggregator
- * @return an aggregator for the provided name and combiner in the scope of
- * this OldDoFn
- * @throws NullPointerException if the name or combiner is null
- * @throws IllegalArgumentException if the given name collides with another
- * aggregator in this scope
- * @throws IllegalStateException if called during pipeline execution.
- */
- public final <AggInputT> Aggregator<AggInputT, AggInputT> createAggregator(
- String name, SerializableFunction<Iterable<AggInputT>, AggInputT> combiner) {
- checkNotNull(combiner, "combiner cannot be null.");
- return createAggregator(name, Combine.IterableCombineFn.of(combiner));
- }
-
- /**
- * Finalize the {@link DoFnWithContext} construction to prepare for processing.
- * This method should be called by runners before any processing methods.
- */
- void prepareForProcessing() {
- aggregatorsAreFinal = true;
- }
-
- /**
- * {@inheritDoc}
- *
- * <p>By default, does not register any display data. Implementors may override this method
- * to provide their own display data.
- */
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3bcb6f46/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
index 48c6033..f640442 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
@@ -63,7 +63,7 @@ import java.util.UUID;
* separately from any {@code ParDo} transform or {@code Pipeline},
* can be done via the {@link DoFnTester} harness.
*
- * <p>{@link DoFnWithContext} (currently experimental) offers an alternative
+ * <p>{@link DoFn} (currently experimental) offers an alternative
* mechanism for accessing {@link ProcessContext#window()} without the need
* to implement {@link RequiresWindowAccess}.
*
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3bcb6f46/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index 36d8101..bb1af9c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -527,13 +527,13 @@ public class ParDo {
}
private static <InputT, OutputT> OldDoFn<InputT, OutputT>
- adapt(DoFnWithContext<InputT, OutputT> fn) {
+ adapt(DoFn<InputT, OutputT> fn) {
return DoFnReflector.of(fn.getClass()).toDoFn(fn);
}
/**
* Creates a {@link ParDo} {@link PTransform} that will invoke the
- * given {@link DoFnWithContext} function.
+ * given {@link DoFn} function.
*
* <p>The resulting {@link PTransform PTransform's} types have been bound, with the
* input being a {@code PCollection<InputT>} and the output a
@@ -541,11 +541,11 @@ public class ParDo {
* {@code OldDoFn<InputT, OutputT>}. It is ready to be applied, or further
* properties can be set on it first.
*
- * <p>{@link DoFnWithContext} is an experimental alternative to
+ * <p>{@link DoFn} is an experimental alternative to
* {@link OldDoFn} which simplifies accessing the window of the element.
*/
@Experimental
- public static <InputT, OutputT> Bound<InputT, OutputT> of(DoFnWithContext<InputT, OutputT> fn) {
+ public static <InputT, OutputT> Bound<InputT, OutputT> of(DoFn<InputT, OutputT> fn) {
return of(adapt(fn), fn.getClass());
}
@@ -633,13 +633,13 @@ public class ParDo {
/**
* Returns a new {@link ParDo} {@link PTransform} that's like this
- * transform but which will invoke the given {@link DoFnWithContext}
+ * transform but which will invoke the given {@link DoFn}
* function, and which has its input and output types bound. Does
* not modify this transform. The resulting {@link PTransform} is
* sufficiently specified to be applied, but more properties can
* still be specified.
*/
- public <InputT, OutputT> Bound<InputT, OutputT> of(DoFnWithContext<InputT, OutputT> fn) {
+ public <InputT, OutputT> Bound<InputT, OutputT> of(DoFn<InputT, OutputT> fn) {
return of(adapt(fn), fn.getClass());
}
}
@@ -845,12 +845,12 @@ public class ParDo {
/**
* Returns a new multi-output {@link ParDo} {@link PTransform}
* that's like this transform but which will invoke the given
- * {@link DoFnWithContext} function, and which has its input type bound.
+ * {@link DoFn} function, and which has its input type bound.
* Does not modify this transform. The resulting
* {@link PTransform} is sufficiently specified to be applied, but
* more properties can still be specified.
*/
- public <InputT> BoundMulti<InputT, OutputT> of(DoFnWithContext<InputT, OutputT> fn) {
+ public <InputT> BoundMulti<InputT, OutputT> of(DoFn<InputT, OutputT> fn) {
return of(adapt(fn), fn.getClass());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3bcb6f46/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java
index 0cb3d7b..df9e441 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java
@@ -21,10 +21,10 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
-import org.apache.beam.sdk.transforms.DoFnWithContext.Context;
-import org.apache.beam.sdk.transforms.DoFnWithContext.ExtraContextFactory;
-import org.apache.beam.sdk.transforms.DoFnWithContext.ProcessContext;
-import org.apache.beam.sdk.transforms.DoFnWithContext.ProcessElement;
+import org.apache.beam.sdk.transforms.DoFn.Context;
+import org.apache.beam.sdk.transforms.DoFn.ExtraContextFactory;
+import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
+import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
import org.apache.beam.sdk.transforms.dofnreflector.DoFnReflectorTestHelper;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.UserCodeException;
@@ -61,13 +61,13 @@ public class DoFnReflectorTest {
}
}
- private DoFnWithContext<String, String> fn;
+ private DoFn<String, String> fn;
@Rule
public ExpectedException thrown = ExpectedException.none();
@Mock
- private DoFnWithContext<String, String>.ProcessContext mockContext;
+ private DoFn<String, String>.ProcessContext mockContext;
@Mock
private BoundedWindow mockWindow;
@Mock
@@ -91,7 +91,7 @@ public class DoFnReflectorTest {
};
}
- private DoFnReflector underTest(DoFnWithContext<String, String> fn) {
+ private DoFnReflector underTest(DoFn<String, String> fn) {
this.fn = fn;
return DoFnReflector.of(fn.getClass());
}
@@ -141,7 +141,7 @@ public class DoFnReflectorTest {
@Test
public void testDoFnWithNoExtraContext() throws Exception {
final Invocations invocations = new Invocations("AnonymousClass");
- DoFnReflector reflector = underTest(new DoFnWithContext<String, String>() {
+ DoFnReflector reflector = underTest(new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c)
@@ -172,19 +172,19 @@ public class DoFnReflectorTest {
interface InterfaceWithProcessElement {
@ProcessElement
- void processElement(DoFnWithContext<String, String>.ProcessContext c);
+ void processElement(DoFn<String, String>.ProcessContext c);
}
interface LayersOfInterfaces extends InterfaceWithProcessElement {}
private class IdentityUsingInterfaceWithProcessElement
- extends DoFnWithContext<String, String>
+ extends DoFn<String, String>
implements LayersOfInterfaces {
private Invocations invocations = new Invocations("Named Class");
@Override
- public void processElement(DoFnWithContext<String, String>.ProcessContext c) {
+ public void processElement(DoFn<String, String>.ProcessContext c) {
invocations.wasProcessElementInvoked = true;
assertSame(c, mockContext);
}
@@ -198,7 +198,7 @@ public class DoFnReflectorTest {
checkInvokeProcessElementWorks(reflector, fn.invocations);
}
- private class IdentityParent extends DoFnWithContext<String, String> {
+ private class IdentityParent extends DoFn<String, String> {
protected Invocations parentInvocations = new Invocations("IdentityParent");
@ProcessElement
@@ -215,7 +215,7 @@ public class DoFnReflectorTest {
protected Invocations childInvocations = new Invocations("IdentityChildWithOverride");
@Override
- public void process(DoFnWithContext<String, String>.ProcessContext c) {
+ public void process(DoFn<String, String>.ProcessContext c) {
super.process(c);
childInvocations.wasProcessElementInvoked = true;
}
@@ -240,7 +240,7 @@ public class DoFnReflectorTest {
@Test
public void testDoFnWithWindow() throws Exception {
final Invocations invocations = new Invocations("AnonymousClass");
- DoFnReflector reflector = underTest(new DoFnWithContext<String, String>() {
+ DoFnReflector reflector = underTest(new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c, BoundedWindow w)
@@ -259,7 +259,7 @@ public class DoFnReflectorTest {
@Test
public void testDoFnWithWindowingInternals() throws Exception {
final Invocations invocations = new Invocations("AnonymousClass");
- DoFnReflector reflector = underTest(new DoFnWithContext<String, String>() {
+ DoFnReflector reflector = underTest(new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c, WindowingInternals<String, String> w)
@@ -278,7 +278,7 @@ public class DoFnReflectorTest {
@Test
public void testDoFnWithStartBundle() throws Exception {
final Invocations invocations = new Invocations("AnonymousClass");
- DoFnReflector reflector = underTest(new DoFnWithContext<String, String>() {
+ DoFnReflector reflector = underTest(new DoFn<String, String>() {
@ProcessElement
public void processElement(@SuppressWarnings("unused") ProcessContext c) {}
@@ -304,7 +304,7 @@ public class DoFnReflectorTest {
thrown.expect(IllegalStateException.class);
thrown.expectMessage("No method annotated with @ProcessElement found");
thrown.expectMessage(getClass().getName() + "$");
- underTest(new DoFnWithContext<String, String>() {});
+ underTest(new DoFn<String, String>() {});
}
@Test
@@ -314,7 +314,7 @@ public class DoFnReflectorTest {
thrown.expectMessage("foo()");
thrown.expectMessage("bar()");
thrown.expectMessage(getClass().getName() + "$");
- underTest(new DoFnWithContext<String, String>() {
+ underTest(new DoFn<String, String>() {
@ProcessElement
public void foo() {}
@@ -330,7 +330,7 @@ public class DoFnReflectorTest {
thrown.expectMessage("bar()");
thrown.expectMessage("baz()");
thrown.expectMessage(getClass().getName() + "$");
- underTest(new DoFnWithContext<String, String>() {
+ underTest(new DoFn<String, String>() {
@ProcessElement
public void foo() {}
@@ -349,7 +349,7 @@ public class DoFnReflectorTest {
thrown.expectMessage("bar()");
thrown.expectMessage("baz()");
thrown.expectMessage(getClass().getName() + "$");
- underTest(new DoFnWithContext<String, String>() {
+ underTest(new DoFn<String, String>() {
@ProcessElement
public void foo() {}
@@ -361,7 +361,7 @@ public class DoFnReflectorTest {
});
}
- private static class PrivateDoFnClass extends DoFnWithContext<String, String> {
+ private static class PrivateDoFnClass extends DoFn<String, String> {
final Invocations invocations = new Invocations(getClass().getName());
@ProcessElement
@@ -429,7 +429,7 @@ public class DoFnReflectorTest {
thrown.expect(IllegalStateException.class);
thrown.expectMessage("process() must be public");
thrown.expectMessage(getClass().getName() + "$");
- underTest(new DoFnWithContext<String, String>() {
+ underTest(new DoFn<String, String>() {
@ProcessElement
private void process() {}
});
@@ -440,7 +440,7 @@ public class DoFnReflectorTest {
thrown.expect(IllegalStateException.class);
thrown.expectMessage("startBundle() must be public");
thrown.expectMessage(getClass().getName() + "$");
- underTest(new DoFnWithContext<String, String>() {
+ underTest(new DoFn<String, String>() {
@ProcessElement
public void processElement() {}
@@ -454,7 +454,7 @@ public class DoFnReflectorTest {
thrown.expect(IllegalStateException.class);
thrown.expectMessage("finishBundle() must be public");
thrown.expectMessage(getClass().getName() + "$");
- underTest(new DoFnWithContext<String, String>() {
+ underTest(new DoFn<String, String>() {
@ProcessElement
public void processElement() {}
@@ -490,7 +490,7 @@ public class DoFnReflectorTest {
}
@SuppressWarnings({"unused"})
- private void badExtraContext(DoFnWithContext<Integer, String>.Context c, int n) {}
+ private void badExtraContext(DoFn<Integer, String>.Context c, int n) {}
@Test
public void testBadExtraContext() throws Exception {
@@ -505,7 +505,7 @@ public class DoFnReflectorTest {
@SuppressWarnings({"unused"})
private void badExtraProcessContext(
- DoFnWithContext<Integer, String>.ProcessContext c, Integer n) {}
+ DoFn<Integer, String>.ProcessContext c, Integer n) {}
@Test
public void testBadExtraProcessContextType() throws Exception {
@@ -534,58 +534,58 @@ public class DoFnReflectorTest {
}
@SuppressWarnings("unused")
- private void goodGenerics(DoFnWithContext<Integer, String>.ProcessContext c,
+ private void goodGenerics(DoFn<Integer, String>.ProcessContext c,
WindowingInternals<Integer, String> i1) {}
@Test
public void testValidGenerics() throws Exception {
Method method = getClass().getDeclaredMethod("goodGenerics",
- DoFnWithContext.ProcessContext.class, WindowingInternals.class);
+ DoFn.ProcessContext.class, WindowingInternals.class);
DoFnReflector.verifyProcessMethodArguments(method);
}
@SuppressWarnings("unused")
- private void goodWildcards(DoFnWithContext<Integer, String>.ProcessContext c,
+ private void goodWildcards(DoFn<Integer, String>.ProcessContext c,
WindowingInternals<?, ?> i1) {}
@Test
public void testGoodWildcards() throws Exception {
Method method = getClass().getDeclaredMethod("goodWildcards",
- DoFnWithContext.ProcessContext.class, WindowingInternals.class);
+ DoFn.ProcessContext.class, WindowingInternals.class);
DoFnReflector.verifyProcessMethodArguments(method);
}
@SuppressWarnings("unused")
- private void goodBoundedWildcards(DoFnWithContext<Integer, String>.ProcessContext c,
+ private void goodBoundedWildcards(DoFn<Integer, String>.ProcessContext c,
WindowingInternals<? super Integer, ? super String> i1) {}
@Test
public void testGoodBoundedWildcards() throws Exception {
Method method = getClass().getDeclaredMethod("goodBoundedWildcards",
- DoFnWithContext.ProcessContext.class, WindowingInternals.class);
+ DoFn.ProcessContext.class, WindowingInternals.class);
DoFnReflector.verifyProcessMethodArguments(method);
}
@SuppressWarnings("unused")
private <InputT, OutputT> void goodTypeVariables(
- DoFnWithContext<InputT, OutputT>.ProcessContext c,
+ DoFn<InputT, OutputT>.ProcessContext c,
WindowingInternals<InputT, OutputT> i1) {}
@Test
public void testGoodTypeVariables() throws Exception {
Method method = getClass().getDeclaredMethod("goodTypeVariables",
- DoFnWithContext.ProcessContext.class, WindowingInternals.class);
+ DoFn.ProcessContext.class, WindowingInternals.class);
DoFnReflector.verifyProcessMethodArguments(method);
}
@SuppressWarnings("unused")
- private void badGenericTwoArgs(DoFnWithContext<Integer, String>.ProcessContext c,
+ private void badGenericTwoArgs(DoFn<Integer, String>.ProcessContext c,
WindowingInternals<Integer, Integer> i1) {}
@Test
public void testBadGenericsTwoArgs() throws Exception {
Method method = getClass().getDeclaredMethod("badGenericTwoArgs",
- DoFnWithContext.ProcessContext.class, WindowingInternals.class);
+ DoFn.ProcessContext.class, WindowingInternals.class);
thrown.expect(IllegalStateException.class);
thrown.expectMessage("Incompatible generics in context parameter "
@@ -598,13 +598,13 @@ public class DoFnReflectorTest {
}
@SuppressWarnings("unused")
- private void badGenericWildCards(DoFnWithContext<Integer, String>.ProcessContext c,
+ private void badGenericWildCards(DoFn<Integer, String>.ProcessContext c,
WindowingInternals<Integer, ? super Integer> i1) {}
@Test
public void testBadGenericWildCards() throws Exception {
Method method = getClass().getDeclaredMethod("badGenericWildCards",
- DoFnWithContext.ProcessContext.class, WindowingInternals.class);
+ DoFn.ProcessContext.class, WindowingInternals.class);
thrown.expect(IllegalStateException.class);
thrown.expectMessage("Incompatible generics in context parameter "
@@ -617,13 +617,13 @@ public class DoFnReflectorTest {
}
@SuppressWarnings("unused")
- private <InputT, OutputT> void badTypeVariables(DoFnWithContext<InputT, OutputT>.ProcessContext c,
+ private <InputT, OutputT> void badTypeVariables(DoFn<InputT, OutputT>.ProcessContext c,
WindowingInternals<InputT, InputT> i1) {}
@Test
public void testBadTypeVariables() throws Exception {
Method method = getClass().getDeclaredMethod("badTypeVariables",
- DoFnWithContext.ProcessContext.class, WindowingInternals.class);
+ DoFn.ProcessContext.class, WindowingInternals.class);
thrown.expect(IllegalStateException.class);
thrown.expectMessage("Incompatible generics in context parameter "
@@ -636,7 +636,7 @@ public class DoFnReflectorTest {
@Test
public void testProcessElementException() throws Exception {
- DoFnWithContext<Integer, Integer> fn = new DoFnWithContext<Integer, Integer>() {
+ DoFn<Integer, Integer> fn = new DoFn<Integer, Integer>() {
@ProcessElement
public void processElement(@SuppressWarnings("unused") ProcessContext c) {
throw new IllegalArgumentException("bogus");
@@ -650,7 +650,7 @@ public class DoFnReflectorTest {
@Test
public void testStartBundleException() throws Exception {
- DoFnWithContext<Integer, Integer> fn = new DoFnWithContext<Integer, Integer>() {
+ DoFn<Integer, Integer> fn = new DoFn<Integer, Integer>() {
@StartBundle
public void startBundle(@SuppressWarnings("unused") Context c) {
throw new IllegalArgumentException("bogus");
@@ -668,7 +668,7 @@ public class DoFnReflectorTest {
@Test
public void testFinishBundleException() throws Exception {
- DoFnWithContext<Integer, Integer> fn = new DoFnWithContext<Integer, Integer>() {
+ DoFn<Integer, Integer> fn = new DoFn<Integer, Integer>() {
@FinishBundle
public void finishBundle(@SuppressWarnings("unused") Context c) {
throw new IllegalArgumentException("bogus");
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3bcb6f46/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java
new file mode 100644
index 0000000..c7e8972
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.isA;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.Max.MaxIntegerFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.Serializable;
+
+/** Tests for {@link DoFn}. */
+@RunWith(JUnit4.class)
+public class DoFnTest implements Serializable {
+ @Rule
+ public transient ExpectedException thrown = ExpectedException.none();
+
+ private class NoOpDoFn extends DoFn<Void, Void> {
+
+ /**
+ * @param c context
+ */
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ }
+ }
+
+ @Test
+ public void testCreateAggregatorWithCombinerSucceeds() {
+ String name = "testAggregator";
+ Sum.SumLongFn combiner = new Sum.SumLongFn();
+
+ DoFn<Void, Void> doFn = new NoOpDoFn();
+
+ Aggregator<Long, Long> aggregator = doFn.createAggregator(name, combiner);
+
+ assertEquals(name, aggregator.getName());
+ assertEquals(combiner, aggregator.getCombineFn());
+ }
+
+ @Test
+ public void testCreateAggregatorWithNullNameThrowsException() {
+ thrown.expect(NullPointerException.class);
+ thrown.expectMessage("name cannot be null");
+
+ DoFn<Void, Void> doFn = new NoOpDoFn();
+
+ doFn.createAggregator(null, new Sum.SumLongFn());
+ }
+
+ @Test
+ public void testCreateAggregatorWithNullCombineFnThrowsException() {
+ CombineFn<Object, Object, Object> combiner = null;
+
+ thrown.expect(NullPointerException.class);
+ thrown.expectMessage("combiner cannot be null");
+
+ DoFn<Void, Void> doFn = new NoOpDoFn();
+
+ doFn.createAggregator("testAggregator", combiner);
+ }
+
+ @Test
+ public void testCreateAggregatorWithNullSerializableFnThrowsException() {
+ SerializableFunction<Iterable<Object>, Object> combiner = null;
+
+ thrown.expect(NullPointerException.class);
+ thrown.expectMessage("combiner cannot be null");
+
+ DoFn<Void, Void> doFn = new NoOpDoFn();
+
+ doFn.createAggregator("testAggregator", combiner);
+ }
+
+ @Test
+ public void testCreateAggregatorWithSameNameThrowsException() {
+ String name = "testAggregator";
+ CombineFn<Double, ?, Double> combiner = new Max.MaxDoubleFn();
+
+ DoFn<Void, Void> doFn = new NoOpDoFn();
+
+ doFn.createAggregator(name, combiner);
+
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("Cannot create");
+ thrown.expectMessage(name);
+ thrown.expectMessage("already exists");
+
+ doFn.createAggregator(name, combiner);
+ }
+
+ @Test
+ public void testCreateAggregatorsWithDifferentNamesSucceeds() {
+ String nameOne = "testAggregator";
+ String nameTwo = "aggregatorPrime";
+ CombineFn<Double, ?, Double> combiner = new Max.MaxDoubleFn();
+
+ DoFn<Void, Void> doFn = new NoOpDoFn();
+
+ Aggregator<Double, Double> aggregatorOne =
+ doFn.createAggregator(nameOne, combiner);
+ Aggregator<Double, Double> aggregatorTwo =
+ doFn.createAggregator(nameTwo, combiner);
+
+ assertNotEquals(aggregatorOne, aggregatorTwo);
+ }
+
+ @Test
+ public void testDoFnWithContextUsingAggregators() {
+ NoOpOldDoFn<Object, Object> noOpFn = new NoOpOldDoFn<>();
+ OldDoFn<Object, Object>.Context context = noOpFn.context();
+
+ OldDoFn<Object, Object> fn = spy(noOpFn);
+ context = spy(context);
+
+ @SuppressWarnings("unchecked")
+ Aggregator<Long, Long> agg = mock(Aggregator.class);
+
+ Sum.SumLongFn combiner = new Sum.SumLongFn();
+ Aggregator<Long, Long> delegateAggregator =
+ fn.createAggregator("test", combiner);
+
+ when(context.createAggregatorInternal("test", combiner)).thenReturn(agg);
+
+ context.setupDelegateAggregators();
+ delegateAggregator.addValue(1L);
+
+ verify(agg).addValue(1L);
+ }
+
+ @Test
+ public void testDefaultPopulateDisplayDataImplementation() {
+ DoFn<String, String> fn = new DoFn<String, String>() {
+ };
+ DisplayData displayData = DisplayData.from(fn);
+ assertThat(displayData.items(), empty());
+ }
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void testCreateAggregatorInStartBundleThrows() {
+ TestPipeline p = createTestPipeline(new DoFn<String, String>() {
+ @StartBundle
+ public void startBundle(Context c) {
+ createAggregator("anyAggregate", new MaxIntegerFn());
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext c) {}
+ });
+
+ thrown.expect(PipelineExecutionException.class);
+ thrown.expectCause(isA(IllegalStateException.class));
+
+ p.run();
+ }
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void testCreateAggregatorInProcessElementThrows() {
+ TestPipeline p = createTestPipeline(new DoFn<String, String>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ createAggregator("anyAggregate", new MaxIntegerFn());
+ }
+ });
+
+ thrown.expect(PipelineExecutionException.class);
+ thrown.expectCause(isA(IllegalStateException.class));
+
+ p.run();
+ }
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void testCreateAggregatorInFinishBundleThrows() {
+ TestPipeline p = createTestPipeline(new DoFn<String, String>() {
+ @FinishBundle
+ public void finishBundle(Context c) {
+ createAggregator("anyAggregate", new MaxIntegerFn());
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext c) {}
+ });
+
+ thrown.expect(PipelineExecutionException.class);
+ thrown.expectCause(isA(IllegalStateException.class));
+
+ p.run();
+ }
+
+ /**
+ * Initialize a test pipeline with the specified {@link OldDoFn}.
+ */
+ private <InputT, OutputT> TestPipeline createTestPipeline(DoFn<InputT, OutputT> fn) {
+ TestPipeline pipeline = TestPipeline.create();
+ pipeline.apply(Create.of((InputT) null))
+ .apply(ParDo.of(fn));
+
+ return pipeline;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3bcb6f46/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnWithContextTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnWithContextTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnWithContextTest.java
deleted file mode 100644
index 0a910b8..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnWithContextTest.java
+++ /dev/null
@@ -1,237 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms;
-
-import static org.hamcrest.Matchers.empty;
-import static org.hamcrest.Matchers.isA;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
-import org.apache.beam.sdk.testing.NeedsRunner;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Max.MaxIntegerFn;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.io.Serializable;
-
-/** Tests for {@link DoFnWithContext}. */
-@RunWith(JUnit4.class)
-public class DoFnWithContextTest implements Serializable {
- @Rule
- public transient ExpectedException thrown = ExpectedException.none();
-
- private class NoOpDoFnWithContext extends DoFnWithContext<Void, Void> {
-
- /**
- * @param c context
- */
- @ProcessElement
- public void processElement(ProcessContext c) {
- }
- }
-
- @Test
- public void testCreateAggregatorWithCombinerSucceeds() {
- String name = "testAggregator";
- Sum.SumLongFn combiner = new Sum.SumLongFn();
-
- DoFnWithContext<Void, Void> doFn = new NoOpDoFnWithContext();
-
- Aggregator<Long, Long> aggregator = doFn.createAggregator(name, combiner);
-
- assertEquals(name, aggregator.getName());
- assertEquals(combiner, aggregator.getCombineFn());
- }
-
- @Test
- public void testCreateAggregatorWithNullNameThrowsException() {
- thrown.expect(NullPointerException.class);
- thrown.expectMessage("name cannot be null");
-
- DoFnWithContext<Void, Void> doFn = new NoOpDoFnWithContext();
-
- doFn.createAggregator(null, new Sum.SumLongFn());
- }
-
- @Test
- public void testCreateAggregatorWithNullCombineFnThrowsException() {
- CombineFn<Object, Object, Object> combiner = null;
-
- thrown.expect(NullPointerException.class);
- thrown.expectMessage("combiner cannot be null");
-
- DoFnWithContext<Void, Void> doFn = new NoOpDoFnWithContext();
-
- doFn.createAggregator("testAggregator", combiner);
- }
-
- @Test
- public void testCreateAggregatorWithNullSerializableFnThrowsException() {
- SerializableFunction<Iterable<Object>, Object> combiner = null;
-
- thrown.expect(NullPointerException.class);
- thrown.expectMessage("combiner cannot be null");
-
- DoFnWithContext<Void, Void> doFn = new NoOpDoFnWithContext();
-
- doFn.createAggregator("testAggregator", combiner);
- }
-
- @Test
- public void testCreateAggregatorWithSameNameThrowsException() {
- String name = "testAggregator";
- CombineFn<Double, ?, Double> combiner = new Max.MaxDoubleFn();
-
- DoFnWithContext<Void, Void> doFn = new NoOpDoFnWithContext();
-
- doFn.createAggregator(name, combiner);
-
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage("Cannot create");
- thrown.expectMessage(name);
- thrown.expectMessage("already exists");
-
- doFn.createAggregator(name, combiner);
- }
-
- @Test
- public void testCreateAggregatorsWithDifferentNamesSucceeds() {
- String nameOne = "testAggregator";
- String nameTwo = "aggregatorPrime";
- CombineFn<Double, ?, Double> combiner = new Max.MaxDoubleFn();
-
- DoFnWithContext<Void, Void> doFn = new NoOpDoFnWithContext();
-
- Aggregator<Double, Double> aggregatorOne =
- doFn.createAggregator(nameOne, combiner);
- Aggregator<Double, Double> aggregatorTwo =
- doFn.createAggregator(nameTwo, combiner);
-
- assertNotEquals(aggregatorOne, aggregatorTwo);
- }
-
- @Test
- public void testDoFnWithContextUsingAggregators() {
- NoOpOldDoFn<Object, Object> noOpFn = new NoOpOldDoFn<>();
- OldDoFn<Object, Object>.Context context = noOpFn.context();
-
- OldDoFn<Object, Object> fn = spy(noOpFn);
- context = spy(context);
-
- @SuppressWarnings("unchecked")
- Aggregator<Long, Long> agg = mock(Aggregator.class);
-
- Sum.SumLongFn combiner = new Sum.SumLongFn();
- Aggregator<Long, Long> delegateAggregator =
- fn.createAggregator("test", combiner);
-
- when(context.createAggregatorInternal("test", combiner)).thenReturn(agg);
-
- context.setupDelegateAggregators();
- delegateAggregator.addValue(1L);
-
- verify(agg).addValue(1L);
- }
-
- @Test
- public void testDefaultPopulateDisplayDataImplementation() {
- DoFnWithContext<String, String> fn = new DoFnWithContext<String, String>() {
- };
- DisplayData displayData = DisplayData.from(fn);
- assertThat(displayData.items(), empty());
- }
-
- @Test
- @Category(NeedsRunner.class)
- public void testCreateAggregatorInStartBundleThrows() {
- TestPipeline p = createTestPipeline(new DoFnWithContext<String, String>() {
- @StartBundle
- public void startBundle(Context c) {
- createAggregator("anyAggregate", new MaxIntegerFn());
- }
-
- @ProcessElement
- public void processElement(ProcessContext c) {}
- });
-
- thrown.expect(PipelineExecutionException.class);
- thrown.expectCause(isA(IllegalStateException.class));
-
- p.run();
- }
-
- @Test
- @Category(NeedsRunner.class)
- public void testCreateAggregatorInProcessElementThrows() {
- TestPipeline p = createTestPipeline(new DoFnWithContext<String, String>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- createAggregator("anyAggregate", new MaxIntegerFn());
- }
- });
-
- thrown.expect(PipelineExecutionException.class);
- thrown.expectCause(isA(IllegalStateException.class));
-
- p.run();
- }
-
- @Test
- @Category(NeedsRunner.class)
- public void testCreateAggregatorInFinishBundleThrows() {
- TestPipeline p = createTestPipeline(new DoFnWithContext<String, String>() {
- @FinishBundle
- public void finishBundle(Context c) {
- createAggregator("anyAggregate", new MaxIntegerFn());
- }
-
- @ProcessElement
- public void processElement(ProcessContext c) {}
- });
-
- thrown.expect(PipelineExecutionException.class);
- thrown.expectCause(isA(IllegalStateException.class));
-
- p.run();
- }
-
- /**
- * Initialize a test pipeline with the specified {@link OldDoFn}.
- */
- private <InputT, OutputT> TestPipeline createTestPipeline(DoFnWithContext<InputT, OutputT> fn) {
- TestPipeline pipeline = TestPipeline.create();
- pipeline.apply(Create.of((InputT) null))
- .apply(ParDo.of(fn));
-
- return pipeline;
- }
-}