You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/08/25 21:51:12 UTC
[1/2] incubator-beam git commit: Update DoFn javadocs to remove references to OldDoFn and Dataflow
Repository: incubator-beam
Updated Branches:
refs/heads/master a69a0ea90 -> edcb5eff3
Update DoFn javadocs to remove references to OldDoFn and Dataflow
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4e6230cc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4e6230cc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4e6230cc
Branch: refs/heads/master
Commit: 4e6230cc734ab3dba081e04d135a285b73008270
Parents: f7384e1
Author: Scott Wegner <sw...@google.com>
Authored: Wed Aug 17 14:38:36 2016 -0700
Committer: Scott Wegner <sw...@google.com>
Committed: Thu Aug 25 09:04:59 2016 -0700
----------------------------------------------------------------------
.../examples/common/PubsubFileInjector.java | 2 +-
.../apache/beam/sdk/util/DoFnRunnerBase.java | 16 +-
.../beam/sdk/util/GroupAlsoByWindowsDoFn.java | 2 +-
.../apache/beam/sdk/util/ReduceFnRunner.java | 5 +-
.../apache/beam/sdk/util/SimpleDoFnRunner.java | 4 +-
.../ImmutabilityCheckingBundleFactory.java | 4 +-
.../direct/TransformEvaluatorFactory.java | 3 +-
.../beam/runners/dataflow/util/DoFnInfo.java | 7 +-
.../translation/MultiOutputWordCountTest.java | 2 +-
.../spark/translation/SerializationTest.java | 4 +-
.../org/apache/beam/sdk/AggregatorValues.java | 4 +-
.../apache/beam/sdk/transforms/Aggregator.java | 14 +-
.../apache/beam/sdk/transforms/CombineFns.java | 18 +-
.../org/apache/beam/sdk/transforms/DoFn.java | 23 +-
.../apache/beam/sdk/transforms/DoFnTester.java | 62 ++--
.../apache/beam/sdk/transforms/GroupByKey.java | 7 +-
.../apache/beam/sdk/transforms/PTransform.java | 2 +-
.../org/apache/beam/sdk/transforms/ParDo.java | 306 +++++++++----------
.../beam/sdk/transforms/SimpleFunction.java | 6 +-
.../beam/sdk/transforms/windowing/PaneInfo.java | 10 +-
.../beam/sdk/util/BaseExecutionContext.java | 4 +-
.../sdk/util/ReifyTimestampAndWindowsDoFn.java | 4 +-
.../apache/beam/sdk/util/SerializableUtils.java | 2 +-
.../beam/sdk/util/SystemDoFnInternal.java | 7 +-
.../beam/sdk/util/WindowingInternals.java | 3 +-
.../DoFnDelegatingAggregatorTest.java | 2 +-
.../beam/sdk/transforms/DoFnTesterTest.java | 3 +-
.../apache/beam/sdk/transforms/NoOpOldDoFn.java | 2 +-
28 files changed, 263 insertions(+), 265 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4e6230cc/examples/java/src/main/java/org/apache/beam/examples/common/PubsubFileInjector.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/PubsubFileInjector.java b/examples/java/src/main/java/org/apache/beam/examples/common/PubsubFileInjector.java
index e6a1495..4634159 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/PubsubFileInjector.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/PubsubFileInjector.java
@@ -69,7 +69,7 @@ public class PubsubFileInjector {
}
}
- /** A OldDoFn that publishes non-empty lines to Google Cloud PubSub. */
+ /** A {@link OldDoFn} that publishes non-empty lines to Google Cloud PubSub. */
public static class Bound extends OldDoFn<String, Void> {
private final String outputTopic;
private final String timestampLabelKey;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4e6230cc/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java
index 8a0f6bf..04a0978 100644
--- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java
@@ -58,10 +58,10 @@ import org.joda.time.format.PeriodFormat;
*/
public abstract class DoFnRunnerBase<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
- /** The OldDoFn being run. */
+ /** The {@link OldDoFn} being run. */
public final OldDoFn<InputT, OutputT> fn;
- /** The context used for running the OldDoFn. */
+ /** The context used for running the {@link OldDoFn}. */
public final DoFnContext<InputT, OutputT> context;
protected DoFnRunnerBase(
@@ -164,8 +164,8 @@ public abstract class DoFnRunnerBase<InputT, OutputT> implements DoFnRunner<Inpu
/**
* A concrete implementation of {@code OldDoFn.Context} used for running a {@link OldDoFn}.
*
- * @param <InputT> the type of the OldDoFn's (main) input elements
- * @param <OutputT> the type of the OldDoFn's (main) output elements
+ * @param <InputT> the type of the {@link OldDoFn}'s (main) input elements
+ * @param <OutputT> the type of the {@link OldDoFn}'s (main) output elements
*/
private static class DoFnContext<InputT, OutputT>
extends OldDoFn<InputT, OutputT>.Context {
@@ -350,7 +350,7 @@ public abstract class DoFnRunnerBase<InputT, OutputT> implements DoFnRunner<Inpu
}
/**
- * Returns a new {@code OldDoFn.ProcessContext} for the given element.
+ * Returns a new {@link OldDoFn.ProcessContext} for the given element.
*/
protected OldDoFn<InputT, OutputT>.ProcessContext createProcessContext(
WindowedValue<InputT> elem) {
@@ -366,11 +366,11 @@ public abstract class DoFnRunnerBase<InputT, OutputT> implements DoFnRunner<Inpu
}
/**
- * A concrete implementation of {@code OldDoFn.ProcessContext} used for
+ * A concrete implementation of {@link OldDoFn.ProcessContext} used for
* running a {@link OldDoFn} over a single element.
*
- * @param <InputT> the type of the OldDoFn's (main) input elements
- * @param <OutputT> the type of the OldDoFn's (main) output elements
+ * @param <InputT> the type of the {@link OldDoFn}'s (main) input elements
+ * @param <OutputT> the type of the {@link OldDoFn}'s (main) output elements
*/
static class DoFnProcessContext<InputT, OutputT>
extends OldDoFn<InputT, OutputT>.ProcessContext {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4e6230cc/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsDoFn.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsDoFn.java
index f82e5df..f386dfb 100644
--- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsDoFn.java
@@ -26,7 +26,7 @@ import org.apache.beam.sdk.util.state.StateInternalsFactory;
import org.apache.beam.sdk.values.KV;
/**
- * OldDoFn that merges windows and groups elements in those windows, optionally
+ * {@link OldDoFn} that merges windows and groups elements in those windows, optionally
* combining values.
*
* @param <K> key type
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4e6230cc/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
index 61e5b21..7c3e4d7 100644
--- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
@@ -33,7 +33,6 @@ import java.util.Set;
import javax.annotation.Nullable;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -177,8 +176,8 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
* Store the previously emitted pane (if any) for each window.
*
* <ul>
- * <li>State: The previous {@link PaneInfo} passed to the user's {@link OldDoFn#processElement},
- * if any.
+ * <li>State: The previous {@link PaneInfo} passed to the user's {@code DoFn.ProcessElement}
+ * method, if any.
* <li>Style style: DIRECT
* <li>Merging: Always keyed by actual window, so does not depend on {@link #activeWindows}.
* Cleared when window is merged away.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4e6230cc/runners/core-java/src/main/java/org/apache/beam/sdk/util/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/SimpleDoFnRunner.java
index 6c1cf45..1ebe5a8 100644
--- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/SimpleDoFnRunner.java
@@ -28,8 +28,8 @@ import org.apache.beam.sdk.values.TupleTag;
/**
* Runs a {@link OldDoFn} by constructing the appropriate contexts and passing them in.
*
- * @param <InputT> the type of the OldDoFn's (main) input elements
- * @param <OutputT> the type of the OldDoFn's (main) output elements
+ * @param <InputT> the type of the {@link OldDoFn}'s (main) input elements
+ * @param <OutputT> the type of the {@link OldDoFn}'s (main) output elements
*/
public class SimpleDoFnRunner<InputT, OutputT> extends DoFnRunnerBase<InputT, OutputT>{
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4e6230cc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java
index d5c0f0c..71bd8b4 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java
@@ -25,7 +25,7 @@ import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.util.IllegalMutationException;
import org.apache.beam.sdk.util.MutationDetector;
import org.apache.beam.sdk.util.MutationDetectors;
@@ -40,7 +40,7 @@ import org.joda.time.Instant;
* elements added to the bundle will be encoded by the {@link Coder} of the underlying
* {@link PCollection}.
*
- * <p>This catches errors during the execution of a {@link OldDoFn} caused by modifying an element
+ * <p>This catches errors during the execution of a {@link DoFn} caused by modifying an element
* after it is added to an output {@link PCollection}.
*/
class ImmutabilityCheckingBundleFactory implements BundleFactory {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4e6230cc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java
index e9fa06b..ecf2da8 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java
@@ -22,7 +22,6 @@ import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
/**
@@ -37,7 +36,7 @@ public interface TransformEvaluatorFactory {
* Create a new {@link TransformEvaluator} for the application of the {@link PTransform}.
*
* <p>Any work that must be done before input elements are processed (such as calling
- * {@link OldDoFn#startBundle(OldDoFn.Context)}) must be done before the
+ * the {@code DoFn.StartBundle} method) must be done before the
* {@link TransformEvaluator} is made available to the caller.
*
* <p>May return null if the application cannot produce an evaluator (for example, it is a
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4e6230cc/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
index 139db9d..949c381 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
@@ -24,10 +24,10 @@ import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.PCollectionView;
/**
- * Wrapper class holding the necessary information to serialize a OldDoFn.
+ * Wrapper class holding the necessary information to serialize a {@link OldDoFn}.
*
- * @param <InputT> the type of the (main) input elements of the OldDoFn
- * @param <OutputT> the type of the (main) output elements of the OldDoFn
+ * @param <InputT> the type of the (main) input elements of the {@link OldDoFn}
+ * @param <OutputT> the type of the (main) output elements of the {@link OldDoFn}
*/
public class DoFnInfo<InputT, OutputT> implements Serializable {
private final OldDoFn<InputT, OutputT> doFn;
@@ -66,3 +66,4 @@ public class DoFnInfo<InputT, OutputT> implements Serializable {
return inputCoder;
}
}
+
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4e6230cc/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
index 517596a..acfa3df 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
@@ -100,7 +100,7 @@ public class MultiOutputWordCountTest {
}
/**
- * A OldDoFn that tokenizes lines of text into individual words.
+ * A {@link DoFn} that tokenizes lines of text into individual words.
*/
static class ExtractWordsFn extends DoFn<String, String> {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4e6230cc/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
index 0e9121c..22a40cd 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
@@ -142,7 +142,7 @@ public class SerializationTest {
}
/**
- * A OldDoFn that tokenizes lines of text into individual words.
+ * A {@link DoFn} that tokenizes lines of text into individual words.
*/
static class ExtractWordsFn extends DoFn<StringHolder, StringHolder> {
private static final Pattern WORD_BOUNDARY = Pattern.compile("[^a-zA-Z']+");
@@ -170,7 +170,7 @@ public class SerializationTest {
}
/**
- * A OldDoFn that converts a Word and Count into a printable string.
+ * A {@link DoFn} that converts a Word and Count into a printable string.
*/
private static class FormatCountsFn extends DoFn<KV<StringHolder, Long>, StringHolder> {
@ProcessElement
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4e6230cc/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorValues.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorValues.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorValues.java
index 6297085..1fd034a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorValues.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorValues.java
@@ -21,11 +21,11 @@ import java.util.Collection;
import java.util.Map;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
/**
* A collection of values associated with an {@link Aggregator}. Aggregators declared in a
- * {@link OldDoFn} are emitted on a per-{@code OldDoFn}-application basis.
+ * {@link DoFn} are emitted on a per-{@link DoFn}-application basis.
*
* @param <T> the output type of the aggregator
*/
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4e6230cc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java
index db4ab33..67d399f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java
@@ -25,8 +25,8 @@ import org.apache.beam.sdk.util.ExecutionContext;
* to be combined across all bundles.
*
* <p>Aggregators are created by calling
- * {@link OldDoFn#createAggregator OldDoFn.createAggregatorForDoFn},
- * typically from the {@link OldDoFn} constructor. Elements can be added to the
+ * {@link DoFn#createAggregator DoFn.createAggregator},
+ * typically from the {@link DoFn} constructor. Elements can be added to the
* {@code Aggregator} by calling {@link Aggregator#addValue}.
*
* <p>Aggregators are visible in the monitoring UI, when the pipeline is run
@@ -37,14 +37,14 @@ import org.apache.beam.sdk.util.ExecutionContext;
*
* <p>Example:
* <pre> {@code
- * class MyDoFn extends OldDoFn<String, String> {
+ * class MyDoFn extends DoFn<String, String> {
* private Aggregator<Integer, Integer> myAggregator;
*
* public MyDoFn() {
* myAggregator = createAggregatorForDoFn("myAggregator", new Sum.SumIntegerFn());
* }
*
- * @Override
+ * @ProcessElement
* public void processElement(ProcessContext c) {
* myAggregator.addValue(1);
* }
@@ -79,8 +79,8 @@ public interface Aggregator<InputT, OutputT> {
/**
* Create an aggregator with the given {@code name} and {@link CombineFn}.
*
- * <p>This method is called to create an aggregator for a {@link OldDoFn}. It receives the
- * class of the {@link OldDoFn} being executed and the context of the step it is being
+ * <p>This method is called to create an aggregator for a {@link DoFn}. It receives the
+ * class of the {@link DoFn} being executed and the context of the step it is being
* executed in.
*/
<InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn(
@@ -90,7 +90,7 @@ public interface Aggregator<InputT, OutputT> {
// TODO: Consider the following additional API conveniences:
// - In addition to createAggregatorForDoFn(), consider adding getAggregator() to
- // avoid the need to store the aggregator locally in a OldDoFn, i.e., create
+ // avoid the need to store the aggregator locally in a DoFn, i.e., create
// if not already present.
// - Add a shortcut for the most common aggregator:
// c.createAggregatorForDoFn("name", new Sum.SumIntegerFn()).
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4e6230cc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
index 9fa8ded..6f05993 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
@@ -67,7 +67,7 @@ public class CombineFns {
* <p>The same {@link TupleTag} cannot be used in a composition multiple times.
*
* <p>Example:
- * <pre>{ @code
+ * <pre><code>
* PCollection<KV<K, Integer>> latencies = ...;
*
* TupleTag<Integer> maxLatencyTag = new TupleTag<Integer>();
@@ -75,7 +75,7 @@ public class CombineFns {
*
* SimpleFunction<Integer, Integer> identityFn =
* new SimpleFunction<Integer, Integer>() {
- * @Override
+ * {@literal @}Override
* public Integer apply(Integer input) {
* return input;
* }};
@@ -87,8 +87,8 @@ public class CombineFns {
*
* PCollection<T> finalResultCollection = maxAndMean
* .apply(ParDo.of(
- * new OldDoFn<KV<K, CoCombineResult>, T>() {
- * @Override
+ * new DoFn<KV<K, CoCombineResult>, T>() {
+ * {@literal @}ProcessElement
* public void processElement(ProcessContext c) throws Exception {
* KV<K, CoCombineResult> e = c.element();
* Integer maxLatency = e.getValue().get(maxLatencyTag);
@@ -97,7 +97,7 @@ public class CombineFns {
* c.output(...some T...);
* }
* }));
- * } </pre>
+ * </code></pre>
*/
public static ComposeKeyedCombineFnBuilder composeKeyed() {
return new ComposeKeyedCombineFnBuilder();
@@ -110,7 +110,7 @@ public class CombineFns {
* <p>The same {@link TupleTag} cannot be used in a composition multiple times.
*
* <p>Example:
- * <pre>{ @code
+ * <pre><code>
* PCollection<Integer> globalLatencies = ...;
*
* TupleTag<Integer> maxLatencyTag = new TupleTag<Integer>();
@@ -130,8 +130,8 @@ public class CombineFns {
*
* PCollection<T> finalResultCollection = maxAndMean
* .apply(ParDo.of(
- * new OldDoFn<CoCombineResult, T>() {
- * @Override
+ * new DoFn<CoCombineResult, T>() {
+ * {@literal @}ProcessElement
* public void processElement(ProcessContext c) throws Exception {
* CoCombineResult e = c.element();
* Integer maxLatency = e.get(maxLatencyTag);
@@ -140,7 +140,7 @@ public class CombineFns {
* c.output(...some T...);
* }
* }));
- * } </pre>
+ * </code></pre>
*/
public static ComposeCombineFnBuilder compose() {
return new ComposeCombineFnBuilder();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4e6230cc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index 9f89826..59c8323 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -63,8 +63,6 @@ import org.joda.time.Instant;
* that satisfies the requirements described there. See the {@link ProcessElement}
* for details.
*
- * <p>This functionality is experimental and likely to change.
- *
* <p>Example usage:
*
* <pre> {@code
@@ -123,7 +121,7 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
*
* <p>If invoked from {@link ProcessElement}), the timestamp
* must not be older than the input element's timestamp minus
- * {@link OldDoFn#getAllowedTimestampSkew}. The output element will
+ * {@link DoFn#getAllowedTimestampSkew}. The output element will
* be in the same windows as the input element.
*
* <p>If invoked from {@link StartBundle} or {@link FinishBundle},
@@ -172,7 +170,7 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
*
* <p>If invoked from {@link ProcessElement}), the timestamp
* must not be older than the input element's timestamp minus
- * {@link OldDoFn#getAllowedTimestampSkew}. The output element will
+ * {@link DoFn#getAllowedTimestampSkew}. The output element will
* be in the same windows as the input element.
*
* <p>If invoked from {@link StartBundle} or {@link FinishBundle},
@@ -190,7 +188,7 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
}
/**
- * Information accessible when running {@link OldDoFn#processElement}.
+ * Information accessible when running a {@link DoFn.ProcessElement} method.
*/
public abstract class ProcessContext extends Context {
@@ -359,9 +357,14 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
* Annotation for the method to use to prepare an instance for processing a batch of elements.
* The method annotated with this must satisfy the following constraints:
* <ul>
- * <li>It must have at least one argument.
+ * <li>It must have exactly one argument.
* <li>Its first (and only) argument must be a {@link DoFn.Context}.
* </ul>
+ *
+ * <p>A simple method declaration would look like:
+ * <code>
+ * public void startBundle(DoFn.Context c) { ... }
+ * </code>
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@@ -414,13 +417,13 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
/**
* Returns an {@link Aggregator} with aggregation logic specified by the
* {@link CombineFn} argument. The name provided must be unique across
- * {@link Aggregator}s created within the OldDoFn. Aggregators can only be created
+ * {@link Aggregator}s created within the {@link DoFn}. Aggregators can only be created
* during pipeline construction.
*
* @param name the name of the aggregator
* @param combiner the {@link CombineFn} to use in the aggregator
* @return an aggregator for the provided name and combiner in the scope of
- * this OldDoFn
+ * this {@link DoFn}
* @throws NullPointerException if the name or combiner is null
* @throws IllegalArgumentException if the given name collides with another
* aggregator in this scope
@@ -447,13 +450,13 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
/**
* Returns an {@link Aggregator} with the aggregation logic specified by the
* {@link SerializableFunction} argument. The name provided must be unique
- * across {@link Aggregator}s created within the OldDoFn. Aggregators can only be
+ * across {@link Aggregator}s created within the {@link DoFn}. Aggregators can only be
* created during pipeline construction.
*
* @param name the name of the aggregator
* @param combiner the {@link SerializableFunction} to use in the aggregator
* @return an aggregator for the provided name and combiner in the scope of
- * this OldDoFn
+ * this {@link DoFn}
* @throws NullPointerException if the name or combiner is null
* @throws IllegalArgumentException if the given name collides with another
* aggregator in this scope
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4e6230cc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index 82c1293..6801768 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -50,12 +50,12 @@ import org.apache.beam.sdk.values.TupleTag;
import org.joda.time.Instant;
/**
- * A harness for unit-testing a {@link OldDoFn}.
+ * A harness for unit-testing a {@link DoFn}.
*
* <p>For example:
*
* <pre> {@code
- * OldDoFn<InputT, OutputT> fn = ...;
+ * DoFn<InputT, OutputT> fn = ...;
*
* DoFnTester<InputT, OutputT> fnTester = DoFnTester.of(fn);
*
@@ -72,17 +72,17 @@ import org.joda.time.Instant;
* Assert.assertThat(fnTester.processBundle(i1, i2, ...), Matchers.hasItems(...));
* } </pre>
*
- * @param <InputT> the type of the {@code OldDoFn}'s (main) input elements
- * @param <OutputT> the type of the {@code OldDoFn}'s (main) output elements
+ * @param <InputT> the type of the {@link DoFn}'s (main) input elements
+ * @param <OutputT> the type of the {@link DoFn}'s (main) output elements
*/
public class DoFnTester<InputT, OutputT> {
/**
* Returns a {@code DoFnTester} supporting unit-testing of the given
- * {@link OldDoFn}.
+ * {@link DoFn}.
*/
@SuppressWarnings("unchecked")
- public static <InputT, OutputT> DoFnTester<InputT, OutputT> of(OldDoFn<InputT, OutputT> fn) {
- return new DoFnTester<InputT, OutputT>(fn);
+ public static <InputT, OutputT> DoFnTester<InputT, OutputT> of(DoFn<InputT, OutputT> fn) {
+ return new DoFnTester<>(DoFnAdapters.toOldDoFn(fn));
}
/**
@@ -90,19 +90,19 @@ public class DoFnTester<InputT, OutputT> {
* {@link OldDoFn}.
*/
@SuppressWarnings("unchecked")
- public static <InputT, OutputT> DoFnTester<InputT, OutputT>
- of(DoFn<InputT, OutputT> fn) {
- return new DoFnTester<InputT, OutputT>(DoFnAdapters.toOldDoFn(fn));
+ public static <InputT, OutputT> DoFnTester<InputT, OutputT>
+ of(OldDoFn<InputT, OutputT> fn) {
+ return new DoFnTester<>(fn);
}
/**
* Registers the tuple of values of the side input {@link PCollectionView}s to
- * pass to the {@link OldDoFn} under test.
+ * pass to the {@link DoFn} under test.
*
* <p>Resets the state of this {@link DoFnTester}.
*
* <p>If this isn't called, {@code DoFnTester} assumes the
- * {@link OldDoFn} takes no side inputs.
+ * {@link DoFn} takes no side inputs.
*/
public void setSideInputs(Map<PCollectionView<?>, Map<BoundedWindow, ?>> sideInputs) {
this.sideInputs = sideInputs;
@@ -110,7 +110,7 @@ public class DoFnTester<InputT, OutputT> {
}
/**
- * Registers the values of a side input {@link PCollectionView} to pass to the {@link OldDoFn}
+ * Registers the values of a side input {@link PCollectionView} to pass to the {@link DoFn}
* under test.
*
* <p>The provided value is the final value of the side input in the specified window, not
@@ -129,7 +129,7 @@ public class DoFnTester<InputT, OutputT> {
}
/**
- * Whether or not a {@link DoFnTester} should clone the {@link OldDoFn} under test.
+ * Whether or not a {@link DoFnTester} should clone the {@link DoFn} under test.
*/
public enum CloningBehavior {
CLONE,
@@ -137,14 +137,14 @@ public class DoFnTester<InputT, OutputT> {
}
/**
- * Instruct this {@link DoFnTester} whether or not to clone the {@link OldDoFn} under test.
+ * Instruct this {@link DoFnTester} whether or not to clone the {@link DoFn} under test.
*/
public void setCloningBehavior(CloningBehavior newValue) {
this.cloningBehavior = newValue;
}
/**
- * Indicates whether this {@link DoFnTester} will clone the {@link OldDoFn} under test.
+ * Indicates whether this {@link DoFnTester} will clone the {@link DoFn} under test.
*/
public CloningBehavior getCloningBehavior() {
return cloningBehavior;
@@ -166,7 +166,7 @@ public class DoFnTester<InputT, OutputT> {
}
/**
- * A convenience method for testing {@link OldDoFn DoFns} with bundles of elements.
+ * A convenience method for testing {@link DoFn DoFns} with bundles of elements.
* Logic proceeds as follows:
*
* <ol>
@@ -182,9 +182,9 @@ public class DoFnTester<InputT, OutputT> {
}
/**
- * Calls {@link OldDoFn#startBundle} on the {@code OldDoFn} under test.
+ * Calls the {@link DoFn.StartBundle} method on the {@link DoFn} under test.
*
- * <p>If needed, first creates a fresh instance of the OldDoFn under test.
+ * <p>If needed, first creates a fresh instance of the {@link DoFn} under test.
*/
public void startBundle() throws Exception {
resetState();
@@ -210,14 +210,14 @@ public class DoFnTester<InputT, OutputT> {
}
/**
- * Calls {@link OldDoFn#processElement} on the {@code OldDoFn} under test, in a
- * context where {@link OldDoFn.ProcessContext#element} returns the
+ * Calls the {@link DoFn.ProcessElement} method on the {@link DoFn} under test, in a
+ * context where {@link DoFn.ProcessContext#element} returns the
* given element.
*
* <p>Will call {@link #startBundle} automatically, if it hasn't
* already been called.
*
- * @throws IllegalStateException if the {@code OldDoFn} under test has already
+ * @throws IllegalStateException if the {@link DoFn} under test has already
* been finished
*/
public void processElement(InputT element) throws Exception {
@@ -235,12 +235,12 @@ public class DoFnTester<InputT, OutputT> {
}
/**
- * Calls {@link OldDoFn#finishBundle} of the {@code OldDoFn} under test.
+ * Calls the {@link DoFn.FinishBundle} method of the {@link DoFn} under test.
*
* <p>Will call {@link #startBundle} automatically, if it hasn't
* already been called.
*
- * @throws IllegalStateException if the {@code OldDoFn} under test has already
+ * @throws IllegalStateException if the {@link DoFn} under test has already
* been finished
*/
public void finishBundle() throws Exception {
@@ -674,7 +674,7 @@ public class DoFnTester<InputT, OutputT> {
/////////////////////////////////////////////////////////////////////////////
- /** The possible states of processing a OldDoFn. */
+ /** The possible states of processing a {@link DoFn}. */
enum State {
UNSTARTED,
STARTED,
@@ -683,23 +683,23 @@ public class DoFnTester<InputT, OutputT> {
private final PipelineOptions options = PipelineOptionsFactory.create();
- /** The original OldDoFn under test. */
+ /** The original {@link OldDoFn} under test. */
private final OldDoFn<InputT, OutputT> origFn;
/**
- * Whether to clone the original {@link OldDoFn} or just use it as-is.
+ * Whether to clone the original {@link DoFn} or just use it as-is.
*
- * <p></p>Worker-side {@link OldDoFn DoFns} may not be serializable, and are not required to be.
+ * <p>Worker-side {@link DoFn DoFns} may not be serializable, and are not required to be.
*/
private CloningBehavior cloningBehavior = CloningBehavior.CLONE;
- /** The side input values to provide to the OldDoFn under test. */
+ /** The side input values to provide to the {@link DoFn} under test. */
private Map<PCollectionView<?>, Map<BoundedWindow, ?>> sideInputs =
new HashMap<>();
private Map<String, Object> accumulators;
- /** The output tags used by the OldDoFn under test. */
+ /** The output tags used by the {@link DoFn} under test. */
private TupleTag<OutputT> mainOutputTag = new TupleTag<>();
/** The original OldDoFn under test, if started. */
@@ -708,7 +708,7 @@ public class DoFnTester<InputT, OutputT> {
/** The ListOutputManager to examine the outputs. */
private Map<TupleTag<?>, List<WindowedValue<?>>> outputs;
- /** The state of processing of the OldDoFn under test. */
+ /** The state of processing of the {@link DoFn} under test. */
private State state;
private DoFnTester(OldDoFn<InputT, OutputT> origFn) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4e6230cc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
index ed7f411..3a3da65 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
@@ -63,18 +63,19 @@ import org.apache.beam.sdk.values.PCollection.IsBounded;
* {@code Coder} of the values of the input.
*
* <p>Example of use:
- * <pre> {@code
+ * <pre><code>
* PCollection<KV<String, Doc>> urlDocPairs = ...;
* PCollection<KV<String, Iterable<Doc>>> urlToDocs =
* urlDocPairs.apply(GroupByKey.<String, Doc>create());
* PCollection<R> results =
- * urlToDocs.apply(ParDo.of(new OldDoFn<KV<String, Iterable<Doc>>, R>() {
+ * urlToDocs.apply(ParDo.of(new DoFn<KV<String, Iterable<Doc>>, R>() {
+ * {@literal @}ProcessElement
* public void processElement(ProcessContext c) {
* String url = c.element().getKey();
* Iterable<Doc> docsWithThatUrl = c.element().getValue();
* ... process all docs having that url ...
* }}));
- * } </pre>
+ * </code></pre>
*
* <p>{@code GroupByKey} is a key primitive in data-parallel
* processing, since it is the main way to efficiently bring
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4e6230cc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
index 19abef9..4a58141 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
@@ -146,7 +146,7 @@ import org.apache.beam.sdk.values.TypedPValue;
* implementing {@code Serializable}.
*
* <p>{@code PTransform} is marked {@code Serializable} solely
- * because it is common for an anonymous {@code OldDoFn},
+ * because it is common for an anonymous {@link DoFn},
* instance to be created within an
* {@code apply()} method of a composite {@code PTransform}.
*
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4e6230cc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index 5efbe9f..f9cb557 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -22,7 +22,6 @@ import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.runners.PipelineRunner;
@@ -50,13 +49,12 @@ import org.apache.beam.sdk.values.TypedPValue;
* <p>The {@link ParDo} processing style is similar to what happens inside
* the "Mapper" or "Reducer" class of a MapReduce-style algorithm.
*
- * <h2>{@link OldDoFn DoFns}</h2>
+ * <h2>{@link DoFn DoFns}</h2>
*
* <p>The function to use to process each element is specified by a
- * {@link OldDoFn OldDoFn<InputT, OutputT>}, primarily via its
- * {@link OldDoFn#processElement processElement} method. The {@link OldDoFn} may also
- * override the default implementations of {@link OldDoFn#startBundle startBundle}
- * and {@link OldDoFn#finishBundle finishBundle}.
+ * {@link DoFn DoFn<InputT, OutputT>}, primarily via its
+ * {@link DoFn.ProcessElement ProcessElement} method. The {@link DoFn} may also
+ * provide {@link DoFn.StartBundle StartBundle} and {@link DoFn.FinishBundle FinishBundle} methods.
*
* <p>Conceptually, when a {@link ParDo} transform is executed, the
* elements of the input {@link PCollection} are first divided up
@@ -66,39 +64,38 @@ import org.apache.beam.sdk.values.TypedPValue;
*
* <ol>
* <li>If required, a fresh instance of the argument {@link DoFn} is created
- * on a worker, and {@link DoFn#setup()} is called on this instance. This may be through
- * deserialization or other means. A {@link PipelineRunner} may reuse {@link DoFn} instances for
- * multiple bundles. A {@link DoFn} that has terminated abnormally (by throwing an
+ * on a worker, and the {@link DoFn.Setup} method is called on this instance. This may be
+ * through deserialization or other means. A {@link PipelineRunner} may reuse {@link DoFn}
+ * instances for multiple bundles. A {@link DoFn} that has terminated abnormally (by throwing an
* {@link Exception}) will never be reused.</li>
- * <li>The {@link OldDoFn OldDoFn's} {@link OldDoFn#startBundle} method is called to
- * initialize it. If this method is not overridden, the call may be optimized
- * away.</li>
- * <li>The {@link OldDoFn OldDoFn's} {@link OldDoFn#processElement} method
+ * <li>The {@link DoFn DoFn's} {@link DoFn.StartBundle} method, if provided, is called to
+ * initialize it.</li>
+ * <li>The {@link DoFn DoFn's} {@link DoFn.ProcessElement} method
* is called on each of the input elements in the bundle.</li>
- * <li>The {@link OldDoFn OldDoFn's} {@link OldDoFn#finishBundle} method is called
- * to complete its work. After {@link OldDoFn#finishBundle} is called, the
- * framework will not again invoke {@link OldDoFn#processElement} or
- * {@link OldDoFn#finishBundle}
- * until a new call to {@link OldDoFn#startBundle} has occurred.
- * If this method is not overridden, this call may be optimized away.</li>
- * <li>If any of {@link DoFn#setup}, {@link DoFn#startBundle}, {@link DoFn#processElement} or
- * {@link DoFn#finishBundle} throw an exception, {@link DoFn#teardown} will be called on the
- * {@link DoFn} instance.</li>
- * <li>If a runner will no longer use a {@link DoFn}, {@link DoFn#teardown()} will be called on
- * the discarded instance.</li>
+ * <li>The {@link DoFn DoFn's} {@link DoFn.FinishBundle} method, if provided, is called
+ * to complete its work. After {@link DoFn.FinishBundle} is called, the
+ * framework will not again invoke {@link DoFn.ProcessElement} or
+ * {@link DoFn.FinishBundle}
+ * until a new call to {@link DoFn.StartBundle} has occurred.</li>
+ * <li>If any of {@link DoFn.Setup}, {@link DoFn.StartBundle}, {@link DoFn.ProcessElement} or
+ * {@link DoFn.FinishBundle} methods throw an exception, the {@link DoFn.Teardown} method, if
+ * provided, will be called on the {@link DoFn} instance.</li>
+ * <li>If a runner will no longer use a {@link DoFn}, the {@link DoFn.Teardown} method, if
+ * provided, will be called on the discarded instance.</li>
* </ol>
*
- * Each of the calls to any of the {@link OldDoFn OldDoFn's} processing
+ * Each of the calls to any of the {@link DoFn DoFn's} processing
* methods can produce zero or more output elements. All of the
- * of output elements from all of the {@link OldDoFn} instances
+ * output elements from all of the {@link DoFn} instances
* are included in the output {@link PCollection}.
*
* <p>For example:
*
- * <pre> {@code
+ * <pre><code>
* PCollection<String> lines = ...;
* PCollection<String> words =
- * lines.apply(ParDo.of(new OldDoFn<String, String>() {
+ * lines.apply(ParDo.of(new DoFn<String, String>() {
+ * {@literal @}ProcessElement
* public void processElement(ProcessContext c) {
* String line = c.element();
* for (String word : line.split("[^a-zA-Z']+")) {
@@ -106,13 +103,14 @@ import org.apache.beam.sdk.values.TypedPValue;
* }
* }}));
* PCollection<Integer> wordLengths =
- * words.apply(ParDo.of(new OldDoFn<String, Integer>() {
+ * words.apply(ParDo.of(new DoFn<String, Integer>() {
+ * {@literal @}ProcessElement
* public void processElement(ProcessContext c) {
* String word = c.element();
* Integer length = word.length();
* c.output(length);
* }}));
- * } </pre>
+ * </code></pre>
*
* <p>Each output element has the same timestamp and is in the same windows
* as its corresponding input element, and the output {@code PCollection}
@@ -131,9 +129,9 @@ import org.apache.beam.sdk.values.TypedPValue;
*
* <pre> {@code
* PCollection<String> words =
- * lines.apply("ExtractWords", ParDo.of(new OldDoFn<String, String>() { ... }));
+ * lines.apply("ExtractWords", ParDo.of(new DoFn<String, String>() { ... }));
* PCollection<Integer> wordLengths =
- * words.apply("ComputeWordLengths", ParDo.of(new OldDoFn<String, Integer>() { ... }));
+ * words.apply("ComputeWordLengths", ParDo.of(new DoFn<String, Integer>() { ... }));
* } </pre>
*
* <h2>Side Inputs</h2>
@@ -145,17 +143,18 @@ import org.apache.beam.sdk.values.TypedPValue;
* {@link PCollection PCollections} computed by earlier pipeline operations,
* passed in to the {@link ParDo} transform using
* {@link #withSideInputs}, and their contents accessible to each of
- * the {@link OldDoFn} operations via {@link OldDoFn.ProcessContext#sideInput sideInput}.
+ * the {@link DoFn} operations via {@link DoFn.ProcessContext#sideInput sideInput}.
* For example:
*
- * <pre> {@code
+ * <pre><code>
* PCollection<String> words = ...;
* PCollection<Integer> maxWordLengthCutOff = ...; // Singleton PCollection
* final PCollectionView<Integer> maxWordLengthCutOffView =
* maxWordLengthCutOff.apply(View.<Integer>asSingleton());
* PCollection<String> wordsBelowCutOff =
* words.apply(ParDo.withSideInputs(maxWordLengthCutOffView)
- * .of(new OldDoFn<String, String>() {
+ * .of(new DoFn<String, String>() {
+ * {@literal @}ProcessElement
* public void processElement(ProcessContext c) {
* String word = c.element();
* int lengthCutOff = c.sideInput(maxWordLengthCutOffView);
@@ -163,7 +162,7 @@ import org.apache.beam.sdk.values.TypedPValue;
* c.output(word);
* }
* }}));
- * } </pre>
+ * </code></pre>
*
* <h2>Side Outputs</h2>
*
@@ -174,13 +173,13 @@ import org.apache.beam.sdk.values.TypedPValue;
* and bundled in a {@link PCollectionTuple}. The {@link TupleTag TupleTags}
* to be used for the output {@link PCollectionTuple} are specified by
* invoking {@link #withOutputTags}. Unconsumed side outputs do not
- * necessarily need to be explicitly specified, even if the {@link OldDoFn}
- * generates them. Within the {@link OldDoFn}, an element is added to the
+ * necessarily need to be explicitly specified, even if the {@link DoFn}
+ * generates them. Within the {@link DoFn}, an element is added to the
* main output {@link PCollection} as normal, using
- * {@link OldDoFn.Context#output}, while an element is added to a side output
- * {@link PCollection} using {@link OldDoFn.Context#sideOutput}. For example:
+ * {@link DoFn.Context#output}, while an element is added to a side output
+ * {@link PCollection} using {@link DoFn.Context#sideOutput}. For example:
*
- * <pre> {@code
+ * <pre><code>
* PCollection<String> words = ...;
* // Select words whose length is below a cut off,
* // plus the lengths of words that are above the cut off.
@@ -201,10 +200,11 @@ import org.apache.beam.sdk.values.TypedPValue;
* .withOutputTags(wordsBelowCutOffTag,
* TupleTagList.of(wordLengthsAboveCutOffTag)
* .and(markedWordsTag))
- * .of(new OldDoFn<String, String>() {
+ * .of(new DoFn<String, String>() {
* // Create a tag for the unconsumed side output.
* final TupleTag<String> specialWordsTag =
* new TupleTag<String>(){};
+ * {@literal @}ProcessElement
* public void processElement(ProcessContext c) {
* String word = c.element();
* if (word.length() <= wordLengthCutOff) {
@@ -230,14 +230,13 @@ import org.apache.beam.sdk.values.TypedPValue;
* results.get(wordLengthsAboveCutOffTag);
* PCollection<String> markedWords =
* results.get(markedWordsTag);
- * } </pre>
+ * </code></pre>
*
* <h2>Properties May Be Specified In Any Order</h2>
*
* <p>Several properties can be specified for a {@link ParDo}
- * {@link PTransform}, including name, side inputs, side output tags,
- * and {@link OldDoFn} to invoke. Only the {@link OldDoFn} is required; the
- * name is encouraged but not required, and side inputs and side
+ * {@link PTransform}, including side inputs, side output tags,
+ * and {@link DoFn} to invoke. Only the {@link DoFn} is required; side inputs and side
* output tags are only specified when they're needed. These
* properties can be specified in any order, as long as they're
* specified before the {@link ParDo} {@link PTransform} is applied.
@@ -250,23 +249,23 @@ import org.apache.beam.sdk.values.TypedPValue;
* {@link ParDo.Bound} nested classes, each of which offer
* property setter instance methods to enable setting additional
* properties. {@link ParDo.Bound} is used for {@link ParDo}
- * transforms whose {@link OldDoFn} is specified and whose input and
+ * transforms whose {@link DoFn} is specified and whose input and
* output static types have been bound. {@link ParDo.Unbound ParDo.Unbound} is used
* for {@link ParDo} transforms that have not yet had their
- * {@link OldDoFn} specified. Only {@link ParDo.Bound} instances can be
+ * {@link DoFn} specified. Only {@link ParDo.Bound} instances can be
* applied.
*
* <p>Another benefit of this approach is that it reduces the number
* of type parameters that need to be specified manually. In
* particular, the input and output types of the {@link ParDo}
* {@link PTransform} are inferred automatically from the type
- * parameters of the {@link OldDoFn} argument passed to {@link ParDo#of}.
+ * parameters of the {@link DoFn} argument passed to {@link ParDo#of}.
*
* <h2>Output Coders</h2>
*
* <p>By default, the {@link Coder Coder<OutputT>} for the
* elements of the main output {@link PCollection PCollection<OutputT>} is
- * inferred from the concrete type of the {@link OldDoFn OldDoFn<InputT, OutputT>}.
+ * inferred from the concrete type of the {@link DoFn DoFn<InputT, OutputT>}.
*
* <p>By default, the {@link Coder Coder<SideOutputT>} for the elements of
* a side output {@link PCollection PCollection<SideOutputT>} is inferred
@@ -286,74 +285,74 @@ import org.apache.beam.sdk.values.TypedPValue;
* This style of {@code TupleTag} instantiation is used in the example of
* multiple side outputs, above.
*
- * <h2>Serializability of {@link OldDoFn DoFns}</h2>
+ * <h2>Serializability of {@link DoFn DoFns}</h2>
*
- * <p>A {@link OldDoFn} passed to a {@link ParDo} transform must be
- * {@link Serializable}. This allows the {@link OldDoFn} instance
+ * <p>A {@link DoFn} passed to a {@link ParDo} transform must be
+ * {@link Serializable}. This allows the {@link DoFn} instance
* created in this "main program" to be sent (in serialized form) to
* remote worker machines and reconstituted for bundles of elements
- * of the input {@link PCollection} being processed. A {@link OldDoFn}
+ * of the input {@link PCollection} being processed. A {@link DoFn}
* can have instance variable state, and non-transient instance
* variable state will be serialized in the main program and then
* deserialized on remote worker machines for some number of bundles
* of elements to process.
*
- * <p>{@link OldDoFn DoFns} expressed as anonymous inner classes can be
+ * <p>{@link DoFn DoFns} expressed as anonymous inner classes can be
* convenient, but due to a quirk in Java's rules for serializability,
* non-static inner or nested classes (including anonymous inner
* classes) automatically capture their enclosing class's instance in
* their serialized state. This can lead to including much more than
- * intended in the serialized state of a {@link OldDoFn}, or even things
+ * intended in the serialized state of a {@link DoFn}, or even things
* that aren't {@link Serializable}.
*
* <p>There are two ways to avoid unintended serialized state in a
- * {@link OldDoFn}:
+ * {@link DoFn}:
*
* <ul>
*
- * <li>Define the {@link OldDoFn} as a named, static class.
+ * <li>Define the {@link DoFn} as a named, static class.
*
- * <li>Define the {@link OldDoFn} as an anonymous inner class inside of
+ * <li>Define the {@link DoFn} as an anonymous inner class inside of
* a static method.
*
* </ul>
*
* <p>Both of these approaches ensure that there is no implicit enclosing
- * instance serialized along with the {@link OldDoFn} instance.
+ * instance serialized along with the {@link DoFn} instance.
*
* <p>Prior to Java 8, any local variables of the enclosing
* method referenced from within an anonymous inner class need to be
- * marked as {@code final}. If defining the {@link OldDoFn} as a named
+ * marked as {@code final}. If defining the {@link DoFn} as a named
* static class, such variables would be passed as explicit
* constructor arguments and stored in explicit instance variables.
*
* <p>There are three main ways to initialize the state of a
- * {@link OldDoFn} instance processing a bundle:
+ * {@link DoFn} instance processing a bundle:
*
* <ul>
*
* <li>Define instance variable state (including implicit instance
* variables holding final variables captured by an anonymous inner
- * class), initialized by the {@link OldDoFn}'s constructor (which is
+ * class), initialized by the {@link DoFn}'s constructor (which is
* implicit for an anonymous inner class). This state will be
- * automatically serialized and then deserialized in the {@code OldDoFn}
+ * automatically serialized and then deserialized in the {@link DoFn}
* instances created for bundles. This method is good for state
- * known when the original {@code OldDoFn} is created in the main
+ * known when the original {@link DoFn} is created in the main
* program, if it's not overly large. This is not suitable for any
- * state which must only be used for a single bundle, as {@link OldDoFn OldDoFn's}
+ * state which must only be used for a single bundle, as {@link DoFn DoFn's}
* may be used to process multiple bundles.
*
* <li>Compute the state as a singleton {@link PCollection} and pass it
- * in as a side input to the {@link OldDoFn}. This is good if the state
+ * in as a side input to the {@link DoFn}. This is good if the state
* needs to be computed by the pipeline, or if the state is very large
* and so is best read from file(s) rather than sent as part of the
- * {@code OldDoFn}'s serialized state.
+ * {@link DoFn DoFn's} serialized state.
*
- * <li>Initialize the state in each {@link OldDoFn} instance, in
- * {@link OldDoFn#startBundle}. This is good if the initialization
+ * <li>Initialize the state in each {@link DoFn} instance, in a
+ * {@link DoFn.StartBundle} method. This is good if the initialization
* doesn't depend on any information known only by the main program or
* computed by earlier pipeline operations, but is the same for all
- * instances of this {@link OldDoFn} for all program executions, say
+ * instances of this {@link DoFn} for all program executions, say
* setting up empty caches or initializing constant data.
*
* </ul>
@@ -363,16 +362,16 @@ import org.apache.beam.sdk.values.TypedPValue;
* <p>{@link ParDo} operations are intended to be able to run in
* parallel across multiple worker machines. This precludes easy
* sharing and updating mutable state across those machines. There is
- * no support in the Google Cloud Dataflow system for communicating
+ * no support in the Beam model for communicating
* and synchronizing updates to shared state across worker machines,
* so programs should not access any mutable static variable state in
- * their {@link OldDoFn}, without understanding that the Java processes
+ * their {@link DoFn}, without understanding that the Java processes
* for the main program and workers will each have its own independent
* copy of such state, and there won't be any automatic copying of
* that state across Java processes. All information should be
- * communicated to {@link OldDoFn} instances via main and side inputs and
+ * communicated to {@link DoFn} instances via main and side inputs and
* serialized state, and all output should be communicated from a
- * {@link OldDoFn} instance via main and side outputs, in the absence of
+ * {@link DoFn} instance via main and side outputs, in the absence of
* external communication mechanisms written by user code.
*
* <h2>Fault Tolerance</h2>
@@ -380,29 +379,28 @@ import org.apache.beam.sdk.values.TypedPValue;
* <p>In a distributed system, things can fail: machines can crash,
* machines can be unable to communicate across the network, etc.
* While individual failures are rare, the larger the job, the greater
- * the chance that something, somewhere, will fail. The Google Cloud
- * Dataflow service strives to mask such failures automatically,
- * principally by retrying failed {@link OldDoFn} bundle. This means
- * that a {@code OldDoFn} instance might process a bundle partially, then
- * crash for some reason, then be rerun (often on a different worker
- * machine) on that same bundle and on the same elements as before.
- * Sometimes two or more {@link OldDoFn} instances will be running on the
+ * the chance that something, somewhere, will fail. Beam runners may strive
+ * to mask such failures by retrying failed {@link DoFn} bundles. This means
+ * that a {@link DoFn} instance might process a bundle partially, then
+ * crash for some reason, then be rerun (often in a new JVM) on that
+ * same bundle and on the same elements as before.
+ * Sometimes two or more {@link DoFn} instances will be running on the
* same bundle simultaneously, with the system taking the results of
* the first instance to complete successfully. Consequently, the
- * code in a {@link OldDoFn} needs to be written such that these
+ * code in a {@link DoFn} needs to be written such that these
* duplicate (sequential or concurrent) executions do not cause
- * problems. If the outputs of a {@link OldDoFn} are a pure function of
+ * problems. If the outputs of a {@link DoFn} are a pure function of
* its inputs, then this requirement is satisfied. However, if a
- * {@link OldDoFn OldDoFn's} execution has external side-effects, such as performing
- * updates to external HTTP services, then the {@link OldDoFn OldDoFn's} code
+ * {@link DoFn DoFn's} execution has external side-effects, such as performing
+ * updates to external HTTP services, then the {@link DoFn DoFn's} code
* needs to take care to ensure that those updates are idempotent and
* that concurrent updates are acceptable. This property can be
* difficult to achieve, so it is advisable to strive to keep
- * {@link OldDoFn DoFns} as pure functions as much as possible.
+ * {@link DoFn DoFns} as pure functions as much as possible.
*
* <h2>Optimization</h2>
*
- * <p>The Google Cloud Dataflow service automatically optimizes a
+ * <p>Beam runners may choose to apply optimizations to a
* pipeline before it is executed. A key optimization, <i>fusion</i>,
* relates to {@link ParDo} operations. If one {@link ParDo} operation produces a
* {@link PCollection} that is then consumed as the main input of another
@@ -419,18 +417,16 @@ import org.apache.beam.sdk.values.TypedPValue;
* written to disk, saving all the I/O and space expense of
* constructing it.
*
- * <p>The Google Cloud Dataflow service applies fusion as much as
- * possible, greatly reducing the cost of executing pipelines. As a
- * result, it is essentially "free" to write {@link ParDo} operations in a
+ * <p>When Beam runners apply fusion optimization, it is essentially "free"
+ * to write {@link ParDo} operations in a
* very modular, composable style, each {@link ParDo} operation doing one
* clear task, and stringing together sequences of {@link ParDo} operations to
* get the desired overall effect. Such programs can be easier to
* understand, easier to unit-test, easier to extend and evolve, and
* easier to reuse in new programs. The predefined library of
- * PTransforms that come with Google Cloud Dataflow makes heavy use of
- * this modular, composable style, trusting to the Google Cloud
- * Dataflow service's optimizer to "flatten out" all the compositions
- * into highly optimized stages.
+ * PTransforms that come with Beam make heavy use of
+ * this modular, composable style, trusting to the runner to
+ * "flatten out" all the compositions into highly optimized stages.
*
* @see <a href="https://cloud.google.com/dataflow/model/par-do">the web
* documentation for ParDo</a>
@@ -443,15 +439,15 @@ public class ParDo {
*
* <p>Side inputs are {@link PCollectionView PCollectionViews}, whose contents are
* computed during pipeline execution and then made accessible to
- * {@link OldDoFn} code via {@link OldDoFn.ProcessContext#sideInput sideInput}. Each
- * invocation of the {@link OldDoFn} receives the same values for these
+ * {@link DoFn} code via {@link DoFn.ProcessContext#sideInput sideInput}. Each
+ * invocation of the {@link DoFn} receives the same values for these
* side inputs.
*
* <p>See the discussion of Side Inputs above for more explanation.
*
* <p>The resulting {@link PTransform} is incomplete, and its
* input/output types are not yet bound. Use
- * {@link ParDo.Unbound#of} to specify the {@link OldDoFn} to
+ * {@link ParDo.Unbound#of} to specify the {@link DoFn} to
* invoke, which will also bind the input/output types of this
* {@link PTransform}.
*/
@@ -464,13 +460,13 @@ public class ParDo {
*
* <p>Side inputs are {@link PCollectionView}s, whose contents are
* computed during pipeline execution and then made accessible to
- * {@code OldDoFn} code via {@link OldDoFn.ProcessContext#sideInput sideInput}.
+ * {@link DoFn} code via {@link DoFn.ProcessContext#sideInput sideInput}.
*
* <p>See the discussion of Side Inputs above for more explanation.
*
* <p>The resulting {@link PTransform} is incomplete, and its
* input/output types are not yet bound. Use
- * {@link ParDo.Unbound#of} to specify the {@link OldDoFn} to
+ * {@link ParDo.Unbound#of} to specify the {@link DoFn} to
* invoke, which will also bind the input/output types of this
* {@link PTransform}.
*/
@@ -486,11 +482,11 @@ public class ParDo {
*
* <p>{@link TupleTag TupleTags} are used to name (with its static element
* type {@code T}) each main and side output {@code PCollection<T>}.
- * This {@link PTransform PTransform's} {@link OldDoFn} emits elements to the main
+ * This {@link PTransform PTransform's} {@link DoFn} emits elements to the main
* output {@link PCollection} as normal, using
- * {@link OldDoFn.Context#output}. The {@link OldDoFn} emits elements to
+ * {@link DoFn.Context#output}. The {@link DoFn} emits elements to
* a side output {@code PCollection} using
- * {@link OldDoFn.Context#sideOutput}, passing that side output's tag
+ * {@link DoFn.Context#sideOutput}, passing that side output's tag
* as an argument. The result of invoking this {@link PTransform}
* will be a {@link PCollectionTuple}, and any of the the main and
* side output {@code PCollection}s can be retrieved from it via
@@ -501,7 +497,7 @@ public class ParDo {
*
* <p>The resulting {@link PTransform} is incomplete, and its input
* type is not yet bound. Use {@link ParDo.UnboundMulti#of}
- * to specify the {@link OldDoFn} to invoke, which will also bind the
+ * to specify the {@link DoFn} to invoke, which will also bind the
* input type of this {@link PTransform}.
*/
public static <OutputT> UnboundMulti<OutputT> withOutputTags(
@@ -512,6 +508,20 @@ public class ParDo {
/**
* Creates a {@link ParDo} {@link PTransform} that will invoke the
+ * given {@link DoFn} function.
+ *
+ * <p>The resulting {@link PTransform PTransform's} types have been bound, with the
+ * input being a {@code PCollection<InputT>} and the output a
+ * {@code PCollection<OutputT>}, inferred from the types of the argument
+ * {@code DoFn<InputT, OutputT>}. It is ready to be applied, or further
+ * properties can be set on it first.
+ */
+ public static <InputT, OutputT> Bound<InputT, OutputT> of(DoFn<InputT, OutputT> fn) {
+ return of(adapt(fn), fn.getClass());
+ }
+
+ /**
+ * Creates a {@link ParDo} {@link PTransform} that will invoke the
* given {@link OldDoFn} function.
*
* <p>The resulting {@link PTransform PTransform's} types have been bound, with the
@@ -538,28 +548,10 @@ public class ParDo {
}
/**
- * Creates a {@link ParDo} {@link PTransform} that will invoke the
- * given {@link DoFn} function.
- *
- * <p>The resulting {@link PTransform PTransform's} types have been bound, with the
- * input being a {@code PCollection<InputT>} and the output a
- * {@code PCollection<OutputT>}, inferred from the types of the argument
- * {@code OldDoFn<InputT, OutputT>}. It is ready to be applied, or further
- * properties can be set on it first.
- *
- * <p>{@link DoFn} is an experimental alternative to
- * {@link OldDoFn} which simplifies accessing the window of the element.
- */
- @Experimental
- public static <InputT, OutputT> Bound<InputT, OutputT> of(DoFn<InputT, OutputT> fn) {
- return of(adapt(fn), fn.getClass());
- }
-
- /**
* An incomplete {@link ParDo} transform, with unbound input/output types.
*
* <p>Before being applied, {@link ParDo.Unbound#of} must be
- * invoked to specify the {@link OldDoFn} to invoke, which will also
+ * invoked to specify the {@link DoFn} to invoke, which will also
* bind the input/output types of this {@link PTransform}.
*/
public static class Unbound {
@@ -621,6 +613,18 @@ public class ParDo {
/**
* Returns a new {@link ParDo} {@link PTransform} that's like this
+ * transform but which will invoke the given {@link DoFn}
+ * function, and which has its input and output types bound. Does
+ * not modify this transform. The resulting {@link PTransform} is
+ * sufficiently specified to be applied, but more properties can
+ * still be specified.
+ */
+ public <InputT, OutputT> Bound<InputT, OutputT> of(DoFn<InputT, OutputT> fn) {
+ return of(adapt(fn), fn.getClass());
+ }
+
+ /**
+ * Returns a new {@link ParDo} {@link PTransform} that's like this
* transform but that will invoke the given {@link OldDoFn}
* function, and that has its input and output types bound. Does
* not modify this transform. The resulting {@link PTransform} is
@@ -638,24 +642,11 @@ public class ParDo {
OldDoFn<InputT, OutputT> fn, Class<?> fnClass) {
return new Bound<>(name, sideInputs, fn, fnClass);
}
-
-
- /**
- * Returns a new {@link ParDo} {@link PTransform} that's like this
- * transform but which will invoke the given {@link DoFn}
- * function, and which has its input and output types bound. Does
- * not modify this transform. The resulting {@link PTransform} is
- * sufficiently specified to be applied, but more properties can
- * still be specified.
- */
- public <InputT, OutputT> Bound<InputT, OutputT> of(DoFn<InputT, OutputT> fn) {
- return of(adapt(fn), fn.getClass());
- }
}
/**
* A {@link PTransform} that, when applied to a {@code PCollection<InputT>},
- * invokes a user-specified {@code OldDoFn<InputT, OutputT>} on all its elements,
+ * invokes a user-specified {@code DoFn<InputT, OutputT>} on all its elements,
* with all its outputs collected into an output
* {@code PCollection<OutputT>}.
*
@@ -756,9 +747,9 @@ public class ParDo {
/**
* {@inheritDoc}
*
- * <p>{@link ParDo} registers its internal {@link OldDoFn} as a subcomponent for display data.
- * {@link OldDoFn} implementations can register display data by overriding
- * {@link OldDoFn#populateDisplayData}.
+ * <p>{@link ParDo} registers its internal {@link DoFn} as a subcomponent for display data.
+ * {@link DoFn} implementations can register display data by overriding
+ * {@link DoFn#populateDisplayData}.
*/
@Override
public void populateDisplayData(Builder builder) {
@@ -780,7 +771,7 @@ public class ParDo {
* input type.
*
* <p>Before being applied, {@link ParDo.UnboundMulti#of} must be
- * invoked to specify the {@link OldDoFn} to invoke, which will also
+ * invoked to specify the {@link DoFn} to invoke, which will also
* bind the input type of this {@link PTransform}.
*
* @param <OutputT> the type of the main output {@code PCollection} elements
@@ -836,38 +827,41 @@ public class ParDo {
/**
* Returns a new multi-output {@link ParDo} {@link PTransform}
- * that's like this transform but that will invoke the given
- * {@link OldDoFn} function, and that has its input type bound.
+ * that's like this transform but which will invoke the given
+ * {@link DoFn} function, and which has its input type bound.
* Does not modify this transform. The resulting
* {@link PTransform} is sufficiently specified to be applied, but
* more properties can still be specified.
*/
- public <InputT> BoundMulti<InputT, OutputT> of(OldDoFn<InputT, OutputT> fn) {
- return of(fn, fn.getClass());
- }
-
- public <InputT> BoundMulti<InputT, OutputT> of(OldDoFn<InputT, OutputT> fn, Class<?> fnClass) {
- return new BoundMulti<>(
- name, sideInputs, mainOutputTag, sideOutputTags, fn, fnClass);
+ public <InputT> BoundMulti<InputT, OutputT> of(DoFn<InputT, OutputT> fn) {
+ return of(adapt(fn), fn.getClass());
}
/**
* Returns a new multi-output {@link ParDo} {@link PTransform}
- * that's like this transform but which will invoke the given
- * {@link DoFn} function, and which has its input type bound.
+ * that's like this transform but that will invoke the given
+ * {@link OldDoFn} function, and that has its input type bound.
* Does not modify this transform. The resulting
* {@link PTransform} is sufficiently specified to be applied, but
* more properties can still be specified.
+ *
+ * @deprecated please port your {@link OldDoFn} to a {@link DoFn}
*/
- public <InputT> BoundMulti<InputT, OutputT> of(DoFn<InputT, OutputT> fn) {
- return of(adapt(fn), fn.getClass());
+ @Deprecated
+ public <InputT> BoundMulti<InputT, OutputT> of(OldDoFn<InputT, OutputT> fn) {
+ return of(fn, fn.getClass());
+ }
+
+ private <InputT> BoundMulti<InputT, OutputT> of(OldDoFn<InputT, OutputT> fn, Class<?> fnClass) {
+ return new BoundMulti<>(
+ name, sideInputs, mainOutputTag, sideOutputTags, fn, fnClass);
}
}
/**
* A {@link PTransform} that, when applied to a
* {@code PCollection<InputT>}, invokes a user-specified
- * {@code OldDoFn<InputT, OutputT>} on all its elements, which can emit elements
+ * {@code DoFn<InputT, OutputT>} on all its elements, which can emit elements
* to any of the {@link PTransform}'s main and side output
* {@code PCollection}s, which are bundled into a result
* {@code PCollectionTuple}.
@@ -939,7 +933,7 @@ public class ParDo {
input.isBounded());
// The fn will likely be an instance of an anonymous subclass
- // such as OldDoFn<Integer, String> { }, thus will have a high-fidelity
+ // such as DoFn<Integer, String> { }, thus will have a high-fidelity
// TypeDescriptor for the output type.
outputs.get(mainOutputTag).setTypeDescriptorInternal(fn.getOutputTypeDescriptor());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4e6230cc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SimpleFunction.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SimpleFunction.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SimpleFunction.java
index bf075f8..8604659 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SimpleFunction.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SimpleFunction.java
@@ -37,7 +37,7 @@ public abstract class SimpleFunction<InputT, OutputT>
/**
* Returns a {@link TypeDescriptor} capturing what is known statically
- * about the input type of this {@code OldDoFn} instance's most-derived
+ * about the input type of this {@link SimpleFunction} instance's most-derived
* class.
*
* <p>See {@link #getOutputTypeDescriptor} for more discussion.
@@ -48,10 +48,10 @@ public abstract class SimpleFunction<InputT, OutputT>
/**
* Returns a {@link TypeDescriptor} capturing what is known statically
- * about the output type of this {@code OldDoFn} instance's
+ * about the output type of this {@link SimpleFunction} instance's
* most-derived class.
*
- * <p>In the normal case of a concrete {@code OldDoFn} subclass with
+ * <p>In the normal case of a concrete {@link SimpleFunction} subclass with
* no generic type parameters of its own (including anonymous inner
* classes), this will be a complete non-generic type, which is good
* for choosing a default output {@code Coder<OutputT>} for the output
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4e6230cc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
index 0c87e22..727a492 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
@@ -29,15 +29,15 @@ import java.util.Objects;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.util.VarInt;
/**
* Provides information about the pane an element belongs to. Every pane is implicitly associated
* with a window. Panes are observable only via the
- * {@link OldDoFn.ProcessContext#pane} method of the context
- * passed to a {@link OldDoFn#processElement} overridden method.
+ * {@link DoFn.ProcessContext#pane} method of the context
+ * passed to a method annotated with {@link DoFn.ProcessElement}.
*
* <p>Note: This does not uniquely identify a pane, and should not be used for comparisons.
*/
@@ -72,8 +72,8 @@ public final class PaneInfo {
* definitions:
* <ol>
* <li>We'll call a pipeline 'simple' if it does not use
- * {@link OldDoFn.Context#outputWithTimestamp} in
- * any {@code OldDoFn}, and it uses the same
+ * {@link DoFn.Context#outputWithTimestamp} in
+ * any {@link DoFn}, and it uses the same
* {@link org.apache.beam.sdk.transforms.windowing.Window.Bound#withAllowedLateness}
* argument value on all windows (or uses the default of {@link org.joda.time.Duration#ZERO}).
* <li>We'll call an element 'locally late', from the point of view of a computation on a
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4e6230cc/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BaseExecutionContext.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BaseExecutionContext.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BaseExecutionContext.java
index dead76e..9ee55ad 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BaseExecutionContext.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BaseExecutionContext.java
@@ -106,7 +106,7 @@ public abstract class BaseExecutionContext<T extends ExecutionContext.StepContex
/**
* Hook for subclasses to implement that will be called whenever
- * {@link OldDoFn.Context#output}
+ * {@code DoFn.Context#output}
* is called.
*/
@Override
@@ -114,7 +114,7 @@ public abstract class BaseExecutionContext<T extends ExecutionContext.StepContex
/**
* Hook for subclasses to implement that will be called whenever
- * {@link OldDoFn.Context#sideOutput}
+ * {@code DoFn.Context#sideOutput}
* is called.
*/
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4e6230cc/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampAndWindowsDoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampAndWindowsDoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampAndWindowsDoFn.java
index 2808ca9..8f3f540 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampAndWindowsDoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampAndWindowsDoFn.java
@@ -21,8 +21,8 @@ import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.values.KV;
/**
- * OldDoFn that makes timestamps and window assignments explicit in the value part of each key/value
- * pair.
+ * {@link OldDoFn} that makes timestamps and window assignments explicit in the value part of each
+ * key/value pair.
*
* @param <K> the type of the keys of the input and output {@code PCollection}s
* @param <V> the type of the values of the input {@code PCollection}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4e6230cc/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java
index 354aa5d..6b3218e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java
@@ -102,7 +102,7 @@ public class SerializableUtils {
*/
public static CloudObject ensureSerializable(Coder<?> coder) {
// Make sure that Coders are java serializable as well since
- // they are regularly captured within OldDoFn's.
+ // they are regularly captured within DoFns.
Coder<?> copy = (Coder<?>) ensureSerializable((Serializable) coder);
CloudObject cloudObject = copy.asCloudObject();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4e6230cc/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SystemDoFnInternal.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SystemDoFnInternal.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SystemDoFnInternal.java
index e9904b2..004496b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SystemDoFnInternal.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SystemDoFnInternal.java
@@ -22,15 +22,14 @@ import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
-import org.apache.beam.sdk.transforms.OldDoFn;
/**
- * Annotation to mark {@link OldDoFn DoFns} as an internal component of the Dataflow SDK.
+ * Annotation to mark {@code DoFns} as an internal component of the Beam SDK.
*
* <p>Currently, the only effect of this is to mark any aggregators reported by an annotated
- * {@code OldDoFn} as a system counter (as opposed to a user counter).
+ * {@code DoFn} as a system counter (as opposed to a user counter).
*
- * <p>This is internal to the Dataflow SDK.
+ * <p>This is internal to the Beam SDK.
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4e6230cc/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingInternals.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingInternals.java
index 54158d2..016276c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingInternals.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingInternals.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.util;
import java.io.IOException;
import java.util.Collection;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.state.StateInternals;
@@ -28,7 +29,7 @@ import org.apache.beam.sdk.values.TupleTag;
import org.joda.time.Instant;
/**
- * Interface that may be required by some (internal) {@code OldDoFn}s to implement windowing. It
+ * Interface that may be required by some (internal) {@link DoFn}s to implement windowing. It
* should not be necessary for general user code to interact with this at all.
*
* <p>This interface should be provided by runner implementors to support windowing on their runner.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4e6230cc/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnDelegatingAggregatorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnDelegatingAggregatorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnDelegatingAggregatorTest.java
index 25b909a..c072fd7 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnDelegatingAggregatorTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnDelegatingAggregatorTest.java
@@ -35,7 +35,7 @@ import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
/**
- * Tests for OldDoFn.DelegatingAggregator.
+ * Tests for {@link OldDoFn.DelegatingAggregator}.
*/
@RunWith(JUnit4.class)
public class DoFnDelegatingAggregatorTest {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4e6230cc/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
index 2f1519c..2649be5 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
@@ -276,7 +276,8 @@ public class DoFnTesterTest {
}
/**
- * A OldDoFn that adds values to an aggregator and converts input to String in processElement.
+ * An {@link OldDoFn} that adds values to an aggregator and converts input to String in
+ * {@link OldDoFn#processElement}.
*/
private static class CounterDoFn extends OldDoFn<Long, String> {
Aggregator<Long, Long> agg = createAggregator("ctr", new Sum.SumLongFn());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4e6230cc/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java
index c732510..302b66a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java
@@ -28,7 +28,7 @@ import org.joda.time.Instant;
/**
* A {@link OldDoFn} that does nothing with provided elements. Used for testing
- * methods provided by the OldDoFn abstract class.
+ * methods provided by the {@link OldDoFn} abstract class.
*
* @param <InputT> unused.
* @param <OutputT> unused.
[2/2] incubator-beam git commit: This closes #844
Posted by ke...@apache.org.
This closes #844
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/edcb5eff
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/edcb5eff
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/edcb5eff
Branch: refs/heads/master
Commit: edcb5eff3f6e7182954631ff0e21303493858958
Parents: a69a0ea 4e6230c
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Aug 25 14:50:17 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Aug 25 14:50:17 2016 -0700
----------------------------------------------------------------------
.../examples/common/PubsubFileInjector.java | 2 +-
.../apache/beam/sdk/util/DoFnRunnerBase.java | 16 +-
.../beam/sdk/util/GroupAlsoByWindowsDoFn.java | 2 +-
.../apache/beam/sdk/util/ReduceFnRunner.java | 5 +-
.../apache/beam/sdk/util/SimpleDoFnRunner.java | 4 +-
.../ImmutabilityCheckingBundleFactory.java | 4 +-
.../direct/TransformEvaluatorFactory.java | 3 +-
.../beam/runners/dataflow/util/DoFnInfo.java | 7 +-
.../translation/MultiOutputWordCountTest.java | 2 +-
.../spark/translation/SerializationTest.java | 4 +-
.../org/apache/beam/sdk/AggregatorValues.java | 4 +-
.../apache/beam/sdk/transforms/Aggregator.java | 14 +-
.../apache/beam/sdk/transforms/CombineFns.java | 18 +-
.../org/apache/beam/sdk/transforms/DoFn.java | 23 +-
.../apache/beam/sdk/transforms/DoFnTester.java | 62 ++--
.../apache/beam/sdk/transforms/GroupByKey.java | 7 +-
.../apache/beam/sdk/transforms/PTransform.java | 2 +-
.../org/apache/beam/sdk/transforms/ParDo.java | 306 +++++++++----------
.../beam/sdk/transforms/SimpleFunction.java | 6 +-
.../beam/sdk/transforms/windowing/PaneInfo.java | 10 +-
.../beam/sdk/util/BaseExecutionContext.java | 4 +-
.../sdk/util/ReifyTimestampAndWindowsDoFn.java | 4 +-
.../apache/beam/sdk/util/SerializableUtils.java | 2 +-
.../beam/sdk/util/SystemDoFnInternal.java | 7 +-
.../beam/sdk/util/WindowingInternals.java | 3 +-
.../DoFnDelegatingAggregatorTest.java | 2 +-
.../beam/sdk/transforms/DoFnTesterTest.java | 3 +-
.../apache/beam/sdk/transforms/NoOpOldDoFn.java | 2 +-
28 files changed, 263 insertions(+), 265 deletions(-)
----------------------------------------------------------------------