You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/08/06 02:52:25 UTC
[07/51] [abbrv] incubator-beam git commit: Rename DoFn to OldDoFn
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java
index 2696020..ed9ec10 100644
--- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java
@@ -25,8 +25,8 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.RequiresWindowAccess;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
@@ -58,15 +58,15 @@ import java.util.Set;
*/
public abstract class DoFnRunnerBase<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
- /** The DoFn being run. */
- public final DoFn<InputT, OutputT> fn;
+ /** The OldDoFn being run. */
+ public final OldDoFn<InputT, OutputT> fn;
- /** The context used for running the DoFn. */
+ /** The context used for running the OldDoFn. */
public final DoFnContext<InputT, OutputT> context;
protected DoFnRunnerBase(
PipelineOptions options,
- DoFn<InputT, OutputT> fn,
+ OldDoFn<InputT, OutputT> fn,
SideInputReader sideInputReader,
OutputManager outputManager,
TupleTag<OutputT> mainOutputTag,
@@ -145,7 +145,7 @@ public abstract class DoFnRunnerBase<InputT, OutputT> implements DoFnRunner<Inpu
}
/**
- * Invokes {@link DoFn#processElement} after certain pre-processings has been done in
+ * Invokes {@link OldDoFn#processElement} after certain pre-processings has been done in
* {@link DoFnRunnerBase#processElement}.
*/
protected abstract void invokeProcessElement(WindowedValue<InputT> elem);
@@ -162,17 +162,17 @@ public abstract class DoFnRunnerBase<InputT, OutputT> implements DoFnRunner<Inpu
}
/**
- * A concrete implementation of {@code DoFn.Context} used for running a {@link DoFn}.
+ * A concrete implementation of {@code OldDoFn.Context} used for running a {@link OldDoFn}.
*
- * @param <InputT> the type of the DoFn's (main) input elements
- * @param <OutputT> the type of the DoFn's (main) output elements
+ * @param <InputT> the type of the OldDoFn's (main) input elements
+ * @param <OutputT> the type of the OldDoFn's (main) output elements
*/
private static class DoFnContext<InputT, OutputT>
- extends DoFn<InputT, OutputT>.Context {
+ extends OldDoFn<InputT, OutputT>.Context {
private static final int MAX_SIDE_OUTPUTS = 1000;
final PipelineOptions options;
- final DoFn<InputT, OutputT> fn;
+ final OldDoFn<InputT, OutputT> fn;
final SideInputReader sideInputReader;
final OutputManager outputManager;
final TupleTag<OutputT> mainOutputTag;
@@ -187,7 +187,7 @@ public abstract class DoFnRunnerBase<InputT, OutputT> implements DoFnRunner<Inpu
private Set<TupleTag<?>> outputTags;
public DoFnContext(PipelineOptions options,
- DoFn<InputT, OutputT> fn,
+ OldDoFn<InputT, OutputT> fn,
SideInputReader sideInputReader,
OutputManager outputManager,
TupleTag<OutputT> mainOutputTag,
@@ -317,8 +317,8 @@ public abstract class DoFnRunnerBase<InputT, OutputT> implements DoFnRunner<Inpu
}
// Following implementations of output, outputWithTimestamp, and sideOutput
- // are only accessible in DoFn.startBundle and DoFn.finishBundle, and will be shadowed by
- // ProcessContext's versions in DoFn.processElement.
+ // are only accessible in OldDoFn.startBundle and OldDoFn.finishBundle, and will be shadowed by
+ // ProcessContext's versions in OldDoFn.processElement.
@Override
public void output(OutputT output) {
outputWindowedValue(output, null, null, PaneInfo.NO_FIRING);
@@ -350,9 +350,10 @@ public abstract class DoFnRunnerBase<InputT, OutputT> implements DoFnRunner<Inpu
}
/**
- * Returns a new {@code DoFn.ProcessContext} for the given element.
+ * Returns a new {@code OldDoFn.ProcessContext} for the given element.
*/
- protected DoFn<InputT, OutputT>.ProcessContext createProcessContext(WindowedValue<InputT> elem) {
+ protected OldDoFn<InputT, OutputT>.ProcessContext createProcessContext(
+ WindowedValue<InputT> elem) {
return new DoFnProcessContext<InputT, OutputT>(fn, context, elem);
}
@@ -365,21 +366,21 @@ public abstract class DoFnRunnerBase<InputT, OutputT> implements DoFnRunner<Inpu
}
/**
- * A concrete implementation of {@code DoFn.ProcessContext} used for
- * running a {@link DoFn} over a single element.
+ * A concrete implementation of {@code OldDoFn.ProcessContext} used for
+ * running a {@link OldDoFn} over a single element.
*
- * @param <InputT> the type of the DoFn's (main) input elements
- * @param <OutputT> the type of the DoFn's (main) output elements
+ * @param <InputT> the type of the OldDoFn's (main) input elements
+ * @param <OutputT> the type of the OldDoFn's (main) output elements
*/
static class DoFnProcessContext<InputT, OutputT>
- extends DoFn<InputT, OutputT>.ProcessContext {
+ extends OldDoFn<InputT, OutputT>.ProcessContext {
- final DoFn<InputT, OutputT> fn;
+ final OldDoFn<InputT, OutputT> fn;
final DoFnContext<InputT, OutputT> context;
final WindowedValue<InputT> windowedValue;
- public DoFnProcessContext(DoFn<InputT, OutputT> fn,
+ public DoFnProcessContext(OldDoFn<InputT, OutputT> fn,
DoFnContext<InputT, OutputT> context,
WindowedValue<InputT> windowedValue) {
fn.super();
@@ -426,7 +427,8 @@ public abstract class DoFnRunnerBase<InputT, OutputT> implements DoFnRunner<Inpu
public BoundedWindow window() {
if (!(fn instanceof RequiresWindowAccess)) {
throw new UnsupportedOperationException(
- "window() is only available in the context of a DoFn marked as RequiresWindowAccess.");
+ "window() is only available in the context of a OldDoFn marked as"
+ + "RequiresWindowAccess.");
}
return Iterables.getOnlyElement(windows());
}
@@ -484,7 +486,7 @@ public abstract class DoFnRunnerBase<InputT, OutputT> implements DoFnRunner<Inpu
throw new IllegalArgumentException(String.format(
"Cannot output with timestamp %s. Output timestamps must be no earlier than the "
+ "timestamp of the current input (%s) minus the allowed skew (%s). See the "
- + "DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.",
+ + "OldDoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.",
timestamp, windowedValue.getTimestamp(),
PeriodFormat.getDefault().print(fn.getAllowedTimestampSkew().toPeriod())));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunners.java
index cb96da2..a9f3cf4 100644
--- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunners.java
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunners.java
@@ -19,7 +19,7 @@ package org.apache.beam.sdk.util;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.DoFnRunner.ReduceFnExecutor;
import org.apache.beam.sdk.util.ExecutionContext.StepContext;
@@ -27,6 +27,7 @@ import org.apache.beam.sdk.util.common.CounterSet;
import org.apache.beam.sdk.util.common.CounterSet.AddCounterMutator;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
+
import java.util.List;
/**
@@ -44,13 +45,13 @@ public class DoFnRunners {
}
/**
- * Returns a basic implementation of {@link DoFnRunner} that works for most {@link DoFn DoFns}.
+ * Returns a basic implementation of {@link DoFnRunner} that works for most {@link OldDoFn DoFns}.
*
- * <p>It invokes {@link DoFn#processElement} for each input.
+ * <p>It invokes {@link OldDoFn#processElement} for each input.
*/
public static <InputT, OutputT> DoFnRunner<InputT, OutputT> simpleRunner(
PipelineOptions options,
- DoFn<InputT, OutputT> fn,
+ OldDoFn<InputT, OutputT> fn,
SideInputReader sideInputReader,
OutputManager outputManager,
TupleTag<OutputT> mainOutputTag,
@@ -71,13 +72,14 @@ public class DoFnRunners {
}
/**
- * Returns a basic implementation of {@link DoFnRunner} that works for most {@link DoFn DoFns}.
+ * Returns a basic implementation of {@link DoFnRunner} that works for most
+ * {@link OldDoFn OldDoFns}.
*
- * <p>It invokes {@link DoFn#processElement} for each input.
+ * <p>It invokes {@link OldDoFn#processElement} for each input.
*/
public static <InputT, OutputT> DoFnRunner<InputT, OutputT> simpleRunner(
PipelineOptions options,
- DoFn<InputT, OutputT> fn,
+ OldDoFn<InputT, OutputT> fn,
SideInputReader sideInputReader,
OutputManager outputManager,
TupleTag<OutputT> mainOutputTag,
@@ -99,7 +101,7 @@ public class DoFnRunners {
/**
* Returns an implementation of {@link DoFnRunner} that handles late data dropping.
*
- * <p>It drops elements from expired windows before they reach the underlying {@link DoFn}.
+ * <p>It drops elements from expired windows before they reach the underlying {@link OldDoFn}.
*/
public static <K, InputT, OutputT, W extends BoundedWindow>
DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> lateDataDroppingRunner(
@@ -133,7 +135,7 @@ public class DoFnRunners {
/**
* Returns an implementation of {@link DoFnRunner} that handles late data dropping.
*
- * <p>It drops elements from expired windows before they reach the underlying {@link DoFn}.
+ * <p>It drops elements from expired windows before they reach the underlying {@link OldDoFn}.
*/
public static <K, InputT, OutputT, W extends BoundedWindow>
DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> lateDataDroppingRunner(
@@ -160,7 +162,7 @@ public class DoFnRunners {
public static <InputT, OutputT> DoFnRunner<InputT, OutputT> createDefault(
PipelineOptions options,
- DoFn<InputT, OutputT> doFn,
+ OldDoFn<InputT, OutputT> doFn,
SideInputReader sideInputReader,
OutputManager outputManager,
TupleTag<OutputT> mainOutputTag,
@@ -198,7 +200,7 @@ public class DoFnRunners {
public static <InputT, OutputT> DoFnRunner<InputT, OutputT> createDefault(
PipelineOptions options,
- DoFn<InputT, OutputT> doFn,
+ OldDoFn<InputT, OutputT> doFn,
SideInputReader sideInputReader,
OutputManager outputManager,
TupleTag<OutputT> mainOutputTag,
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsDoFn.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsDoFn.java
index b575559..f82e5df 100644
--- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsDoFn.java
@@ -19,14 +19,14 @@ package org.apache.beam.sdk.util;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.state.StateInternalsFactory;
import org.apache.beam.sdk.values.KV;
/**
- * DoFn that merges windows and groups elements in those windows, optionally
+ * OldDoFn that merges windows and groups elements in those windows, optionally
* combining values.
*
* @param <K> key type
@@ -36,7 +36,7 @@ import org.apache.beam.sdk.values.KV;
*/
@SystemDoFnInternal
public abstract class GroupAlsoByWindowsDoFn<K, InputT, OutputT, W extends BoundedWindow>
- extends DoFn<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>> {
+ extends OldDoFn<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>> {
public static final String DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER = "DroppedDueToClosedWindow";
public static final String DROPPED_DUE_TO_LATENESS_COUNTER = "DroppedDueToLateness";
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFn.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFn.java
index d185a24..f872ffc 100644
--- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFn.java
@@ -17,7 +17,7 @@
*/
package org.apache.beam.sdk.util;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.state.StateInternals;
import org.apache.beam.sdk.util.state.StateInternalsFactory;
@@ -52,7 +52,7 @@ public class GroupAlsoByWindowsViaOutputBufferDoFn<K, InputT, OutputT, W extends
@Override
public void processElement(
- DoFn<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>>.ProcessContext c)
+ OldDoFn<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>>.ProcessContext c)
throws Exception {
K key = c.element().getKey();
// Used with Batch, we know that all the data is available for this key. We can't use the
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java
index 8a0152e..f0f9007 100644
--- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java
@@ -22,8 +22,8 @@ import static com.google.common.base.Preconditions.checkArgument;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -138,7 +138,9 @@ public class GroupByKeyViaGroupByKeyOnly<K, V>
return input
.apply(
ParDo.of(
- new DoFn<KV<K, Iterable<WindowedValue<V>>>, KV<K, Iterable<WindowedValue<V>>>>() {
+ new OldDoFn<
+ KV<K, Iterable<WindowedValue<V>>>,
+ KV<K, Iterable<WindowedValue<V>>>>() {
@Override
public void processElement(ProcessContext c) {
KV<K, Iterable<WindowedValue<V>>> kvs = c.element();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/core-java/src/main/java/org/apache/beam/sdk/util/LateDataDroppingDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/LateDataDroppingDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/LateDataDroppingDoFnRunner.java
index 4815162..8b3ba24 100644
--- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/LateDataDroppingDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/LateDataDroppingDoFnRunner.java
@@ -18,7 +18,7 @@
package org.apache.beam.sdk.util;
import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
@@ -31,7 +31,7 @@ import org.joda.time.Instant;
/**
* A customized {@link DoFnRunner} that handles late data dropping for
- * a {@link KeyedWorkItem} input {@link DoFn}.
+ * a {@link KeyedWorkItem} input {@link OldDoFn}.
*
* <p>It expands windows before checking data lateness.
*
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/core-java/src/main/java/org/apache/beam/sdk/util/PaneInfoTracker.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/PaneInfoTracker.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/PaneInfoTracker.java
index 812e99a..0c5849e 100644
--- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/PaneInfoTracker.java
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/PaneInfoTracker.java
@@ -32,7 +32,6 @@ import org.apache.beam.sdk.util.state.ValueState;
import com.google.common.annotations.VisibleForTesting;
import org.joda.time.Instant;
-
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
index c879409..1fa0830 100644
--- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
@@ -22,7 +22,7 @@ import static com.google.common.base.Preconditions.checkState;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -181,7 +181,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
* Store the previously emitted pane (if any) for each window.
*
* <ul>
- * <li>State: The previous {@link PaneInfo} passed to the user's {@link DoFn#processElement},
+ * <li>State: The previous {@link PaneInfo} passed to the user's {@link OldDoFn#processElement},
* if any.
* <li>Style style: DIRECT
* <li>Merging: Always keyed by actual window, so does not depend on {@link #activeWindows}.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/core-java/src/main/java/org/apache/beam/sdk/util/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/SimpleDoFnRunner.java
index e034638..a0cdb40 100644
--- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/SimpleDoFnRunner.java
@@ -19,21 +19,21 @@ package org.apache.beam.sdk.util;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.util.DoFnRunners.OutputManager;
import org.apache.beam.sdk.util.ExecutionContext.StepContext;
import org.apache.beam.sdk.values.TupleTag;
import java.util.List;
/**
- * Runs a {@link DoFn} by constructing the appropriate contexts and passing them in.
+ * Runs a {@link OldDoFn} by constructing the appropriate contexts and passing them in.
*
- * @param <InputT> the type of the DoFn's (main) input elements
- * @param <OutputT> the type of the DoFn's (main) output elements
+ * @param <InputT> the type of the OldDoFn's (main) input elements
+ * @param <OutputT> the type of the OldDoFn's (main) output elements
*/
public class SimpleDoFnRunner<InputT, OutputT> extends DoFnRunnerBase<InputT, OutputT>{
- protected SimpleDoFnRunner(PipelineOptions options, DoFn<InputT, OutputT> fn,
+ protected SimpleDoFnRunner(PipelineOptions options, OldDoFn<InputT, OutputT> fn,
SideInputReader sideInputReader,
OutputManager outputManager,
TupleTag<OutputT> mainOutputTag, List<TupleTag<?>> sideOutputTags, StepContext stepContext,
@@ -44,7 +44,7 @@ public class SimpleDoFnRunner<InputT, OutputT> extends DoFnRunnerBase<InputT, Ou
@Override
protected void invokeProcessElement(WindowedValue<InputT> elem) {
- final DoFn<InputT, OutputT>.ProcessContext processContext = createProcessContext(elem);
+ final OldDoFn<InputT, OutputT>.ProcessContext processContext = createProcessContext(elem);
// This can contain user code. Wrap it in case it throws an exception.
try {
fn.processElement(processContext);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/core-java/src/main/java/org/apache/beam/sdk/util/WatermarkHold.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/WatermarkHold.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/WatermarkHold.java
index 985f210..5c17009 100644
--- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/WatermarkHold.java
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/WatermarkHold.java
@@ -37,7 +37,6 @@ import org.joda.time.Duration;
import org.joda.time.Instant;
import java.io.Serializable;
-
import javax.annotation.Nullable;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
index dc2413a..8d604cb 100644
--- a/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
@@ -21,6 +21,7 @@ import static org.apache.beam.sdk.WindowMatchers.isSingleWindowedValue;
import static org.apache.beam.sdk.WindowMatchers.isWindowedValue;
import static com.google.common.base.Preconditions.checkArgument;
+
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.emptyIterable;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java
index e0ff879..feba191 100644
--- a/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.util;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/core-java/src/test/java/org/apache/beam/sdk/util/SimpleDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/sdk/util/SimpleDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/sdk/util/SimpleDoFnRunnerTest.java
index fb74fc6..f0c52b9 100644
--- a/runners/core-java/src/test/java/org/apache/beam/sdk/util/SimpleDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/sdk/util/SimpleDoFnRunnerTest.java
@@ -20,7 +20,7 @@ package org.apache.beam.sdk.util;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.mock;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.util.BaseExecutionContext.StepContext;
import org.apache.beam.sdk.values.TupleTag;
@@ -62,7 +62,7 @@ public class SimpleDoFnRunnerTest {
runner.processElement(WindowedValue.valueInGlobalWindow("anyValue"));
}
- private DoFnRunner<String, String> createRunner(DoFn<String, String> fn) {
+ private DoFnRunner<String, String> createRunner(OldDoFn<String, String> fn) {
// Pass in only necessary parameters for the test
List<TupleTag<?>> sideOutputTags = Arrays.asList();
StepContext context = mock(StepContext.class);
@@ -70,7 +70,7 @@ public class SimpleDoFnRunnerTest {
null, fn, null, null, null, sideOutputTags, context, null, null);
}
- static class ThrowingDoFn extends DoFn<String, String> {
+ static class ThrowingDoFn extends OldDoFn<String, String> {
final Exception exceptionToThrow =
new UnsupportedOperationException("Expected exception");
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
index 477da30..e052226 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
@@ -23,7 +23,7 @@ import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupAlsoByWindow;
import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly;
@@ -106,7 +106,7 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
StateInternals<K> stateInternals = (StateInternals<K>) stepContext.stateInternals();
- DoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> gabwDoFn =
+ OldDoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> gabwDoFn =
GroupAlsoByWindowViaWindowSetDoFn.create(
windowingStrategy,
new ConstantStateInternalsFactory<K>(stateInternals),
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java
index dcbe3d1..8be12fd 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java
@@ -23,7 +23,7 @@ import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.util.IllegalMutationException;
import org.apache.beam.sdk.util.MutationDetector;
import org.apache.beam.sdk.util.MutationDetectors;
@@ -42,7 +42,7 @@ import org.joda.time.Instant;
* elements added to the bundle will be encoded by the {@link Coder} of the underlying
* {@link PCollection}.
*
- * <p>This catches errors during the execution of a {@link DoFn} caused by modifying an element
+ * <p>This catches errors during the execution of a {@link OldDoFn} caused by modifying an element
* after it is added to an output {@link PCollection}.
*/
class ImmutabilityCheckingBundleFactory implements BundleFactory {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
index dd1cf37..6ef0ffe 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
@@ -21,7 +21,7 @@ import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.util.DoFnRunner;
import org.apache.beam.sdk.util.DoFnRunners;
import org.apache.beam.sdk.util.DoFnRunners.OutputManager;
@@ -48,7 +48,7 @@ class ParDoEvaluator<T> implements TransformEvaluator<T> {
DirectStepContext stepContext,
CommittedBundle<InputT> inputBundle,
AppliedPTransform<PCollection<InputT>, ?, ?> application,
- DoFn<InputT, OutputT> fn,
+ OldDoFn<InputT, OutputT> fn,
List<PCollectionView<?>> sideInputs,
TupleTag<OutputT> mainOutputTag,
List<TupleTag<?>> sideOutputTags,
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
index eda3db4..ce770ca 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
@@ -20,7 +20,7 @@ package org.apache.beam.runners.direct;
import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo.BoundMulti;
import org.apache.beam.sdk.values.PCollection;
@@ -38,7 +38,7 @@ import java.util.Map;
* {@link BoundMulti} primitive {@link PTransform}.
*/
class ParDoMultiEvaluatorFactory implements TransformEvaluatorFactory {
- private final LoadingCache<AppliedPTransform<?, ?, BoundMulti<?, ?>>, ThreadLocal<DoFn<?, ?>>>
+ private final LoadingCache<AppliedPTransform<?, ?, BoundMulti<?, ?>>, ThreadLocal<OldDoFn<?, ?>>>
fnClones;
public ParDoMultiEvaluatorFactory() {
@@ -46,9 +46,10 @@ class ParDoMultiEvaluatorFactory implements TransformEvaluatorFactory {
CacheBuilder.newBuilder()
.build(
new CacheLoader<
- AppliedPTransform<?, ?, BoundMulti<?, ?>>, ThreadLocal<DoFn<?, ?>>>() {
+ AppliedPTransform<?, ?, BoundMulti<?, ?>>, ThreadLocal<OldDoFn<?, ?>>>() {
@Override
- public ThreadLocal<DoFn<?, ?>> load(AppliedPTransform<?, ?, BoundMulti<?, ?>> key)
+ public ThreadLocal<OldDoFn<?, ?>> load(
+ AppliedPTransform<?, ?, BoundMulti<?, ?>> key)
throws Exception {
@SuppressWarnings({"unchecked", "rawtypes"})
ThreadLocal threadLocal =
@@ -76,7 +77,7 @@ class ParDoMultiEvaluatorFactory implements TransformEvaluatorFactory {
Map<TupleTag<?>, PCollection<?>> outputs = application.getOutput().getAll();
@SuppressWarnings({"unchecked", "rawtypes"})
- ThreadLocal<DoFn<InT, OuT>> fnLocal =
+ ThreadLocal<OldDoFn<InT, OuT>> fnLocal =
(ThreadLocal) fnClones.getUnchecked((AppliedPTransform) application);
String stepName = evaluationContext.getStepName(application);
DirectStepContext stepContext =
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
index 044abdc..53af6af 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
@@ -20,7 +20,7 @@ package org.apache.beam.runners.direct;
import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo.Bound;
import org.apache.beam.sdk.values.PCollection;
@@ -38,16 +38,17 @@ import java.util.Collections;
* {@link Bound ParDo.Bound} primitive {@link PTransform}.
*/
class ParDoSingleEvaluatorFactory implements TransformEvaluatorFactory {
- private final LoadingCache<AppliedPTransform<?, ?, Bound<?, ?>>, ThreadLocal<DoFn<?, ?>>>
+ private final LoadingCache<AppliedPTransform<?, ?, Bound<?, ?>>, ThreadLocal<OldDoFn<?, ?>>>
fnClones;
public ParDoSingleEvaluatorFactory() {
fnClones =
CacheBuilder.newBuilder()
.build(
- new CacheLoader<AppliedPTransform<?, ?, Bound<?, ?>>, ThreadLocal<DoFn<?, ?>>>() {
+ new CacheLoader<
+ AppliedPTransform<?, ?, Bound<?, ?>>, ThreadLocal<OldDoFn<?, ?>>>() {
@Override
- public ThreadLocal<DoFn<?, ?>> load(AppliedPTransform<?, ?, Bound<?, ?>> key)
+ public ThreadLocal<OldDoFn<?, ?>> load(AppliedPTransform<?, ?, Bound<?, ?>> key)
throws Exception {
@SuppressWarnings({"unchecked", "rawtypes"})
ThreadLocal threadLocal =
@@ -80,7 +81,7 @@ class ParDoSingleEvaluatorFactory implements TransformEvaluatorFactory {
.getOrCreateStepContext(stepName, stepName);
@SuppressWarnings({"unchecked", "rawtypes"})
- ThreadLocal<DoFn<InputT, OutputT>> fnLocal =
+ ThreadLocal<OldDoFn<InputT, OutputT>> fnLocal =
(ThreadLocal) fnClones.getUnchecked((AppliedPTransform) application);
try {
ParDoEvaluator<InputT> parDoEvaluator =
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java
index 7fac1e3..d021b43 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java
@@ -21,7 +21,7 @@ import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import javax.annotation.Nullable;
@@ -38,8 +38,8 @@ public interface TransformEvaluatorFactory {
* Create a new {@link TransformEvaluator} for the application of the {@link PTransform}.
*
* <p>Any work that must be done before input elements are processed (such as calling
- * {@link DoFn#startBundle(DoFn.Context)}) must be done before the {@link TransformEvaluator} is
- * made available to the caller.
+ * {@link OldDoFn#startBundle(OldDoFn.Context)}) must be done before the
+ * {@link TransformEvaluator} is made available to the caller.
*
* <p>May return null if the application cannot produce an evaluator (for example, it is a
* {@link Read} {@link PTransform} where all evaluators are in-use).
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
index d6ee6ea..cee4001 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
@@ -23,9 +23,9 @@ import static com.google.common.base.Preconditions.checkArgument;
import org.apache.beam.sdk.io.Write;
import org.apache.beam.sdk.io.Write.Bound;
import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Values;
@@ -101,7 +101,7 @@ class WriteWithShardingFactory implements PTransformOverrideFactory {
}
@VisibleForTesting
- static class KeyBasedOnCountFn<T> extends DoFn<T, KV<Integer, T>> {
+ static class KeyBasedOnCountFn<T> extends OldDoFn<T, KV<Integer, T>> {
@VisibleForTesting
static final int MIN_SHARDS_FOR_LOG = 3;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitorTest.java
index 353eef6..529316c 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitorTest.java
@@ -24,8 +24,8 @@ import org.apache.beam.sdk.io.CountingInput;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
@@ -62,9 +62,9 @@ public class ConsumerTrackingPipelineVisitorTest implements Serializable {
p.apply("listCreate", Create.of("foo", "bar"))
.apply(
ParDo.of(
- new DoFn<String, String>() {
+ new OldDoFn<String, String>() {
@Override
- public void processElement(DoFn<String, String>.ProcessContext c)
+ public void processElement(OldDoFn<String, String>.ProcessContext c)
throws Exception {
c.output(Integer.toString(c.element().length()));
}
@@ -109,9 +109,9 @@ public class ConsumerTrackingPipelineVisitorTest implements Serializable {
PCollection<String> transformed =
created.apply(
ParDo.of(
- new DoFn<String, String>() {
+ new OldDoFn<String, String>() {
@Override
- public void processElement(DoFn<String, String>.ProcessContext c)
+ public void processElement(OldDoFn<String, String>.ProcessContext c)
throws Exception {
c.output(Integer.toString(c.element().length()));
}
@@ -140,9 +140,9 @@ public class ConsumerTrackingPipelineVisitorTest implements Serializable {
PCollection<String> transformed =
created.apply(
ParDo.of(
- new DoFn<String, String>() {
+ new OldDoFn<String, String>() {
@Override
- public void processElement(DoFn<String, String>.ProcessContext c)
+ public void processElement(OldDoFn<String, String>.ProcessContext c)
throws Exception {
c.output(Integer.toString(c.element().length()));
}
@@ -157,9 +157,9 @@ public class ConsumerTrackingPipelineVisitorTest implements Serializable {
p.apply(Create.of("1", "2", "3"))
.apply(
ParDo.of(
- new DoFn<String, String>() {
+ new OldDoFn<String, String>() {
@Override
- public void processElement(DoFn<String, String>.ProcessContext c)
+ public void processElement(OldDoFn<String, String>.ProcessContext c)
throws Exception {
c.output(Integer.toString(c.element().length()));
}
@@ -182,9 +182,9 @@ public class ConsumerTrackingPipelineVisitorTest implements Serializable {
PCollection<String> transformed =
created.apply(
ParDo.of(
- new DoFn<String, String>() {
+ new OldDoFn<String, String>() {
@Override
- public void processElement(DoFn<String, String>.ProcessContext c)
+ public void processElement(OldDoFn<String, String>.ProcessContext c)
throws Exception {
c.output(Integer.toString(c.element().length()));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index 09707bd..29dea32 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -32,9 +32,9 @@ import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
@@ -159,7 +159,7 @@ public class DirectRunnerTest implements Serializable {
@Test
public void transformDisplayDataExceptionShouldFail() {
- DoFn<Integer, Integer> brokenDoFn = new DoFn<Integer, Integer>() {
+ OldDoFn<Integer, Integer> brokenDoFn = new OldDoFn<Integer, Integer>() {
@Override
public void processElement(ProcessContext c) throws Exception {}
@@ -211,7 +211,7 @@ public class DirectRunnerTest implements Serializable {
/**
- * Tests that a {@link DoFn} that mutates an output with a good equals() fails in the
+ * Tests that a {@link OldDoFn} that mutates an output with a good equals() fails in the
* {@link DirectRunner}.
*/
@Test
@@ -220,7 +220,7 @@ public class DirectRunnerTest implements Serializable {
pipeline
.apply(Create.of(42))
- .apply(ParDo.of(new DoFn<Integer, List<Integer>>() {
+ .apply(ParDo.of(new OldDoFn<Integer, List<Integer>>() {
@Override public void processElement(ProcessContext c) {
List<Integer> outputList = Arrays.asList(1, 2, 3, 4);
c.output(outputList);
@@ -236,7 +236,7 @@ public class DirectRunnerTest implements Serializable {
}
/**
- * Tests that a {@link DoFn} that mutates an output with a good equals() fails in the
+ * Tests that a {@link OldDoFn} that mutates an output with a good equals() fails in the
* {@link DirectRunner}.
*/
@Test
@@ -245,7 +245,7 @@ public class DirectRunnerTest implements Serializable {
pipeline
.apply(Create.of(42))
- .apply(ParDo.of(new DoFn<Integer, List<Integer>>() {
+ .apply(ParDo.of(new OldDoFn<Integer, List<Integer>>() {
@Override public void processElement(ProcessContext c) {
List<Integer> outputList = Arrays.asList(1, 2, 3, 4);
c.output(outputList);
@@ -260,7 +260,7 @@ public class DirectRunnerTest implements Serializable {
}
/**
- * Tests that a {@link DoFn} that mutates an output with a bad equals() still fails
+ * Tests that a {@link OldDoFn} that mutates an output with a bad equals() still fails
* in the {@link DirectRunner}.
*/
@Test
@@ -269,7 +269,7 @@ public class DirectRunnerTest implements Serializable {
pipeline
.apply(Create.of(42))
- .apply(ParDo.of(new DoFn<Integer, byte[]>() {
+ .apply(ParDo.of(new OldDoFn<Integer, byte[]>() {
@Override public void processElement(ProcessContext c) {
byte[] outputArray = new byte[]{0x1, 0x2, 0x3};
c.output(outputArray);
@@ -285,7 +285,7 @@ public class DirectRunnerTest implements Serializable {
}
/**
- * Tests that a {@link DoFn} that mutates its input with a good equals() fails in the
+ * Tests that a {@link OldDoFn} that mutates its input with a good equals() fails in the
* {@link DirectRunner}.
*/
@Test
@@ -295,7 +295,7 @@ public class DirectRunnerTest implements Serializable {
pipeline
.apply(Create.of(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6))
.withCoder(ListCoder.of(VarIntCoder.of())))
- .apply(ParDo.of(new DoFn<List<Integer>, Integer>() {
+ .apply(ParDo.of(new OldDoFn<List<Integer>, Integer>() {
@Override public void processElement(ProcessContext c) {
List<Integer> inputList = c.element();
inputList.set(0, 37);
@@ -310,7 +310,7 @@ public class DirectRunnerTest implements Serializable {
}
/**
- * Tests that a {@link DoFn} that mutates an input with a bad equals() still fails
+ * Tests that a {@link OldDoFn} that mutates an input with a bad equals() still fails
* in the {@link DirectRunner}.
*/
@Test
@@ -319,7 +319,7 @@ public class DirectRunnerTest implements Serializable {
pipeline
.apply(Create.of(new byte[]{0x1, 0x2, 0x3}, new byte[]{0x4, 0x5, 0x6}))
- .apply(ParDo.of(new DoFn<byte[], Integer>() {
+ .apply(ParDo.of(new OldDoFn<byte[], Integer>() {
@Override public void processElement(ProcessContext c) {
byte[] inputArray = c.element();
inputArray[0] = 0xa;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
index d40cf93..db934e5 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
@@ -26,7 +26,7 @@ import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -213,9 +213,9 @@ public class ImmutabilityCheckingBundleFactoryTest {
CommittedBundle<byte[]> committed = intermediate.commit(Instant.now());
}
- private static class IdentityDoFn<T> extends DoFn<T, T> {
+ private static class IdentityDoFn<T> extends OldDoFn<T, T> {
@Override
- public void processElement(DoFn<T, T>.ProcessContext c) throws Exception {
+ public void processElement(OldDoFn<T, T>.ProcessContext c) throws Exception {
c.output(c.element());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
index 890e06d..e1be120 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
@@ -22,7 +22,7 @@ import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.IllegalMutationException;
import org.apache.beam.sdk.util.WindowedValue;
@@ -59,9 +59,9 @@ public class ImmutabilityEnforcementFactoryTest implements Serializable {
p.apply(Create.of("foo".getBytes(), "spamhameggs".getBytes()))
.apply(
ParDo.of(
- new DoFn<byte[], byte[]>() {
+ new OldDoFn<byte[], byte[]>() {
@Override
- public void processElement(DoFn<byte[], byte[]>.ProcessContext c)
+ public void processElement(OldDoFn<byte[], byte[]>.ProcessContext c)
throws Exception {
c.element()[0] = 'b';
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java
index aa0d976..9e273ad 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java
@@ -28,9 +28,9 @@ import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.Keys;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
@@ -180,9 +180,9 @@ public class KeyedPValueTrackingVisitorTest {
}
}
- private static class IdentityFn<K> extends DoFn<K, K> {
+ private static class IdentityFn<K> extends OldDoFn<K, K> {
@Override
- public void processElement(DoFn<K, K>.ProcessContext c) throws Exception {
+ public void processElement(OldDoFn<K, K>.ProcessContext c) throws Exception {
c.output(c.element());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
index 07f478d..3208841 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
@@ -30,7 +30,7 @@ import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -169,7 +169,7 @@ public class ParDoEvaluatorTest {
ImmutableMap.<TupleTag<?>, PCollection<?>>of(mainOutputTag, output));
}
- private static class RecorderFn extends DoFn<Integer, Integer> {
+ private static class RecorderFn extends OldDoFn<Integer, Integer> {
private Collection<Integer> processed;
private final PCollectionView<Integer> view;
@@ -179,7 +179,7 @@ public class ParDoEvaluatorTest {
}
@Override
- public void processElement(DoFn<Integer, Integer>.ProcessContext c) throws Exception {
+ public void processElement(OldDoFn<Integer, Integer>.ProcessContext c) throws Exception {
processed.add(c.element());
c.output(c.element() + c.sideInput(view));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java
index c0ab4df..19094cb 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java
@@ -31,7 +31,7 @@ import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.ParDo.BoundMulti;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -80,7 +80,7 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable {
BoundMulti<String, KV<String, Integer>> pardo =
ParDo.of(
- new DoFn<String, KV<String, Integer>>() {
+ new OldDoFn<String, KV<String, Integer>>() {
@Override
public void processElement(ProcessContext c) {
c.output(KV.<String, Integer>of(c.element(), c.element().length()));
@@ -170,7 +170,7 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable {
BoundMulti<String, KV<String, Integer>> pardo =
ParDo.of(
- new DoFn<String, KV<String, Integer>>() {
+ new OldDoFn<String, KV<String, Integer>>() {
@Override
public void processElement(ProcessContext c) {
c.output(KV.<String, Integer>of(c.element(), c.element().length()));
@@ -254,7 +254,7 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable {
StateNamespaces.window(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE);
BoundMulti<String, KV<String, Integer>> pardo =
ParDo.of(
- new DoFn<String, KV<String, Integer>>() {
+ new OldDoFn<String, KV<String, Integer>>() {
@Override
public void processElement(ProcessContext c) {
c.windowingInternals()
@@ -354,7 +354,7 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable {
BoundMulti<String, KV<String, Integer>> pardo =
ParDo.of(
- new DoFn<String, KV<String, Integer>>() {
+ new OldDoFn<String, KV<String, Integer>>() {
@Override
public void processElement(ProcessContext c) {
c.windowingInternals().stateInternals();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
index d778da6..a4fd570 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
@@ -31,7 +31,7 @@ import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -73,7 +73,7 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
PCollection<Integer> collection =
input.apply(
ParDo.of(
- new DoFn<String, Integer>() {
+ new OldDoFn<String, Integer>() {
@Override
public void processElement(ProcessContext c) {
c.output(c.element().length());
@@ -127,7 +127,7 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
PCollection<Integer> collection =
input.apply(
ParDo.of(
- new DoFn<String, Integer>() {
+ new OldDoFn<String, Integer>() {
@Override
public void processElement(ProcessContext c) {
c.sideOutput(sideOutputTag, c.element().length());
@@ -179,7 +179,7 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
StateNamespaces.window(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE);
ParDo.Bound<String, KV<String, Integer>> pardo =
ParDo.of(
- new DoFn<String, KV<String, Integer>>() {
+ new OldDoFn<String, KV<String, Integer>>() {
@Override
public void processElement(ProcessContext c) {
c.windowingInternals()
@@ -265,7 +265,7 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
ParDo.Bound<String, KV<String, Integer>> pardo =
ParDo.of(
- new DoFn<String, KV<String, Integer>>() {
+ new OldDoFn<String, KV<String, Integer>>() {
@Override
public void processElement(ProcessContext c) {
c.windowingInternals().stateInternals();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
index 7c7005c..22f148a 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
@@ -38,9 +38,9 @@ import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -76,7 +76,6 @@ import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
-
import javax.annotation.Nullable;
/**
@@ -105,9 +104,9 @@ public class WatermarkManagerTest implements Serializable {
createdInts = p.apply("createdInts", Create.of(1, 2, 3));
filtered = createdInts.apply("filtered", Filter.greaterThan(1));
- filteredTimesTwo = filtered.apply("timesTwo", ParDo.of(new DoFn<Integer, Integer>() {
+ filteredTimesTwo = filtered.apply("timesTwo", ParDo.of(new OldDoFn<Integer, Integer>() {
@Override
- public void processElement(DoFn<Integer, Integer>.ProcessContext c) throws Exception {
+ public void processElement(OldDoFn<Integer, Integer>.ProcessContext c) throws Exception {
c.output(c.element() * 2);
}
}));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
index 56737a4..716c8ad 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
@@ -32,7 +32,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.Keys;
import org.apache.beam.sdk.transforms.PTransform;
@@ -230,7 +230,7 @@ public class TFIDF {
// Create a collection of pairs mapping a URI to each
// of the words in the document associated with that that URI.
PCollection<KV<URI, String>> uriToWords = uriToContent
- .apply("SplitWords", ParDo.of(new DoFn<KV<URI, String>, KV<URI, String>>() {
+ .apply("SplitWords", ParDo.of(new OldDoFn<KV<URI, String>, KV<URI, String>>() {
private static final long serialVersionUID = 0;
@Override
@@ -275,7 +275,7 @@ public class TFIDF {
// by the URI key.
PCollection<KV<URI, KV<String, Long>>> uriToWordAndCount = uriAndWordToCount
.apply("ShiftKeys", ParDo.of(
- new DoFn<KV<KV<URI, String>, Long>, KV<URI, KV<String, Long>>>() {
+ new OldDoFn<KV<KV<URI, String>, Long>, KV<URI, KV<String, Long>>>() {
private static final long serialVersionUID = 0;
@Override
@@ -316,7 +316,7 @@ public class TFIDF {
// divided by the total number of words in the document.
PCollection<KV<String, KV<URI, Double>>> wordToUriAndTf = uriToWordAndCountAndTotal
.apply("ComputeTermFrequencies", ParDo.of(
- new DoFn<KV<URI, CoGbkResult>, KV<String, KV<URI, Double>>>() {
+ new OldDoFn<KV<URI, CoGbkResult>, KV<String, KV<URI, Double>>>() {
private static final long serialVersionUID = 0;
@Override
@@ -339,11 +339,11 @@ public class TFIDF {
// documents in which the word appears divided by the total
// number of documents in the corpus. Note how the total number of
// documents is passed as a side input; the same value is
- // presented to each invocation of the DoFn.
+ // presented to each invocation of the OldDoFn.
PCollection<KV<String, Double>> wordToDf = wordToDocCount
.apply("ComputeDocFrequencies", ParDo
.withSideInputs(totalDocuments)
- .of(new DoFn<KV<String, Long>, KV<String, Double>>() {
+ .of(new OldDoFn<KV<String, Long>, KV<String, Double>>() {
private static final long serialVersionUID = 0;
@Override
@@ -375,7 +375,7 @@ public class TFIDF {
return wordToUriAndTfAndDf
.apply("ComputeTfIdf", ParDo.of(
- new DoFn<KV<String, CoGbkResult>, KV<String, KV<URI, Double>>>() {
+ new OldDoFn<KV<String, CoGbkResult>, KV<String, KV<URI, Double>>>() {
private static final long serialVersionUID = 0;
@Override
@@ -416,7 +416,7 @@ public class TFIDF {
@Override
public PDone apply(PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf) {
return wordToUriAndTfIdf
- .apply("Format", ParDo.of(new DoFn<KV<String, KV<URI, Double>>, String>() {
+ .apply("Format", ParDo.of(new OldDoFn<KV<String, KV<URI, Double>>, String>() {
private static final long serialVersionUID = 0;
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
index c54229d..080cdc9 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
@@ -27,7 +27,7 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
@@ -38,7 +38,7 @@ import org.apache.beam.sdk.values.PCollection;
public class WordCount {
- public static class ExtractWordsFn extends DoFn<String, String> {
+ public static class ExtractWordsFn extends OldDoFn<String, String> {
private final Aggregator<Long, Long> emptyLines =
createAggregator("emptyLines", new Sum.SumLongFn());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
index c0ff85d..068404a 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
@@ -29,7 +29,7 @@ import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.PTransform;
@@ -92,7 +92,7 @@ public class AutoComplete {
// Map the KV outputs of Count into our own CompletionCandiate class.
.apply("CreateCompletionCandidates", ParDo.of(
- new DoFn<KV<String, Long>, CompletionCandidate>() {
+ new OldDoFn<KV<String, Long>, CompletionCandidate>() {
private static final long serialVersionUID = 0;
@Override
@@ -182,7 +182,7 @@ public class AutoComplete {
}
private static class FlattenTops
- extends DoFn<KV<String, List<CompletionCandidate>>, CompletionCandidate> {
+ extends OldDoFn<KV<String, List<CompletionCandidate>>, CompletionCandidate> {
private static final long serialVersionUID = 0;
@Override
@@ -236,10 +236,10 @@ public class AutoComplete {
}
/**
- * A DoFn that keys each candidate by all its prefixes.
+ * A OldDoFn that keys each candidate by all its prefixes.
*/
private static class AllPrefixes
- extends DoFn<CompletionCandidate, KV<String, CompletionCandidate>> {
+ extends OldDoFn<CompletionCandidate, KV<String, CompletionCandidate>> {
private static final long serialVersionUID = 0;
private final int minPrefix;
@@ -314,7 +314,7 @@ public class AutoComplete {
}
}
- static class ExtractWordsFn extends DoFn<String, String> {
+ static class ExtractWordsFn extends OldDoFn<String, String> {
private final Aggregator<Long, Long> emptyLines =
createAggregator("emptyLines", new Sum.SumLongFn());
@@ -340,8 +340,8 @@ public class AutoComplete {
* Takes as input a the top candidates per prefix, and emits an entity
* suitable for writing to Datastore.
*/
- static class FormatForPerTaskLocalFile extends DoFn<KV<String, List<CompletionCandidate>>, String>
- implements DoFn.RequiresWindowAccess{
+ static class FormatForPerTaskLocalFile extends OldDoFn<KV<String, List<CompletionCandidate>>, String>
+ implements OldDoFn.RequiresWindowAccess{
private static final long serialVersionUID = 0;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
index f456b27..7d7c0c7 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
@@ -23,7 +23,7 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
@@ -76,7 +76,7 @@ public class JoinExamples {
// country code 'key' -> string of <event info>, <country name>
PCollection<KV<String, String>> finalResultCollection =
kvpCollection.apply("Process", ParDo.of(
- new DoFn<KV<String, CoGbkResult>, KV<String, String>>() {
+ new OldDoFn<KV<String, CoGbkResult>, KV<String, String>>() {
private static final long serialVersionUID = 0;
@Override
@@ -98,7 +98,7 @@ public class JoinExamples {
}));
return finalResultCollection
- .apply("Format", ParDo.of(new DoFn<KV<String, String>, String>() {
+ .apply("Format", ParDo.of(new OldDoFn<KV<String, String>, String>() {
private static final long serialVersionUID = 0;
@Override
@@ -110,7 +110,7 @@ public class JoinExamples {
}));
}
- static class ExtractEventDataFn extends DoFn<String, KV<String, String>> {
+ static class ExtractEventDataFn extends OldDoFn<String, KV<String, String>> {
private static final long serialVersionUID = 0;
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java
index 8756abe..395b409 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java
@@ -30,7 +30,7 @@ import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -326,7 +326,7 @@ public class KafkaIOExamples {
* Print contents to stdout
* @param <T> type of the input
*/
- private static class PrintFn<T> extends DoFn<T, T> {
+ private static class PrintFn<T> extends OldDoFn<T, T> {
@Override
public void processElement(ProcessContext c) throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
index 4e81420..8c31783 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
@@ -27,7 +27,7 @@ import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
@@ -49,7 +49,7 @@ public class KafkaWindowedWordCountExample {
static final String GROUP_ID = "myGroup"; // Default groupId
static final String ZOOKEEPER = "localhost:2181"; // Default zookeeper to connect to for Kafka
- public static class ExtractWordsFn extends DoFn<String, String> {
+ public static class ExtractWordsFn extends OldDoFn<String, String> {
private final Aggregator<Long, Long> emptyLines =
createAggregator("emptyLines", new Sum.SumLongFn());
@@ -71,7 +71,7 @@ public class KafkaWindowedWordCountExample {
}
}
- public static class FormatAsStringFn extends DoFn<KV<String, Long>, String> {
+ public static class FormatAsStringFn extends OldDoFn<KV<String, Long>, String> {
@Override
public void processElement(ProcessContext c) {
String row = c.element().getKey() + " - " + c.element().getValue() + " @ " + c.timestamp().toString();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
index 1b532a7..d149e4e 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
@@ -27,7 +27,7 @@ import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
@@ -59,7 +59,7 @@ public class WindowedWordCount {
static final long WINDOW_SIZE = 10; // Default window duration in seconds
static final long SLIDE_SIZE = 5; // Default window slide in seconds
- static class FormatAsStringFn extends DoFn<KV<String, Long>, String> {
+ static class FormatAsStringFn extends OldDoFn<KV<String, Long>, String> {
@Override
public void processElement(ProcessContext c) {
String row = c.element().getKey() + " - " + c.element().getValue() + " @ " + c.timestamp().toString();
@@ -67,7 +67,7 @@ public class WindowedWordCount {
}
}
- static class ExtractWordsFn extends DoFn<String, String> {
+ static class ExtractWordsFn extends OldDoFn<String, String> {
private final Aggregator<Long, Long> emptyLines =
createAggregator("emptyLines", new Sum.SumLongFn());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
index 0bba0d0..01a3ab2 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
@@ -39,7 +39,7 @@ import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineFnBase;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
@@ -391,7 +391,7 @@ class FlinkBatchTransformTranslators {
inputDataSet.groupBy(new KvKeySelector<InputT, K>(inputCoder.getKeyCoder()));
// construct a map from side input to WindowingStrategy so that
- // the DoFn runner can map main-input windows to side input windows
+ // the OldDoFn runner can map main-input windows to side input windows
Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputStrategies = new HashMap<>();
for (PCollectionView<?> sideInput: transform.getSideInputs()) {
sideInputStrategies.put(sideInput, sideInput.getWindowingStrategyInternal());
@@ -494,7 +494,7 @@ class FlinkBatchTransformTranslators {
DataSet<WindowedValue<InputT>> inputDataSet =
context.getInputDataSet(context.getInput(transform));
- final DoFn<InputT, OutputT> doFn = transform.getFn();
+ final OldDoFn<InputT, OutputT> doFn = transform.getFn();
TypeInformation<WindowedValue<OutputT>> typeInformation =
context.getTypeInfo(context.getOutput(transform));
@@ -502,7 +502,7 @@ class FlinkBatchTransformTranslators {
List<PCollectionView<?>> sideInputs = transform.getSideInputs();
// construct a map from side input to WindowingStrategy so that
- // the DoFn runner can map main-input windows to side input windows
+ // the OldDoFn runner can map main-input windows to side input windows
Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputStrategies = new HashMap<>();
for (PCollectionView<?> sideInput: sideInputs) {
sideInputStrategies.put(sideInput, sideInput.getWindowingStrategyInternal());
@@ -539,7 +539,7 @@ class FlinkBatchTransformTranslators {
DataSet<WindowedValue<InputT>> inputDataSet =
context.getInputDataSet(context.getInput(transform));
- final DoFn<InputT, OutputT> doFn = transform.getFn();
+ final OldDoFn<InputT, OutputT> doFn = transform.getFn();
Map<TupleTag<?>, PCollection<?>> outputs = context.getOutput(transform).getAll();
@@ -578,7 +578,7 @@ class FlinkBatchTransformTranslators {
List<PCollectionView<?>> sideInputs = transform.getSideInputs();
// construct a map from side input to WindowingStrategy so that
- // the DoFn runner can map main-input windows to side input windows
+ // the OldDoFn runner can map main-input windows to side input windows
Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputStrategies = new HashMap<>();
for (PCollectionView<?> sideInput: sideInputs) {
sideInputStrategies.put(sideInput, sideInput.getWindowingStrategyInternal());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
index fa6b387..5b55d42 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
@@ -35,11 +35,10 @@ import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.Sink;
import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.Write;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
@@ -71,8 +70,6 @@ import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
-import org.apache.flink.streaming.api.functions.IngestionTimeExtractor;
-import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.util.Collector;
import org.joda.time.Instant;
import org.slf4j.Logger;
@@ -346,8 +343,8 @@ public class FlinkStreamingTransformTranslators {
context.setOutputDataStream(context.getOutput(transform), windowedStream);
}
- private static <T, W extends BoundedWindow> DoFn<T, T> createWindowAssigner(final WindowFn<T, W> windowFn) {
- return new DoFn<T, T>() {
+ private static <T, W extends BoundedWindow> OldDoFn<T, T> createWindowAssigner(final WindowFn<T, W> windowFn) {
+ return new OldDoFn<T, T>() {
@Override
public void processElement(final ProcessContext c) throws Exception {