You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/08/04 01:26:18 UTC
[10/19] incubator-beam git commit: Rename DoFn to OldDoFn
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
index 89243a3..a4af1b0 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
@@ -19,7 +19,7 @@ package org.apache.beam.runners.flink.translation.functions;
import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.PCollectionView;
@@ -30,13 +30,13 @@ import org.apache.flink.util.Collector;
import java.util.Map;
/**
- * Encapsulates a {@link org.apache.beam.sdk.transforms.DoFn}
+ * Encapsulates a {@link OldDoFn}
* inside a Flink {@link org.apache.flink.api.common.functions.RichMapPartitionFunction}.
*/
public class FlinkDoFnFunction<InputT, OutputT>
extends RichMapPartitionFunction<WindowedValue<InputT>, WindowedValue<OutputT>> {
- private final DoFn<InputT, OutputT> doFn;
+ private final OldDoFn<InputT, OutputT> doFn;
private final SerializedPipelineOptions serializedOptions;
private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
@@ -47,7 +47,7 @@ public class FlinkDoFnFunction<InputT, OutputT>
private final WindowingStrategy<?, ?> windowingStrategy;
public FlinkDoFnFunction(
- DoFn<InputT, OutputT> doFn,
+ OldDoFn<InputT, OutputT> doFn,
WindowingStrategy<?, ?> windowingStrategy,
Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
PipelineOptions options) {
@@ -56,7 +56,7 @@ public class FlinkDoFnFunction<InputT, OutputT>
this.serializedOptions = new SerializedPipelineOptions(options);
this.windowingStrategy = windowingStrategy;
- this.requiresWindowAccess = doFn instanceof DoFn.RequiresWindowAccess;
+ this.requiresWindowAccess = doFn instanceof OldDoFn.RequiresWindowAccess;
this.hasSideInputs = !sideInputs.isEmpty();
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
index 9074d72..2d36043 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
@@ -20,7 +20,7 @@ package org.apache.beam.runners.flink.translation.functions;
import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.CombineFnBase;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
@@ -60,7 +60,7 @@ public class FlinkMergingNonShuffleReduceFunction<
private final CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT> combineFn;
- private final DoFn<KV<K, InputT>, KV<K, OutputT>> doFn;
+ private final OldDoFn<KV<K, InputT>, KV<K, OutputT>> doFn;
private final WindowingStrategy<?, W> windowingStrategy;
@@ -81,8 +81,8 @@ public class FlinkMergingNonShuffleReduceFunction<
this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
- // dummy DoFn because we need one for ProcessContext
- this.doFn = new DoFn<KV<K, InputT>, KV<K, OutputT>>() {
+ // dummy OldDoFn because we need one for ProcessContext
+ this.doFn = new OldDoFn<KV<K, InputT>, KV<K, OutputT>>() {
@Override
public void processElement(ProcessContext c) throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
index f92e76f..6e673fc 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
@@ -19,7 +19,7 @@ package org.apache.beam.runners.flink.translation.functions;
import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
@@ -32,7 +32,7 @@ import org.apache.flink.util.Collector;
import java.util.Map;
/**
- * Encapsulates a {@link org.apache.beam.sdk.transforms.DoFn} that uses side outputs
+ * Encapsulates a {@link OldDoFn} that uses side outputs
* inside a Flink {@link org.apache.flink.api.common.functions.RichMapPartitionFunction}.
*
* We get a mapping from {@link org.apache.beam.sdk.values.TupleTag} to output index
@@ -42,7 +42,7 @@ import java.util.Map;
public class FlinkMultiOutputDoFnFunction<InputT, OutputT>
extends RichMapPartitionFunction<WindowedValue<InputT>, WindowedValue<RawUnionValue>> {
- private final DoFn<InputT, OutputT> doFn;
+ private final OldDoFn<InputT, OutputT> doFn;
private final SerializedPipelineOptions serializedOptions;
private final Map<TupleTag<?>, Integer> outputMap;
@@ -55,7 +55,7 @@ public class FlinkMultiOutputDoFnFunction<InputT, OutputT>
private final WindowingStrategy<?, ?> windowingStrategy;
public FlinkMultiOutputDoFnFunction(
- DoFn<InputT, OutputT> doFn,
+ OldDoFn<InputT, OutputT> doFn,
WindowingStrategy<?, ?> windowingStrategy,
Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
PipelineOptions options,
@@ -64,7 +64,7 @@ public class FlinkMultiOutputDoFnFunction<InputT, OutputT>
this.serializedOptions = new SerializedPipelineOptions(options);
this.outputMap = outputMap;
- this.requiresWindowAccess = doFn instanceof DoFn.RequiresWindowAccess;
+ this.requiresWindowAccess = doFn instanceof OldDoFn.RequiresWindowAccess;
this.hasSideInputs = !sideInputs.isEmpty();
this.windowingStrategy = windowingStrategy;
this.sideInputs = sideInputs;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java
index 71b6d27..fab3c85 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java
@@ -18,7 +18,7 @@
package org.apache.beam.runners.flink.translation.functions;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -35,7 +35,7 @@ import java.util.Collection;
import java.util.Map;
/**
- * {@link DoFn.ProcessContext} for {@link FlinkMultiOutputDoFnFunction} that supports
+ * {@link OldDoFn.ProcessContext} for {@link FlinkMultiOutputDoFnFunction} that supports
* side outputs.
*/
class FlinkMultiOutputProcessContext<InputT, OutputT>
@@ -50,7 +50,7 @@ class FlinkMultiOutputProcessContext<InputT, OutputT>
FlinkMultiOutputProcessContext(
PipelineOptions pipelineOptions,
RuntimeContext runtimeContext,
- DoFn<InputT, OutputT> doFn,
+ OldDoFn<InputT, OutputT> doFn,
WindowingStrategy<?, ?> windowingStrategy,
Collector<WindowedValue<RawUnionValue>> collector,
Map<TupleTag<?>, Integer> outputMap,
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java
index d49821b..98446f9 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java
@@ -17,18 +17,16 @@
*/
package org.apache.beam.runners.flink.translation.functions;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.joda.time.Instant;
-import java.util.Collection;
-
/**
* {@link WindowFn.AssignContext} for calling a {@link WindowFn} for elements emitted from
- * {@link org.apache.beam.sdk.transforms.DoFn#startBundle(DoFn.Context)}
- * or {@link DoFn#finishBundle(DoFn.Context)}.
+ * {@link OldDoFn#startBundle(OldDoFn.Context)}
+ * or {@link OldDoFn#finishBundle(OldDoFn.Context)}.
*
* <p>In those cases the {@code WindowFn} is not allowed to access any element information.
*/
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
index c29e1df..2db4b7b 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
@@ -20,7 +20,7 @@ package org.apache.beam.runners.flink.translation.functions;
import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.CombineFnBase;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -58,7 +58,7 @@ public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWind
protected final CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, ?> combineFn;
- protected final DoFn<KV<K, InputT>, KV<K, AccumT>> doFn;
+ protected final OldDoFn<KV<K, InputT>, KV<K, AccumT>> doFn;
protected final WindowingStrategy<?, W> windowingStrategy;
@@ -77,8 +77,8 @@ public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWind
this.sideInputs = sideInputs;
this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
- // dummy DoFn because we need one for ProcessContext
- this.doFn = new DoFn<KV<K, InputT>, KV<K, AccumT>>() {
+ // dummy OldDoFn because we need one for ProcessContext
+ this.doFn = new OldDoFn<KV<K, InputT>, KV<K, AccumT>>() {
@Override
public void processElement(ProcessContext c) throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java
index 235a803..3954d1f 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java
@@ -24,7 +24,7 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.TimerInternals;
@@ -48,10 +48,10 @@ import java.util.Iterator;
import java.util.Map;
/**
- * {@link org.apache.beam.sdk.transforms.DoFn.ProcessContext} for our Flink Wrappers.
+ * {@link OldDoFn.ProcessContext} for our Flink Wrappers.
*/
class FlinkProcessContext<InputT, OutputT>
- extends DoFn<InputT, OutputT>.ProcessContext {
+ extends OldDoFn<InputT, OutputT>.ProcessContext {
private final PipelineOptions pipelineOptions;
private final RuntimeContext runtimeContext;
@@ -67,7 +67,7 @@ class FlinkProcessContext<InputT, OutputT>
FlinkProcessContext(
PipelineOptions pipelineOptions,
RuntimeContext runtimeContext,
- DoFn<InputT, OutputT> doFn,
+ OldDoFn<InputT, OutputT> doFn,
WindowingStrategy<?, ?> windowingStrategy,
Collector<WindowedValue<OutputT>> collector,
Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs) {
@@ -80,7 +80,7 @@ class FlinkProcessContext<InputT, OutputT>
this.pipelineOptions = pipelineOptions;
this.runtimeContext = runtimeContext;
this.collector = collector;
- this.requiresWindowAccess = doFn instanceof DoFn.RequiresWindowAccess;
+ this.requiresWindowAccess = doFn instanceof OldDoFn.RequiresWindowAccess;
this.windowingStrategy = windowingStrategy;
this.sideInputs = sideInputs;
@@ -90,7 +90,7 @@ class FlinkProcessContext<InputT, OutputT>
FlinkProcessContext(
PipelineOptions pipelineOptions,
RuntimeContext runtimeContext,
- DoFn<InputT, OutputT> doFn,
+ OldDoFn<InputT, OutputT> doFn,
WindowingStrategy<?, ?> windowingStrategy,
Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs) {
doFn.super();
@@ -101,7 +101,7 @@ class FlinkProcessContext<InputT, OutputT>
this.pipelineOptions = pipelineOptions;
this.runtimeContext = runtimeContext;
this.collector = null;
- this.requiresWindowAccess = doFn instanceof DoFn.RequiresWindowAccess;
+ this.requiresWindowAccess = doFn instanceof OldDoFn.RequiresWindowAccess;
this.windowingStrategy = windowingStrategy;
this.sideInputs = sideInputs;
@@ -141,7 +141,7 @@ class FlinkProcessContext<InputT, OutputT>
public BoundedWindow window() {
if (!requiresWindowAccess) {
throw new UnsupportedOperationException(
- "window() is only available in the context of a DoFn marked as RequiresWindowAccess.");
+ "window() is only available in the context of a OldDoFn marked as RequiresWindowAccess.");
}
return Iterables.getOnlyElement(windowedValue.getWindows());
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
index 9cbc6b9..b1729a4 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
@@ -20,7 +20,7 @@ package org.apache.beam.runners.flink.translation.functions;
import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.CombineFnBase;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -60,7 +60,7 @@ public class FlinkReduceFunction<K, AccumT, OutputT, W extends BoundedWindow>
protected final CombineFnBase.PerKeyCombineFn<K, ?, AccumT, OutputT> combineFn;
- protected final DoFn<KV<K, AccumT>, KV<K, OutputT>> doFn;
+ protected final OldDoFn<KV<K, AccumT>, KV<K, OutputT>> doFn;
protected final WindowingStrategy<?, W> windowingStrategy;
@@ -81,8 +81,8 @@ public class FlinkReduceFunction<K, AccumT, OutputT, W extends BoundedWindow>
this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
- // dummy DoFn because we need one for ProcessContext
- this.doFn = new DoFn<KV<K, AccumT>, KV<K, OutputT>>() {
+ // dummy OldDoFn because we need one for ProcessContext
+ this.doFn = new OldDoFn<KV<K, AccumT>, KV<K, OutputT>>() {
@Override
public void processElement(ProcessContext c) throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
index e40d6e3..74ec66a 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
@@ -24,7 +24,7 @@ import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregat
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
@@ -52,13 +52,13 @@ import java.util.Collection;
* */
public abstract class FlinkAbstractParDoWrapper<IN, OUTDF, OUTFL> extends RichFlatMapFunction<WindowedValue<IN>, WindowedValue<OUTFL>> {
- private final DoFn<IN, OUTDF> doFn;
+ private final OldDoFn<IN, OUTDF> doFn;
private final WindowingStrategy<?, ?> windowingStrategy;
private final SerializedPipelineOptions serializedPipelineOptions;
private DoFnProcessContext context;
- public FlinkAbstractParDoWrapper(PipelineOptions options, WindowingStrategy<?, ?> windowingStrategy, DoFn<IN, OUTDF> doFn) {
+ public FlinkAbstractParDoWrapper(PipelineOptions options, WindowingStrategy<?, ?> windowingStrategy, OldDoFn<IN, OUTDF> doFn) {
checkNotNull(options);
checkNotNull(windowingStrategy);
checkNotNull(doFn);
@@ -104,15 +104,15 @@ public abstract class FlinkAbstractParDoWrapper<IN, OUTDF, OUTFL> extends RichFl
doFn.processElement(this.context);
}
- private class DoFnProcessContext extends DoFn<IN, OUTDF>.ProcessContext {
+ private class DoFnProcessContext extends OldDoFn<IN, OUTDF>.ProcessContext {
- private final DoFn<IN, OUTDF> fn;
+ private final OldDoFn<IN, OUTDF> fn;
protected final Collector<WindowedValue<OUTFL>> collector;
private WindowedValue<IN> element;
- private DoFnProcessContext(DoFn<IN, OUTDF> function,
+ private DoFnProcessContext(OldDoFn<IN, OUTDF> function,
Collector<WindowedValue<OUTFL>> outCollector) {
function.super();
super.setupDelegateAggregators();
@@ -137,9 +137,9 @@ public abstract class FlinkAbstractParDoWrapper<IN, OUTDF, OUTFL> extends RichFl
@Override
public BoundedWindow window() {
- if (!(fn instanceof DoFn.RequiresWindowAccess)) {
+ if (!(fn instanceof OldDoFn.RequiresWindowAccess)) {
throw new UnsupportedOperationException(
- "window() is only available in the context of a DoFn marked as RequiresWindowAccess.");
+ "window() is only available in the context of a OldDoFn marked as RequiresWindowAccess.");
}
Collection<? extends BoundedWindow> windows = this.element.getWindows();
@@ -211,7 +211,7 @@ public abstract class FlinkAbstractParDoWrapper<IN, OUTDF, OUTFL> extends RichFl
throw new IllegalArgumentException(String.format(
"Cannot output with timestamp %s. Output timestamps must be no earlier than the "
+ "timestamp of the current input (%s) minus the allowed skew (%s). See the "
- + "DoFn#getAllowedTimestmapSkew() Javadoc for details on changing the allowed skew.",
+ + "OldDoFn#getAllowedTimestmapSkew() Javadoc for details on changing the allowed skew.",
timestamp, ref.getTimestamp(),
PeriodFormat.getDefault().print(doFn.getAllowedTimestampSkew().toPeriod())));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
index 0e977db..103a12b 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
@@ -36,7 +36,7 @@ import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -112,7 +112,7 @@ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT>
private transient CoderRegistry coderRegistry;
- private DoFn<KeyedWorkItem<K, VIN>, KV<K, VOUT>> operator;
+ private OldDoFn<KeyedWorkItem<K, VIN>, KV<K, VOUT>> operator;
private ProcessContext context;
@@ -263,7 +263,7 @@ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT>
* a function with that combiner is created, so that elements are combined as they arrive. This is
* done for speed and (in most of the cases) for reduction of the per-window state.
*/
- private <W extends BoundedWindow> DoFn<KeyedWorkItem<K, VIN>, KV<K, VOUT>> createGroupAlsoByWindowOperator() {
+ private <W extends BoundedWindow> OldDoFn<KeyedWorkItem<K, VIN>, KV<K, VOUT>> createGroupAlsoByWindowOperator() {
if (this.operator == null) {
StateInternalsFactory<K> stateInternalsFactory = new GroupAlsoByWindowWrapperStateInternalsFactory();
@@ -272,7 +272,7 @@ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT>
// Thus VOUT == Iterable<VIN>
Coder<VIN> inputValueCoder = inputKvCoder.getValueCoder();
- this.operator = (DoFn) GroupAlsoByWindowViaWindowSetDoFn.create(
+ this.operator = (OldDoFn) GroupAlsoByWindowViaWindowSetDoFn.create(
(WindowingStrategy<?, W>) this.windowingStrategy, stateInternalsFactory, SystemReduceFn.<K, VIN, W>buffering(inputValueCoder));
} else {
Coder<K> inputKeyCoder = inputKvCoder.getKeyCoder();
@@ -446,7 +446,7 @@ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT>
private KeyedWorkItem<K, VIN> element;
- public ProcessContext(DoFn<KeyedWorkItem<K, VIN>, KV<K, VOUT>> function,
+ public ProcessContext(OldDoFn<KeyedWorkItem<K, VIN>, KV<K, VOUT>> function,
TimestampedCollector<WindowedValue<KV<K, VOUT>>> outCollector,
FlinkTimerInternals timerInternals) {
function.super();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java
index 619b887..0ea0cab 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java
@@ -20,7 +20,7 @@ package org.apache.beam.runners.flink.translation.wrappers.streaming;
import static com.google.common.base.Preconditions.checkNotNull;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingInternals;
@@ -40,7 +40,7 @@ public class FlinkParDoBoundMultiWrapper<IN, OUT> extends FlinkAbstractParDoWrap
private final TupleTag<?> mainTag;
private final Map<TupleTag<?>, Integer> outputLabels;
- public FlinkParDoBoundMultiWrapper(PipelineOptions options, WindowingStrategy<?, ?> windowingStrategy, DoFn<IN, OUT> doFn, TupleTag<?> mainTag, Map<TupleTag<?>, Integer> tagsToLabels) {
+ public FlinkParDoBoundMultiWrapper(PipelineOptions options, WindowingStrategy<?, ?> windowingStrategy, OldDoFn<IN, OUT> doFn, TupleTag<?> mainTag, Map<TupleTag<?>, Integer> tagsToLabels) {
super(options, windowingStrategy, doFn);
this.mainTag = checkNotNull(mainTag);
this.outputLabels = checkNotNull(tagsToLabels);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundWrapper.java
index 4def0c6..6be94b2 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundWrapper.java
@@ -19,7 +19,7 @@ package org.apache.beam.runners.flink.translation.wrappers.streaming;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.TimerInternals;
@@ -41,7 +41,7 @@ import java.util.Collection;
* */
public class FlinkParDoBoundWrapper<IN, OUT> extends FlinkAbstractParDoWrapper<IN, OUT, OUT> {
- public FlinkParDoBoundWrapper(PipelineOptions options, WindowingStrategy<?, ?> windowingStrategy, DoFn<IN, OUT> doFn) {
+ public FlinkParDoBoundWrapper(PipelineOptions options, WindowingStrategy<?, ?> windowingStrategy, OldDoFn<IN, OUT> doFn) {
super(options, windowingStrategy, doFn);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java
index 9e55002..a0b33f8 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java
@@ -19,7 +19,7 @@ package org.apache.beam.runners.flink.translation.wrappers.streaming.state;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.TimerInternals;
@@ -106,7 +106,7 @@ public abstract class AbstractFlinkTimerInternals<K, VIN> implements TimerIntern
}
}
- public void encodeTimerInternals(DoFn.ProcessContext context,
+ public void encodeTimerInternals(OldDoFn.ProcessContext context,
StateCheckpointWriter writer,
KvCoder<K, VIN> kvCoder,
Coder<? extends BoundedWindow> windowCoder) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
index 61e219c..c24d91d 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
@@ -27,7 +27,7 @@ import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
@@ -114,7 +114,7 @@ public class PipelineOptionsTest {
}
- private static class TestDoFn extends DoFn<Object, Object> {
+ private static class TestDoFn extends OldDoFn<Object, Object> {
@Override
public void processElement(ProcessContext c) throws Exception {
@@ -126,7 +126,7 @@ public class PipelineOptionsTest {
}
private static class TestParDoWrapper extends FlinkAbstractParDoWrapper {
- public TestParDoWrapper(PipelineOptions options, WindowingStrategy windowingStrategy, DoFn doFn) {
+ public TestParDoWrapper(PipelineOptions options, WindowingStrategy windowingStrategy, OldDoFn doFn) {
super(options, windowingStrategy, doFn);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
index bb79b27..ca70096 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
@@ -20,7 +20,7 @@ package org.apache.beam.runners.flink;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.CountingInput;
import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
@@ -72,7 +72,7 @@ public class ReadSourceITCase extends JavaProgramTestBase {
PCollection<String> result = p
.apply(CountingInput.upTo(10))
- .apply(ParDo.of(new DoFn<Long, String>() {
+ .apply(ParDo.of(new OldDoFn<Long, String>() {
@Override
public void processElement(ProcessContext c) throws Exception {
c.output(c.element().toString());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java
index fe71802..bc69f34 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java
@@ -20,7 +20,7 @@ package org.apache.beam.runners.flink;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.CountingInput;
import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
import com.google.common.base.Joiner;
import org.apache.flink.streaming.util.StreamingProgramTestBase;
@@ -59,7 +59,7 @@ public class ReadSourceStreamingITCase extends StreamingProgramTestBase {
p
.apply(CountingInput.upTo(10))
- .apply(ParDo.of(new DoFn<Long, String>() {
+ .apply(ParDo.of(new OldDoFn<Long, String>() {
@Override
public void processElement(ProcessContext c) throws Exception {
c.output(c.element().toString());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
index 1b55c61..ca183a8 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
@@ -21,7 +21,7 @@ import org.apache.beam.runners.flink.FlinkTestPipeline;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
@@ -61,7 +61,7 @@ public class GroupByNullKeyTest extends StreamingProgramTestBase implements Seri
compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
}
- public static class ExtractUserAndTimestamp extends DoFn<KV<Integer, String>, String> {
+ public static class ExtractUserAndTimestamp extends OldDoFn<KV<Integer, String>, String> {
private static final long serialVersionUID = 0;
@Override
@@ -97,7 +97,7 @@ public class GroupByNullKeyTest extends StreamingProgramTestBase implements Seri
.withAllowedLateness(Duration.ZERO)
.discardingFiredPanes())
- .apply(ParDo.of(new DoFn<String, KV<Void, String>>() {
+ .apply(ParDo.of(new OldDoFn<String, KV<Void, String>>() {
@Override
public void processElement(ProcessContext c) throws Exception {
String elem = c.element();
@@ -105,7 +105,7 @@ public class GroupByNullKeyTest extends StreamingProgramTestBase implements Seri
}
}))
.apply(GroupByKey.<Void, String>create())
- .apply(ParDo.of(new DoFn<KV<Void, Iterable<String>>, String>() {
+ .apply(ParDo.of(new OldDoFn<KV<Void, Iterable<String>>, String>() {
@Override
public void processElement(ProcessContext c) throws Exception {
KV<Void, Iterable<String>> elem = c.element();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java
index 1efb42f..7912aee 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java
@@ -22,7 +22,7 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.Window;
@@ -103,7 +103,7 @@ public class TopWikipediaSessionsITCase extends StreamingProgramTestBase impleme
- .apply(ParDo.of(new DoFn<TableRow, String>() {
+ .apply(ParDo.of(new OldDoFn<TableRow, String>() {
@Override
public void processElement(ProcessContext c) throws Exception {
TableRow row = c.element();
@@ -120,7 +120,7 @@ public class TopWikipediaSessionsITCase extends StreamingProgramTestBase impleme
.apply(Count.<String>perElement());
- PCollection<String> format = output.apply(ParDo.of(new DoFn<KV<String, Long>, String>() {
+ PCollection<String> format = output.apply(ParDo.of(new OldDoFn<KV<String, Long>, String>() {
@Override
public void processElement(ProcessContext c) throws Exception {
KV<String, Long> el = c.element();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 7fd203f..ac06b52 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -47,9 +47,9 @@ import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.runners.TransformTreeNode;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
@@ -82,7 +82,6 @@ import com.google.api.services.dataflow.model.WorkerPool;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -94,7 +93,6 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-
import javax.annotation.Nullable;
/**
@@ -1021,7 +1019,7 @@ public class DataflowPipelineTranslator {
}
private static void translateFn(
- DoFn fn,
+ OldDoFn fn,
WindowingStrategy windowingStrategy,
Iterable<PCollectionView<?>> sideInputs,
Coder inputCoder,
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index e7cc20e..d762d50 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -78,9 +78,9 @@ import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -143,7 +143,6 @@ import com.google.common.collect.Multimap;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
-
import org.joda.time.DateTimeUtils;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
@@ -173,7 +172,6 @@ import java.util.Random;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
-
import javax.annotation.Nullable;
/**
@@ -762,13 +760,14 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
PTransform<PCollection<T>, PCollection<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>>> {
/**
- * A {@link DoFn} that for each element outputs a {@code KV} structure suitable for
+ * A {@link OldDoFn} that for each element outputs a {@code KV} structure suitable for
* grouping by the hash of the window's byte representation and sorting the grouped values
* using the window's byte representation.
*/
@SystemDoFnInternal
private static class UseWindowHashAsKeyAndWindowAsSortKeyDoFn<T, W extends BoundedWindow>
- extends DoFn<T, KV<Integer, KV<W, WindowedValue<T>>>> implements DoFn.RequiresWindowAccess {
+ extends OldDoFn<T, KV<Integer, KV<W, WindowedValue<T>>>> implements
+ OldDoFn.RequiresWindowAccess {
private final IsmRecordCoder<?> ismCoderForHash;
private UseWindowHashAsKeyAndWindowAsSortKeyDoFn(IsmRecordCoder<?> ismCoderForHash) {
@@ -828,15 +827,15 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
extends PTransform<PCollection<T>, PCollectionView<T>> {
/**
- * A {@link DoFn} that outputs {@link IsmRecord}s. These records are structured as follows:
+ * A {@link OldDoFn} that outputs {@link IsmRecord}s. These records are structured as follows:
* <ul>
* <li>Key 1: Window
* <li>Value: Windowed value
* </ul>
*/
static class IsmRecordForSingularValuePerWindowDoFn<T, W extends BoundedWindow>
- extends DoFn<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>,
- IsmRecord<WindowedValue<T>>> {
+ extends OldDoFn<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>,
+ IsmRecord<WindowedValue<T>>> {
private final Coder<W> windowCoder;
IsmRecordForSingularValuePerWindowDoFn(Coder<W> windowCoder) {
@@ -902,8 +901,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
applyForSingleton(
DataflowRunner runner,
PCollection<T> input,
- DoFn<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>,
- IsmRecord<WindowedValue<FinalT>>> doFn,
+ OldDoFn<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>,
+ IsmRecord<WindowedValue<FinalT>>> doFn,
boolean hasDefault,
FinalT defaultValue,
Coder<FinalT> defaultValueCoder) {
@@ -998,7 +997,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
static class BatchViewAsList<T>
extends PTransform<PCollection<T>, PCollectionView<List<T>>> {
/**
- * A {@link DoFn} which creates {@link IsmRecord}s assuming that each element is within the
+ * A {@link OldDoFn} which creates {@link IsmRecord}s assuming that each element is within the
* global window. Each {@link IsmRecord} has
* <ul>
* <li>Key 1: Global window</li>
@@ -1008,7 +1007,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
*/
@SystemDoFnInternal
static class ToIsmRecordForGlobalWindowDoFn<T>
- extends DoFn<T, IsmRecord<WindowedValue<T>>> {
+ extends OldDoFn<T, IsmRecord<WindowedValue<T>>> {
long indexInBundle;
@Override
@@ -1030,7 +1029,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
/**
- * A {@link DoFn} which creates {@link IsmRecord}s comparing successive elements windows
+ * A {@link OldDoFn} which creates {@link IsmRecord}s comparing successive elements windows
* to locate the window boundaries. The {@link IsmRecord} has:
* <ul>
* <li>Key 1: Window</li>
@@ -1040,8 +1039,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
*/
@SystemDoFnInternal
static class ToIsmRecordForNonGlobalWindowDoFn<T, W extends BoundedWindow>
- extends DoFn<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>,
- IsmRecord<WindowedValue<T>>> {
+ extends OldDoFn<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>,
+ IsmRecord<WindowedValue<T>>> {
private final Coder<W> windowCoder;
ToIsmRecordForNonGlobalWindowDoFn(Coder<W> windowCoder) {
@@ -1174,7 +1173,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> {
/**
- * A {@link DoFn} which groups elements by window boundaries. For each group,
+ * A {@link OldDoFn} which groups elements by window boundaries. For each group,
* the group of elements is transformed into a {@link TransformedMap}.
* The transformed {@code Map<K, V>} is backed by a {@code Map<K, WindowedValue<V>>}
* and contains a function {@code WindowedValue<V> -> V}.
@@ -1188,10 +1187,10 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
* </ul>
*/
static class ToMapDoFn<K, V, W extends BoundedWindow>
- extends DoFn<KV<Integer, Iterable<KV<W, WindowedValue<KV<K, V>>>>>,
- IsmRecord<WindowedValue<TransformedMap<K,
- WindowedValue<V>,
- V>>>> {
+ extends OldDoFn<KV<Integer, Iterable<KV<W, WindowedValue<KV<K, V>>>>>,
+ IsmRecord<WindowedValue<TransformedMap<K,
+ WindowedValue<V>,
+ V>>>> {
private final Coder<W> windowCoder;
ToMapDoFn(Coder<W> windowCoder) {
@@ -1358,8 +1357,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
@SystemDoFnInternal
private static class GroupByKeyHashAndSortByKeyAndWindowDoFn<K, V, W>
- extends DoFn<KV<K, V>, KV<Integer, KV<KV<K, W>, WindowedValue<V>>>>
- implements DoFn.RequiresWindowAccess {
+ extends OldDoFn<KV<K, V>, KV<Integer, KV<KV<K, W>, WindowedValue<V>>>>
+ implements OldDoFn.RequiresWindowAccess {
private final IsmRecordCoder<?> coder;
private GroupByKeyHashAndSortByKeyAndWindowDoFn(IsmRecordCoder<?> coder) {
@@ -1412,7 +1411,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
/**
- * A {@link DoFn} which creates {@link IsmRecord}s comparing successive elements windows
+ * A {@link OldDoFn} which creates {@link IsmRecord}s comparing successive elements windows
* and keys to locate window and key boundaries. The main output {@link IsmRecord}s have:
* <ul>
* <li>Key 1: Window</li>
@@ -1424,12 +1423,12 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
* <p>Additionally, we output all the unique keys per window seen to {@code outputForEntrySet}
* and the unique key count per window to {@code outputForSize}.
*
- * <p>Finally, if this DoFn has been requested to perform unique key checking, it will
+ * <p>Finally, if this OldDoFn has been requested to perform unique key checking, it will
* throw an {@link IllegalStateException} if more than one key per window is found.
*/
static class ToIsmRecordForMapLikeDoFn<K, V, W extends BoundedWindow>
- extends DoFn<KV<Integer, Iterable<KV<KV<K, W>, WindowedValue<V>>>>,
- IsmRecord<WindowedValue<V>>> {
+ extends OldDoFn<KV<Integer, Iterable<KV<KV<K, W>, WindowedValue<V>>>>,
+ IsmRecord<WindowedValue<V>>> {
private final TupleTag<KV<Integer, KV<W, Long>>> outputForSize;
private final TupleTag<KV<Integer, KV<W, K>>> outputForEntrySet;
@@ -1557,7 +1556,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
/**
- * A {@link DoFn} which outputs a metadata {@link IsmRecord} per window of:
+ * A {@link OldDoFn} which outputs a metadata {@link IsmRecord} per window of:
* <ul>
* <li>Key 1: META key</li>
* <li>Key 2: window</li>
@@ -1565,11 +1564,11 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
* <li>Value: sum of values for window</li>
* </ul>
*
- * <p>This {@link DoFn} is meant to be used to compute the number of unique keys
+ * <p>This {@link OldDoFn} is meant to be used to compute the number of unique keys
* per window for map and multimap side inputs.
*/
static class ToIsmMetadataRecordForSizeDoFn<K, V, W extends BoundedWindow>
- extends DoFn<KV<Integer, Iterable<KV<W, Long>>>, IsmRecord<WindowedValue<V>>> {
+ extends OldDoFn<KV<Integer, Iterable<KV<W, Long>>>, IsmRecord<WindowedValue<V>>> {
private final Coder<W> windowCoder;
ToIsmMetadataRecordForSizeDoFn(Coder<W> windowCoder) {
this.windowCoder = windowCoder;
@@ -1606,7 +1605,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
/**
- * A {@link DoFn} which outputs a metadata {@link IsmRecord} per window and key pair of:
+ * A {@link OldDoFn} which outputs a metadata {@link IsmRecord} per window and key pair of:
* <ul>
* <li>Key 1: META key</li>
* <li>Key 2: window</li>
@@ -1614,11 +1613,11 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
* <li>Value: key</li>
* </ul>
*
- * <p>This {@link DoFn} is meant to be used to output index to key records
+ * <p>This {@link OldDoFn} is meant to be used to output index to key records
* per window for map and multimap side inputs.
*/
static class ToIsmMetadataRecordForKeyDoFn<K, V, W extends BoundedWindow>
- extends DoFn<KV<Integer, Iterable<KV<W, K>>>, IsmRecord<WindowedValue<V>>> {
+ extends OldDoFn<KV<Integer, Iterable<KV<W, K>>>, IsmRecord<WindowedValue<V>>> {
private final Coder<K> keyCoder;
private final Coder<W> windowCoder;
@@ -1658,7 +1657,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
/**
- * A {@link DoFn} which partitions sets of elements by window boundaries. Within each
+ * A {@link OldDoFn} which partitions sets of elements by window boundaries. Within each
* partition, the set of elements is transformed into a {@link TransformedMap}.
* The transformed {@code Map<K, Iterable<V>>} is backed by a
* {@code Map<K, Iterable<WindowedValue<V>>>} and contains a function
@@ -1673,10 +1672,10 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
* </ul>
*/
static class ToMultimapDoFn<K, V, W extends BoundedWindow>
- extends DoFn<KV<Integer, Iterable<KV<W, WindowedValue<KV<K, V>>>>>,
- IsmRecord<WindowedValue<TransformedMap<K,
- Iterable<WindowedValue<V>>,
- Iterable<V>>>>> {
+ extends OldDoFn<KV<Integer, Iterable<KV<W, WindowedValue<KV<K, V>>>>>,
+ IsmRecord<WindowedValue<TransformedMap<K,
+ Iterable<WindowedValue<V>>,
+ Iterable<V>>>>> {
private final Coder<W> windowCoder;
ToMultimapDoFn(Coder<W> windowCoder) {
@@ -2335,7 +2334,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
// WindmillSink.
.apply(Reshuffle.<Integer, ValueWithRecordId<T>>of())
.apply("StripIds", ParDo.of(
- new DoFn<KV<Integer, ValueWithRecordId<T>>, T>() {
+ new OldDoFn<KV<Integer, ValueWithRecordId<T>>, T>() {
@Override
public void processElement(ProcessContext c) {
c.output(c.element().getValue().getValue());
@@ -2372,11 +2371,11 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
/**
- * A specialized {@link DoFn} for writing the contents of a {@link PCollection}
+ * A specialized {@link OldDoFn} for writing the contents of a {@link PCollection}
* to a streaming {@link PCollectionView} backend implementation.
*/
private static class StreamingPCollectionViewWriterFn<T>
- extends DoFn<Iterable<T>, T> implements DoFn.RequiresWindowAccess {
+ extends OldDoFn<Iterable<T>, T> implements OldDoFn.RequiresWindowAccess {
private final PCollectionView<?> view;
private final Coder<T> dataCoder;
@@ -2553,7 +2552,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
}
- private static class WrapAsList<T> extends DoFn<T, List<T>> {
+ private static class WrapAsList<T> extends OldDoFn<T, List<T>> {
@Override
public void processElement(ProcessContext c) {
c.output(Arrays.asList(c.element()));
@@ -2716,7 +2715,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
@Nullable
private PTransform<?, ?> transform;
@Nullable
- private DoFn<?, ?> doFn;
+ private OldDoFn<?, ?> doFn;
/**
* Builds an instance of this class from the overridden transform.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java
index 5f808a5..d4f9a90 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java
@@ -18,7 +18,7 @@
package org.apache.beam.runners.dataflow.internal;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.Window;
@@ -63,9 +63,9 @@ public class AssignWindows<T> extends PTransform<PCollection<T>, PCollection<T>>
} else {
// If the windowFn didn't change, we just run a pass-through transform and then set the
// new windowing strategy.
- return input.apply("Identity", ParDo.of(new DoFn<T, T>() {
+ return input.apply("Identity", ParDo.of(new OldDoFn<T, T>() {
@Override
- public void processElement(DoFn<T, T>.ProcessContext c) throws Exception {
+ public void processElement(OldDoFn<T, T>.ProcessContext c) throws Exception {
c.output(c.element());
}
})).setWindowingStrategyInternal(outputStrategy);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
index f83acbc..2017313 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
@@ -18,32 +18,32 @@
package org.apache.beam.runners.dataflow.util;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.PCollectionView;
import java.io.Serializable;
/**
- * Wrapper class holding the necessary information to serialize a DoFn.
+ * Wrapper class holding the necessary information to serialize a OldDoFn.
*
- * @param <InputT> the type of the (main) input elements of the DoFn
- * @param <OutputT> the type of the (main) output elements of the DoFn
+ * @param <InputT> the type of the (main) input elements of the OldDoFn
+ * @param <OutputT> the type of the (main) output elements of the OldDoFn
*/
public class DoFnInfo<InputT, OutputT> implements Serializable {
- private final DoFn<InputT, OutputT> doFn;
+ private final OldDoFn<InputT, OutputT> doFn;
private final WindowingStrategy<?, ?> windowingStrategy;
private final Iterable<PCollectionView<?>> sideInputViews;
private final Coder<InputT> inputCoder;
- public DoFnInfo(DoFn<InputT, OutputT> doFn, WindowingStrategy<?, ?> windowingStrategy) {
+ public DoFnInfo(OldDoFn<InputT, OutputT> doFn, WindowingStrategy<?, ?> windowingStrategy) {
this.doFn = doFn;
this.windowingStrategy = windowingStrategy;
this.sideInputViews = null;
this.inputCoder = null;
}
- public DoFnInfo(DoFn<InputT, OutputT> doFn, WindowingStrategy<?, ?> windowingStrategy,
+ public DoFnInfo(OldDoFn<InputT, OutputT> doFn, WindowingStrategy<?, ?> windowingStrategy,
Iterable<PCollectionView<?>> sideInputViews, Coder<InputT> inputCoder) {
this.doFn = doFn;
this.windowingStrategy = windowingStrategy;
@@ -51,7 +51,7 @@ public class DoFnInfo<InputT, OutputT> implements Serializable {
this.inputCoder = inputCoder;
}
- public DoFn<InputT, OutputT> getDoFn() {
+ public OldDoFn<InputT, OutputT> getDoFn() {
return doFn;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 7d89735..2a01c03 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -49,7 +49,7 @@ import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
@@ -506,7 +506,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
}
/**
- * Returns a Step for a DoFn by creating and translating a pipeline.
+ * Returns a Step for a OldDoFn by creating and translating a pipeline.
*/
private static Step createPredefinedStep() throws Exception {
DataflowPipelineOptions options = buildPipelineOptions();
@@ -530,7 +530,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
return step;
}
- private static class NoOpFn extends DoFn<String, String> {
+ private static class NoOpFn extends OldDoFn<String, String> {
@Override public void processElement(ProcessContext c) throws Exception {
c.output(c.element());
}
@@ -864,7 +864,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
Pipeline pipeline = Pipeline.create(options);
- DoFn<Integer, Integer> fn1 = new DoFn<Integer, Integer>() {
+ OldDoFn<Integer, Integer> fn1 = new OldDoFn<Integer, Integer>() {
@Override
public void processElement(ProcessContext c) throws Exception {
c.output(c.element());
@@ -880,7 +880,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
}
};
- DoFn<Integer, Integer> fn2 = new DoFn<Integer, Integer>() {
+ OldDoFn<Integer, Integer> fn2 = new OldDoFn<Integer, Integer>() {
@Override
public void processElement(ProcessContext c) throws Exception {
c.output(c.element());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
index 4951043..0677030 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
@@ -25,8 +25,8 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
@@ -44,7 +44,7 @@ public class WordCount {
* of-line. This DoFn tokenizes lines of text into individual words; we pass it to a ParDo in the
* pipeline.
*/
- static class ExtractWordsFn extends DoFn<String, String> {
+ static class ExtractWordsFn extends OldDoFn<String, String> {
private final Aggregator<Long, Long> emptyLines =
createAggregator("emptyLines", new Sum.SumLongFn());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
index b5888bd..f4ce516 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
@@ -19,7 +19,7 @@
package org.apache.beam.runners.spark.translation;
import org.apache.beam.runners.spark.util.BroadcastHelper;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.TupleTag;
@@ -39,7 +39,7 @@ import java.util.Map;
public class DoFnFunction<InputT, OutputT>
implements FlatMapFunction<Iterator<WindowedValue<InputT>>,
WindowedValue<OutputT>> {
- private final DoFn<InputT, OutputT> mFunction;
+ private final OldDoFn<InputT, OutputT> mFunction;
private final SparkRuntimeContext mRuntimeContext;
private final Map<TupleTag<?>, BroadcastHelper<?>> mSideInputs;
@@ -48,7 +48,7 @@ public class DoFnFunction<InputT, OutputT>
* @param runtime Runtime to apply function in.
* @param sideInputs Side inputs used in DoFunction.
*/
- public DoFnFunction(DoFn<InputT, OutputT> fn,
+ public DoFnFunction(OldDoFn<InputT, OutputT> fn,
SparkRuntimeContext runtime,
Map<TupleTag<?>, BroadcastHelper<?>> sideInputs) {
this.mFunction = fn;
@@ -69,7 +69,7 @@ public class DoFnFunction<InputT, OutputT>
private final List<WindowedValue<OutputT>> outputs = new LinkedList<>();
- ProcCtxt(DoFn<InputT, OutputT> fn, SparkRuntimeContext runtimeContext, Map<TupleTag<?>,
+ ProcCtxt(OldDoFn<InputT, OutputT> fn, SparkRuntimeContext runtimeContext, Map<TupleTag<?>,
BroadcastHelper<?>> sideInputs) {
super(fn, runtimeContext, sideInputs);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
index daa767d..e33578d 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
@@ -19,7 +19,7 @@
package org.apache.beam.runners.spark.translation;
import org.apache.beam.runners.spark.util.BroadcastHelper;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.TupleTag;
@@ -45,13 +45,13 @@ import scala.Tuple2;
*/
class MultiDoFnFunction<InputT, OutputT>
implements PairFlatMapFunction<Iterator<WindowedValue<InputT>>, TupleTag<?>, WindowedValue<?>> {
- private final DoFn<InputT, OutputT> mFunction;
+ private final OldDoFn<InputT, OutputT> mFunction;
private final SparkRuntimeContext mRuntimeContext;
private final TupleTag<OutputT> mMainOutputTag;
private final Map<TupleTag<?>, BroadcastHelper<?>> mSideInputs;
MultiDoFnFunction(
- DoFn<InputT, OutputT> fn,
+ OldDoFn<InputT, OutputT> fn,
SparkRuntimeContext runtimeContext,
TupleTag<OutputT> mainOutputTag,
Map<TupleTag<?>, BroadcastHelper<?>> sideInputs) {
@@ -75,7 +75,7 @@ class MultiDoFnFunction<InputT, OutputT>
private final Multimap<TupleTag<?>, WindowedValue<?>> outputs = LinkedListMultimap.create();
- ProcCtxt(DoFn<InputT, OutputT> fn, SparkRuntimeContext runtimeContext, Map<TupleTag<?>,
+ ProcCtxt(OldDoFn<InputT, OutputT> fn, SparkRuntimeContext runtimeContext, Map<TupleTag<?>,
BroadcastHelper<?>> sideInputs) {
super(fn, runtimeContext, sideInputs);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
index cad2a8e..58ac03c 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
@@ -23,7 +23,7 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.TimerInternals;
@@ -50,17 +50,17 @@ import java.util.Map;
* Spark runner process context.
*/
public abstract class SparkProcessContext<InputT, OutputT, ValueT>
- extends DoFn<InputT, OutputT>.ProcessContext {
+ extends OldDoFn<InputT, OutputT>.ProcessContext {
private static final Logger LOG = LoggerFactory.getLogger(SparkProcessContext.class);
- private final DoFn<InputT, OutputT> fn;
+ private final OldDoFn<InputT, OutputT> fn;
private final SparkRuntimeContext mRuntimeContext;
private final Map<TupleTag<?>, BroadcastHelper<?>> mSideInputs;
protected WindowedValue<InputT> windowedValue;
- SparkProcessContext(DoFn<InputT, OutputT> fn,
+ SparkProcessContext(OldDoFn<InputT, OutputT> fn,
SparkRuntimeContext runtime,
Map<TupleTag<?>, BroadcastHelper<?>> sideInputs) {
fn.super();
@@ -135,9 +135,9 @@ public abstract class SparkProcessContext<InputT, OutputT, ValueT>
@Override
public BoundedWindow window() {
- if (!(fn instanceof DoFn.RequiresWindowAccess)) {
+ if (!(fn instanceof OldDoFn.RequiresWindowAccess)) {
throw new UnsupportedOperationException(
- "window() is only available in the context of a DoFn marked as RequiresWindowAccess.");
+ "window() is only available in the context of a OldDoFn marked as RequiresWindowAccess.");
}
return Iterables.getOnlyElement(windowedValue.getWindows());
}
@@ -200,7 +200,7 @@ public abstract class SparkProcessContext<InputT, OutputT, ValueT>
protected abstract Iterator<ValueT> getOutputIterator();
protected Iterable<ValueT> getOutputIterable(final Iterator<WindowedValue<InputT>> iter,
- final DoFn<InputT, OutputT> doFn) {
+ final OldDoFn<InputT, OutputT> doFn) {
return new Iterable<ValueT>() {
@Override
public Iterator<ValueT> iterator() {
@@ -212,11 +212,11 @@ public abstract class SparkProcessContext<InputT, OutputT, ValueT>
private class ProcCtxtIterator extends AbstractIterator<ValueT> {
private final Iterator<WindowedValue<InputT>> inputIterator;
- private final DoFn<InputT, OutputT> doFn;
+ private final OldDoFn<InputT, OutputT> doFn;
private Iterator<ValueT> outputIterator;
private boolean calledFinish;
- ProcCtxtIterator(Iterator<WindowedValue<InputT>> iterator, DoFn<InputT, OutputT> doFn) {
+ ProcCtxtIterator(Iterator<WindowedValue<InputT>> iterator, OldDoFn<InputT, OutputT> doFn) {
this.inputIterator = iterator;
this.doFn = doFn;
this.outputIterator = getOutputIterator();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index c5d5802..c51a500 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -39,8 +39,8 @@ import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
@@ -94,6 +94,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+
import scala.Tuple2;
/**
@@ -203,7 +204,7 @@ public final class TransformTranslator {
WindowingStrategy<?, W> windowingStrategy =
(WindowingStrategy<?, W>) transform.getWindowingStrategy();
- DoFn<KV<K, Iterable<WindowedValue<V>>>, KV<K, Iterable<V>>> gabwDoFn =
+ OldDoFn<KV<K, Iterable<WindowedValue<V>>>, KV<K, Iterable<V>>> gabwDoFn =
new GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W>(
windowingStrategy,
new InMemoryStateInternalsFactory<K>(),
@@ -768,7 +769,7 @@ public final class TransformTranslator {
&& windowFn instanceof GlobalWindows)) {
context.setOutputRDD(transform, inRDD);
} else {
- DoFn<T, T> addWindowsDoFn = new AssignWindowsDoFn<>(windowFn);
+ OldDoFn<T, T> addWindowsDoFn = new AssignWindowsDoFn<>(windowFn);
DoFnFunction<T, T> dofn =
new DoFnFunction<>(addWindowsDoFn, context.getRuntimeContext(), null);
context.setOutputRDD(transform, inRDD.mapPartitions(dofn));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index 8154cd7..b0fb931 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -32,8 +32,8 @@ import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
@@ -315,7 +315,7 @@ public final class StreamingTransformTranslator {
sec.setStream(transform, dStream.window(windowDuration, slideDuration));
}
//--- then we apply windowing to the elements
- DoFn<T, T> addWindowsDoFn = new AssignWindowsDoFn<>(windowFn);
+ OldDoFn<T, T> addWindowsDoFn = new AssignWindowsDoFn<>(windowFn);
DoFnFunction<T, T> dofn = new DoFnFunction<>(addWindowsDoFn,
((StreamingEvaluationContext) context).getRuntimeContext(), null);
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
index d1f8d12..e4a293f 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
@@ -24,8 +24,8 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Keys;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.RemoveDuplicates;
@@ -101,7 +101,7 @@ public class TfIdfTest {
// of the words in the document associated with that that URI.
PCollection<KV<URI, String>> uriToWords = uriToContent
.apply("SplitWords", ParDo.of(
- new DoFn<KV<URI, String>, KV<URI, String>>() {
+ new OldDoFn<KV<URI, String>, KV<URI, String>>() {
@Override
public void processElement(ProcessContext c) {
URI uri = c.element().getKey();
@@ -144,7 +144,7 @@ public class TfIdfTest {
// by the URI key.
PCollection<KV<URI, KV<String, Long>>> uriToWordAndCount = uriAndWordToCount
.apply("ShiftKeys", ParDo.of(
- new DoFn<KV<KV<URI, String>, Long>, KV<URI, KV<String, Long>>>() {
+ new OldDoFn<KV<KV<URI, String>, Long>, KV<URI, KV<String, Long>>>() {
@Override
public void processElement(ProcessContext c) {
URI uri = c.element().getKey().getKey();
@@ -183,7 +183,7 @@ public class TfIdfTest {
// divided by the total number of words in the document.
PCollection<KV<String, KV<URI, Double>>> wordToUriAndTf = uriToWordAndCountAndTotal
.apply("ComputeTermFrequencies", ParDo.of(
- new DoFn<KV<URI, CoGbkResult>, KV<String, KV<URI, Double>>>() {
+ new OldDoFn<KV<URI, CoGbkResult>, KV<String, KV<URI, Double>>>() {
@Override
public void processElement(ProcessContext c) {
URI uri = c.element().getKey();
@@ -208,7 +208,7 @@ public class TfIdfTest {
PCollection<KV<String, Double>> wordToDf = wordToDocCount
.apply("ComputeDocFrequencies", ParDo
.withSideInputs(totalDocuments)
- .of(new DoFn<KV<String, Long>, KV<String, Double>>() {
+ .of(new OldDoFn<KV<String, Long>, KV<String, Double>>() {
@Override
public void processElement(ProcessContext c) {
String word = c.element().getKey();
@@ -237,7 +237,7 @@ public class TfIdfTest {
// divided by the log of the document frequency.
return wordToUriAndTfAndDf
.apply("ComputeTfIdf", ParDo.of(
- new DoFn<KV<String, CoGbkResult>, KV<String, KV<URI, Double>>>() {
+ new OldDoFn<KV<String, CoGbkResult>, KV<String, KV<URI, Double>>>() {
@Override
public void processElement(ProcessContext c) {
String word = c.element().getKey();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
index 600217d..2e477e9 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
@@ -27,7 +27,7 @@ import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
@@ -70,7 +70,7 @@ public class CombinePerKeyTest {
private static class SumPerKey<T> extends PTransform<PCollection<T>, PCollection<KV<T, Long>>> {
@Override
public PCollection<KV<T, Long>> apply(PCollection<T> pcol) {
- PCollection<KV<T, Long>> withLongs = pcol.apply(ParDo.of(new DoFn<T, KV<T, Long>>() {
+ PCollection<KV<T, Long>> withLongs = pcol.apply(ParDo.of(new OldDoFn<T, KV<T, Long>>() {
@Override
public void processElement(ProcessContext processContext) throws Exception {
processContext.output(KV.of(processContext.element(), 1L));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java
index 0f60271..263ce99 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java
@@ -25,7 +25,7 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
@@ -46,7 +46,7 @@ public class DoFnOutputTest implements Serializable {
PCollection<String> strings = pipeline.apply(Create.of("a"));
// Test that values written from startBundle() and finishBundle() are written to
// the output
- PCollection<String> output = strings.apply(ParDo.of(new DoFn<String, String>() {
+ PCollection<String> output = strings.apply(ParDo.of(new OldDoFn<String, String>() {
@Override
public void startBundle(Context c) throws Exception {
c.output("start");
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
index ded3eb2..739eec3 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
@@ -30,9 +30,9 @@ import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.ApproximateUnique;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.Max;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
@@ -103,9 +103,9 @@ public class MultiOutputWordCountTest {
}
/**
- * A DoFn that tokenizes lines of text into individual words.
+ * A OldDoFn that tokenizes lines of text into individual words.
*/
- static class ExtractWordsFn extends DoFn<String, String> {
+ static class ExtractWordsFn extends OldDoFn<String, String> {
private final Aggregator<Integer, Integer> totalWords = createAggregator("totalWords",
new Sum.SumIntegerFn());
@@ -170,7 +170,7 @@ public class MultiOutputWordCountTest {
}
}
- private static class FormatCountsFn extends DoFn<KV<String, Long>, String> {
+ private static class FormatCountsFn extends OldDoFn<KV<String, Long>, String> {
@Override
public void processElement(ProcessContext c) {
c.output(c.element().getKey() + ": " + c.element().getValue());