You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/04/14 21:41:12 UTC
[4/4] beam git commit: Rename DoFn.Context#sideOutput to output
Rename DoFn.Context#sideOutput to output
Having two methods, both named output - one which takes the "main output
type" and one which takes a tag to specify the type - more clearly
communicates the actual behavior: sideOutput isn't a "special" way to
output; it's the same as output(T), just to a specified PCollection.
This will help pipeline authors understand the actual behavior of
outputting to a tag, and detangle it from "sideInput", which is a
special way to receive input. Giving them the same name means that it's
not even strange to call output and provide the main output type, which
is what we want - it's a more specific way to output, but does not have
different restrictions or capabilities.
Rename internal references to SideOutput, SideOutputT, etc to (largely)
AdditionalOutput(T).
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/113471d6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/113471d6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/113471d6
Branch: refs/heads/master
Commit: 113471d6457b4afa2523afc74b40be09935292d0
Parents: 89ff0b1
Author: Thomas Groh <tg...@google.com>
Authored: Mon Apr 10 17:14:15 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Apr 14 14:40:57 2017 -0700
----------------------------------------------------------------------
.../apex/translation/ParDoTranslator.java | 6 +-
.../operators/ApexGroupByKeyOperator.java | 19 +-
.../operators/ApexParDoOperator.java | 48 +--
.../apex/translation/utils/NoOpStepContext.java | 2 +-
.../apex/translation/ParDoTranslatorTest.java | 18 +-
.../beam/runners/core/BaseExecutionContext.java | 13 +-
.../apache/beam/runners/core/DoFnAdapters.java | 16 +-
.../apache/beam/runners/core/DoFnRunners.java | 8 +-
.../beam/runners/core/ExecutionContext.java | 13 +-
.../GroupAlsoByWindowViaWindowSetNewDoFn.java | 6 +-
.../org/apache/beam/runners/core/OldDoFn.java | 38 +--
...eBoundedSplittableProcessElementInvoker.java | 8 +-
.../beam/runners/core/OutputWindowedValue.java | 10 +-
.../beam/runners/core/SimpleDoFnRunner.java | 54 ++--
.../beam/runners/core/SimpleOldDoFnRunner.java | 63 ++--
.../beam/runners/core/SplittableParDo.java | 21 +-
.../beam/runners/core/WindowingInternals.java | 8 +-
.../core/WindowingInternalsAdapters.java | 8 +-
.../core/GroupAlsoByWindowsProperties.java | 10 +-
.../apache/beam/runners/core/NoOpOldDoFn.java | 4 +-
.../apache/beam/runners/core/OldDoFnTest.java | 4 +-
...ndedSplittableProcessElementInvokerTest.java | 6 +-
.../beam/runners/core/ReduceFnTester.java | 8 +-
.../runners/core/SimpleOldDoFnRunnerTest.java | 4 +-
.../beam/runners/core/SplittableParDoTest.java | 8 +-
.../GroupAlsoByWindowEvaluatorFactory.java | 8 +-
.../beam/runners/direct/ParDoEvaluator.java | 4 +-
.../runners/direct/ParDoEvaluatorFactory.java | 10 +-
.../direct/ParDoMultiOverrideFactory.java | 2 +-
...littableProcessElementsEvaluatorFactory.java | 8 +-
.../direct/StatefulParDoEvaluatorFactory.java | 2 +-
.../beam/runners/direct/ParDoEvaluatorTest.java | 6 +-
.../FlinkStreamingTransformTranslators.java | 20 +-
.../functions/FlinkDoFnFunction.java | 4 +-
.../functions/FlinkNoOpStepContext.java | 2 +-
.../functions/FlinkStatefulDoFnFunction.java | 4 +-
.../wrappers/streaming/DoFnOperator.java | 14 +-
.../streaming/SplittableDoFnOperator.java | 10 +-
.../wrappers/streaming/WindowDoFnOperator.java | 4 +-
.../flink/streaming/DoFnOperatorTest.java | 34 +--
.../dataflow/BatchStatefulParDoOverrides.java | 2 +-
.../runners/dataflow/BatchViewOverrides.java | 6 +-
.../dataflow/BatchViewOverridesTest.java | 4 +-
.../SparkGroupAlsoByWindowViaWindowSet.java | 10 +-
.../spark/translation/MultiDoFnFunction.java | 4 +-
...SparkGroupAlsoByWindowViaOutputBufferFn.java | 8 +-
.../spark/translation/SparkProcessContext.java | 2 +-
.../streaming/StreamingTransformTranslator.java | 2 +-
.../org/apache/beam/sdk/transforms/Combine.java | 2 +-
.../org/apache/beam/sdk/transforms/DoFn.java | 14 +-
.../apache/beam/sdk/transforms/DoFnTester.java | 41 ++-
.../org/apache/beam/sdk/transforms/ParDo.java | 77 +++--
.../apache/beam/sdk/transforms/Partition.java | 2 +-
.../beam/sdk/values/PCollectionTuple.java | 3 +-
.../org/apache/beam/sdk/values/TupleTag.java | 26 +-
.../apache/beam/sdk/values/TupleTagList.java | 2 +-
.../org/apache/beam/sdk/values/TypedPValue.java | 4 +-
.../apache/beam/sdk/metrics/MetricsTest.java | 2 +-
.../apache/beam/sdk/transforms/ParDoTest.java | 293 ++++++++++---------
.../beam/sdk/transforms/SplittableDoFnTest.java | 21 +-
.../beam/sdk/values/PCollectionTupleTest.java | 8 +-
.../apache/beam/sdk/values/TypedPValueTest.java | 46 +--
.../beam/fn/harness/fake/FakeStepContext.java | 2 +-
.../control/ProcessBundleHandlerTest.java | 30 +-
.../sdk/io/gcp/bigquery/WritePartition.java | 6 +-
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 4 +-
66 files changed, 578 insertions(+), 578 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java
index 9213c1f..2e3d902 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java
@@ -94,7 +94,7 @@ class ParDoTranslator<InputT, OutputT>
context.getPipelineOptions(),
doFn,
transform.getMainOutputTag(),
- transform.getSideOutputTags().getAll(),
+ transform.getAdditionalOutputTags().getAll(),
input.getWindowingStrategy(),
sideInputs,
wvInputCoder,
@@ -114,9 +114,9 @@ class ParDoTranslator<InputT, OutputT>
ports.put(pc, operator.output);
} else {
int portIndex = 0;
- for (TupleTag<?> tag : transform.getSideOutputTags().getAll()) {
+ for (TupleTag<?> tag : transform.getAdditionalOutputTags().getAll()) {
if (tag.equals(output.getKey())) {
- ports.put(pc, operator.sideOutputPorts[portIndex]);
+ ports.put(pc, operator.additionalOutputPorts[portIndex]);
break;
}
portIndex++;
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
index 230082e..1697921 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
@@ -353,13 +353,14 @@ public class ApexGroupByKeyOperator<K, V> implements Operator {
}
@Override
- public <SideOutputT> void sideOutputWindowedValue(
- TupleTag<SideOutputT> tag,
- SideOutputT output,
+ public <AdditionalOutputT> void outputWindowedValue(
+ TupleTag<AdditionalOutputT> tag,
+ AdditionalOutputT output,
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo pane) {
- throw new UnsupportedOperationException("GroupAlsoByWindow should not use side outputs");
+ throw new UnsupportedOperationException(
+ "GroupAlsoByWindow should not use tagged outputs");
}
@Override
@@ -390,15 +391,13 @@ public class ApexGroupByKeyOperator<K, V> implements Operator {
}
@Override
- public <T> void sideOutput(TupleTag<T> tag, T output) {
- // ignore the side output, this can happen when a user does not register
- // side outputs but then outputs using a freshly created TupleTag.
- throw new RuntimeException("sideOutput() is not available when grouping by window.");
+ public <T> void output(TupleTag<T> tag, T output) {
+ throw new RuntimeException("output() is not available when grouping by window.");
}
@Override
- public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
- sideOutput(tag, output);
+ public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+ output(tag, output);
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
index 1fc91c8..bad5be2 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
@@ -88,7 +88,7 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
@Bind(JavaSerializer.class)
private final TupleTag<OutputT> mainOutputTag;
@Bind(JavaSerializer.class)
- private final List<TupleTag<?>> sideOutputTags;
+ private final List<TupleTag<?>> additionalOutputTags;
@Bind(JavaSerializer.class)
private final WindowingStrategy<?, ?> windowingStrategy;
@Bind(JavaSerializer.class)
@@ -108,15 +108,15 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
private transient PushbackSideInputDoFnRunner<InputT, OutputT> pushbackDoFnRunner;
private transient SideInputHandler sideInputHandler;
- private transient Map<TupleTag<?>, DefaultOutputPort<ApexStreamTuple<?>>> sideOutputPortMapping =
- Maps.newHashMapWithExpectedSize(5);
+ private transient Map<TupleTag<?>, DefaultOutputPort<ApexStreamTuple<?>>>
+ additionalOutputPortMapping = Maps.newHashMapWithExpectedSize(5);
private transient DoFnInvoker<InputT, OutputT> doFnInvoker;
public ApexParDoOperator(
ApexPipelineOptions pipelineOptions,
DoFn<InputT, OutputT> doFn,
TupleTag<OutputT> mainOutputTag,
- List<TupleTag<?>> sideOutputTags,
+ List<TupleTag<?>> additionalOutputTags,
WindowingStrategy<?, ?> windowingStrategy,
List<PCollectionView<?>> sideInputs,
Coder<WindowedValue<InputT>> inputCoder,
@@ -125,15 +125,15 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
this.pipelineOptions = new SerializablePipelineOptions(pipelineOptions);
this.doFn = doFn;
this.mainOutputTag = mainOutputTag;
- this.sideOutputTags = sideOutputTags;
+ this.additionalOutputTags = additionalOutputTags;
this.windowingStrategy = windowingStrategy;
this.sideInputs = sideInputs;
this.sideInputStateInternals = new StateInternalsProxy<>(
stateBackend.newStateInternalsFactory(VoidCoder.of()));
- if (sideOutputTags.size() > sideOutputPorts.length) {
- String msg = String.format("Too many side outputs (currently only supporting %s).",
- sideOutputPorts.length);
+ if (additionalOutputTags.size() > additionalOutputPorts.length) {
+ String msg = String.format("Too many additional outputs (currently only supporting %s).",
+ additionalOutputPorts.length);
throw new UnsupportedOperationException(msg);
}
@@ -148,7 +148,7 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
this.pipelineOptions = null;
this.doFn = null;
this.mainOutputTag = null;
- this.sideOutputTags = null;
+ this.additionalOutputTags = null;
this.windowingStrategy = null;
this.sideInputs = null;
this.pushedBack = null;
@@ -218,29 +218,31 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
public final transient DefaultOutputPort<ApexStreamTuple<?>> output = new DefaultOutputPort<>();
@OutputPortFieldAnnotation(optional = true)
- public final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput1 =
+ public final transient DefaultOutputPort<ApexStreamTuple<?>> additionalOutput1 =
new DefaultOutputPort<>();
@OutputPortFieldAnnotation(optional = true)
- public final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput2 =
+ public final transient DefaultOutputPort<ApexStreamTuple<?>> additionalOutput2 =
new DefaultOutputPort<>();
@OutputPortFieldAnnotation(optional = true)
- public final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput3 =
+ public final transient DefaultOutputPort<ApexStreamTuple<?>> additionalOutput3 =
new DefaultOutputPort<>();
@OutputPortFieldAnnotation(optional = true)
- public final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput4 =
+ public final transient DefaultOutputPort<ApexStreamTuple<?>> additionalOutput4 =
new DefaultOutputPort<>();
@OutputPortFieldAnnotation(optional = true)
- public final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput5 =
+ public final transient DefaultOutputPort<ApexStreamTuple<?>> additionalOutput5 =
new DefaultOutputPort<>();
- public final transient DefaultOutputPort<?>[] sideOutputPorts = {sideOutput1, sideOutput2,
- sideOutput3, sideOutput4, sideOutput5};
+ public final transient DefaultOutputPort<?>[] additionalOutputPorts = {
+ additionalOutput1, additionalOutput2, additionalOutput3, additionalOutput4, additionalOutput5
+ };
@Override
public <T> void output(TupleTag<T> tag, WindowedValue<T> tuple) {
- DefaultOutputPort<ApexStreamTuple<?>> sideOutputPort = sideOutputPortMapping.get(tag);
- if (sideOutputPort != null) {
- sideOutputPort.emit(ApexStreamTuple.DataTuple.of(tuple));
+ DefaultOutputPort<ApexStreamTuple<?>> additionalOutputPort =
+ additionalOutputPortMapping.get(tag);
+ if (additionalOutputPort != null) {
+ additionalOutputPort.emit(ApexStreamTuple.DataTuple.of(tuple));
} else {
output.emit(ApexStreamTuple.DataTuple.of(tuple));
}
@@ -306,11 +308,11 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
sideInputReader = sideInputHandler;
}
- for (int i = 0; i < sideOutputTags.size(); i++) {
+ for (int i = 0; i < additionalOutputTags.size(); i++) {
@SuppressWarnings("unchecked")
DefaultOutputPort<ApexStreamTuple<?>> port = (DefaultOutputPort<ApexStreamTuple<?>>)
- sideOutputPorts[i];
- sideOutputPortMapping.put(sideOutputTags.get(i), port);
+ additionalOutputPorts[i];
+ additionalOutputPortMapping.put(additionalOutputTags.get(i), port);
}
NoOpStepContext stepContext = new NoOpStepContext() {
@@ -332,7 +334,7 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
sideInputReader,
this,
mainOutputTag,
- sideOutputTags,
+ additionalOutputTags,
stepContext,
new NoOpAggregatorFactory(),
windowingStrategy
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
index ad4de97..cc64c7c 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
@@ -48,7 +48,7 @@ public class NoOpStepContext implements ExecutionContext.StepContext, Serializab
}
@Override
- public void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output) {
+ public void noteOutput(TupleTag<?> tag, WindowedValue<?> output) {
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoTranslatorTest.java
index 2760d06..1a5c8be 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoTranslatorTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoTranslatorTest.java
@@ -267,7 +267,7 @@ public class ParDoTranslatorTest {
List<Integer> inputs = Arrays.asList(3, -42, 666);
final TupleTag<String> mainOutputTag = new TupleTag<>("main");
- final TupleTag<Void> sideOutputTag = new TupleTag<>("sideOutput");
+ final TupleTag<Void> additionalOutputTag = new TupleTag<>("output");
PCollectionView<Integer> sideInput1 = pipeline
.apply("CreateSideInput1", Create.of(11))
@@ -288,10 +288,10 @@ public class ParDoTranslatorTest {
.withSideInputs(sideInput1)
.withSideInputs(sideInputUnread)
.withSideInputs(sideInput2)
- .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)));
+ .withOutputTags(mainOutputTag, TupleTagList.of(additionalOutputTag)));
outputs.get(mainOutputTag).apply(ParDo.of(new EmbeddedCollector()));
- outputs.get(sideOutputTag).setCoder(VoidCoder.of());
+ outputs.get(additionalOutputTag).setCoder(VoidCoder.of());
ApexRunnerResult result = (ApexRunnerResult) pipeline.run();
HashSet<String> expected = Sets.newHashSet("processing: 3: [11, 222]",
@@ -312,12 +312,12 @@ public class ParDoTranslatorTest {
private static final long serialVersionUID = 1L;
final List<PCollectionView<Integer>> sideInputViews = new ArrayList<>();
- final List<TupleTag<String>> sideOutputTupleTags = new ArrayList<>();
+ final List<TupleTag<String>> additionalOutputTupleTags = new ArrayList<>();
public TestMultiOutputWithSideInputsFn(List<PCollectionView<Integer>> sideInputViews,
- List<TupleTag<String>> sideOutputTupleTags) {
+ List<TupleTag<String>> additionalOutputTupleTags) {
this.sideInputViews.addAll(sideInputViews);
- this.sideOutputTupleTags.addAll(sideOutputTupleTags);
+ this.additionalOutputTupleTags.addAll(additionalOutputTupleTags);
}
@ProcessElement
@@ -334,9 +334,9 @@ public class ParDoTranslatorTest {
value += ": " + sideInputValues;
}
c.output(value);
- for (TupleTag<String> sideOutputTupleTag : sideOutputTupleTags) {
- c.sideOutput(sideOutputTupleTag,
- sideOutputTupleTag.getId() + ": " + value);
+ for (TupleTag<String> additionalOutputTupleTag : additionalOutputTupleTags) {
+ c.output(additionalOutputTupleTag,
+ additionalOutputTupleTag.getId() + ": " + value);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java
index 0f23fea..cc7b574 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java
@@ -23,6 +23,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.DoFn.Context;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.TupleTag;
@@ -106,19 +107,17 @@ public abstract class BaseExecutionContext<T extends ExecutionContext.StepContex
/**
* Hook for subclasses to implement that will be called whenever
- * {@code DoFn.Context#output}
- * is called.
+ * {@link Context#output(Object)} is called.
*/
@Override
public void noteOutput(WindowedValue<?> output) {}
/**
* Hook for subclasses to implement that will be called whenever
- * {@code DoFn.Context#sideOutput}
- * is called.
+ * {@link Context#output(TupleTag, Object)} is called.
*/
@Override
- public void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output) {}
+ public void noteOutput(TupleTag<?> tag, WindowedValue<?> output) {}
/**
* Base class for implementations of {@link ExecutionContext.StepContext}.
@@ -153,8 +152,8 @@ public abstract class BaseExecutionContext<T extends ExecutionContext.StepContex
}
@Override
- public void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output) {
- executionContext.noteSideOutput(tag, output);
+ public void noteOutput(TupleTag<?> tag, WindowedValue<?> output) {
+ executionContext.noteOutput(tag, output);
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
index deb3b7e..66ad736 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
@@ -162,13 +162,13 @@ public class DoFnAdapters {
}
@Override
- public <T> void sideOutput(TupleTag<T> tag, T output) {
- context.sideOutput(tag, output);
+ public <T> void output(TupleTag<T> tag, T output) {
+ context.output(tag, output);
}
@Override
- public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
- context.sideOutputWithTimestamp(tag, output, timestamp);
+ public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+ context.outputWithTimestamp(tag, output, timestamp);
}
@Override
@@ -255,13 +255,13 @@ public class DoFnAdapters {
}
@Override
- public <T> void sideOutput(TupleTag<T> tag, T output) {
- context.sideOutput(tag, output);
+ public <T> void output(TupleTag<T> tag, T output) {
+ context.output(tag, output);
}
@Override
- public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
- context.sideOutputWithTimestamp(tag, output, timestamp);
+ public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+ context.outputWithTimestamp(tag, output, timestamp);
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
index a1b7c8b..b09ee08 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
@@ -59,7 +59,7 @@ public class DoFnRunners {
SideInputReader sideInputReader,
OutputManager outputManager,
TupleTag<OutputT> mainOutputTag,
- List<TupleTag<?>> sideOutputTags,
+ List<TupleTag<?>> additionalOutputTags,
StepContext stepContext,
AggregatorFactory aggregatorFactory,
WindowingStrategy<?, ?> windowingStrategy) {
@@ -69,7 +69,7 @@ public class DoFnRunners {
sideInputReader,
outputManager,
mainOutputTag,
- sideOutputTags,
+ additionalOutputTags,
stepContext,
aggregatorFactory,
windowingStrategy);
@@ -86,7 +86,7 @@ public class DoFnRunners {
SideInputReader sideInputReader,
OutputManager outputManager,
TupleTag<OutputT> mainOutputTag,
- List<TupleTag<?>> sideOutputTags,
+ List<TupleTag<?>> additionalOutputTags,
StepContext stepContext,
AggregatorFactory aggregatorFactory,
WindowingStrategy<?, ?> windowingStrategy) {
@@ -96,7 +96,7 @@ public class DoFnRunners {
sideInputReader,
outputManager,
mainOutputTag,
- sideOutputTags,
+ additionalOutputTags,
stepContext,
aggregatorFactory,
windowingStrategy);
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java
index 40c0798..ecd30c0 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.core;
import java.io.IOException;
import java.util.Collection;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.DoFn.Context;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.TupleTag;
@@ -41,17 +42,15 @@ public interface ExecutionContext {
/**
* Hook for subclasses to implement that will be called whenever
- * {@link org.apache.beam.sdk.transforms.DoFn.Context#output}
- * is called.
+ * {@link Context#output(Object)} is called.
*/
void noteOutput(WindowedValue<?> output);
/**
* Hook for subclasses to implement that will be called whenever
- * {@link org.apache.beam.sdk.transforms.DoFn.Context#sideOutput}
- * is called.
+ * {@link Context#output(TupleTag, Object)} is called.
*/
- void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output);
+ void noteOutput(TupleTag<?> tag, WindowedValue<?> output);
/**
* Per-step, per-key context used for retrieving state.
@@ -77,10 +76,10 @@ public interface ExecutionContext {
/**
* Hook for subclasses to implement that will be called whenever
- * {@link org.apache.beam.sdk.transforms.DoFn.Context#sideOutput}
+ * {@link org.apache.beam.sdk.transforms.DoFn.Context#output}
* is called.
*/
- void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output);
+ void noteOutput(TupleTag<?> tag, WindowedValue<?> output);
/**
* Writes the given {@code PCollectionView} data to a globally accessible location.
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
index 8fff0e4..0cf6e2d 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
@@ -104,9 +104,9 @@ public class GroupAlsoByWindowViaWindowSetNewDoFn<
}
@Override
- public <SideOutputT> void sideOutputWindowedValue(
- TupleTag<SideOutputT> tag,
- SideOutputT output,
+ public <AdditionalOutputT> void outputWindowedValue(
+ TupleTag<AdditionalOutputT> tag,
+ AdditionalOutputT output,
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo pane) {
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java
index e9d4740..507ee50 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java
@@ -135,16 +135,15 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl
public abstract void outputWithTimestamp(OutputT output, Instant timestamp);
/**
- * Adds the given element to the side output {@code PCollection} with the
+ * Adds the given element to the output {@code PCollection} with the
* given tag.
*
- * <p>Once passed to {@code sideOutput} the element should not be modified
+ * <p>Once passed to {@code output} the element should not be modified
* in any way.
*
* <p>The caller of {@code ParDo} uses {@link ParDo.SingleOutput#withOutputTags withOutputTags}
- * to specify the tags of side outputs that it consumes. Non-consumed side
- * outputs, e.g., outputs for monitoring purposes only, don't necessarily
- * need to be specified.
+ * to specify the tags of outputs that it consumes. Outputs that are not consumed, e.g., outputs
+ * for monitoring purposes only, don't necessarily need to be specified.
*
* <p>The output element will have the same timestamp and be in the same
* windows as the input element passed to {@link OldDoFn#processElement processElement}.
@@ -159,32 +158,27 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl
*
* @see ParDo.SingleOutput#withOutputTags
*/
- public abstract <T> void sideOutput(TupleTag<T> tag, T output);
+ public abstract <T> void output(TupleTag<T> tag, T output);
/**
- * Adds the given element to the specified side output {@code PCollection},
- * with the given timestamp.
+ * Adds the given element to the specified output {@code PCollection}, with the given timestamp.
*
- * <p>Once passed to {@code sideOutputWithTimestamp} the element should not be
- * modified in any way.
+ * <p>Once passed to {@code outputWithTimestamp} the element should not be modified in any way.
*
- * <p>If invoked from {@link OldDoFn#processElement processElement}, the timestamp
- * must not be older than the input element's timestamp minus
- * {@link OldDoFn#getAllowedTimestampSkew getAllowedTimestampSkew}. The output element will
- * be in the same windows as the input element.
+ * <p>If invoked from {@link OldDoFn#processElement processElement}, the timestamp must not be
+ * older than the input element's timestamp minus {@link OldDoFn#getAllowedTimestampSkew
+ * getAllowedTimestampSkew}. The output element will be in the same windows as the input
+ * element.
*
* <p>If invoked from {@link #startBundle startBundle} or {@link #finishBundle finishBundle},
- * this will attempt to use the
- * {@link org.apache.beam.sdk.transforms.windowing.WindowFn}
- * of the input {@code PCollection} to determine what windows the element
- * should be in, throwing an exception if the {@code WindowFn} attempts
- * to access any information about the input element except for the
- * timestamp.
+ * this will attempt to use the {@link org.apache.beam.sdk.transforms.windowing.WindowFn} of the
+ * input {@code PCollection} to determine what windows the element should be in, throwing an
+ * exception if the {@code WindowFn} attempts to access any information about the input element
+ * except for the timestamp.
*
* @see ParDo.SingleOutput#withOutputTags
*/
- public abstract <T> void sideOutputWithTimestamp(
- TupleTag<T> tag, T output, Instant timestamp);
+ public abstract <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp);
/**
* Creates an {@link Aggregator} in the {@link OldDoFn} context with the
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
index 27fd0a3..d132af6 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
@@ -254,13 +254,13 @@ public class OutputAndTimeBoundedSplittableProcessElementInvoker<
}
@Override
- public <T> void sideOutput(TupleTag<T> tag, T value) {
- sideOutputWithTimestamp(tag, value, element.getTimestamp());
+ public <T> void output(TupleTag<T> tag, T value) {
+ outputWithTimestamp(tag, value, element.getTimestamp());
}
@Override
- public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T value, Instant timestamp) {
- output.sideOutputWindowedValue(
+ public <T> void outputWithTimestamp(TupleTag<T> tag, T value, Instant timestamp) {
+ output.outputWindowedValue(
tag, value, timestamp, element.getWindows(), element.getPane());
noteOutput();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputWindowedValue.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputWindowedValue.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputWindowedValue.java
index 86eeb33..35d6737 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputWindowedValue.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputWindowedValue.java
@@ -25,7 +25,7 @@ import org.joda.time.Instant;
/**
* An object that can output a value with all of its windowing information to the main output or
- * a side output.
+ * any tagged output.
*/
public interface OutputWindowedValue<OutputT> {
/** Outputs a value with windowing information to the main output. */
@@ -35,10 +35,10 @@ public interface OutputWindowedValue<OutputT> {
Collection<? extends BoundedWindow> windows,
PaneInfo pane);
- /** Outputs a value with windowing information to a side output. */
- <SideOutputT> void sideOutputWindowedValue(
- TupleTag<SideOutputT> tag,
- SideOutputT output,
+ /** Outputs a value with windowing information to a tagged output. */
+ <AdditionalOutputT> void outputWindowedValue(
+ TupleTag<AdditionalOutputT> tag,
+ AdditionalOutputT output,
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo pane);
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 98d88b6..141bf20 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -106,7 +106,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
SideInputReader sideInputReader,
OutputManager outputManager,
TupleTag<OutputT> mainOutputTag,
- List<TupleTag<?>> sideOutputTags,
+ List<TupleTag<?>> additionalOutputTags,
StepContext stepContext,
AggregatorFactory aggregatorFactory,
WindowingStrategy<?, ?> windowingStrategy) {
@@ -133,7 +133,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
sideInputReader,
outputManager,
mainOutputTag,
- sideOutputTags,
+ additionalOutputTags,
stepContext,
aggregatorFactory,
windowingStrategy.getWindowFn());
@@ -257,7 +257,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
SideInputReader sideInputReader,
OutputManager outputManager,
TupleTag<OutputT> mainOutputTag,
- List<TupleTag<?>> sideOutputTags,
+ List<TupleTag<?>> additionalOutputTags,
StepContext stepContext,
AggregatorFactory aggregatorFactory,
WindowFn<?, ?> windowFn) {
@@ -270,8 +270,8 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
this.outputTags = Sets.newHashSet();
outputTags.add(mainOutputTag);
- for (TupleTag<?> sideOutputTag : sideOutputTags) {
- outputTags.add(sideOutputTag);
+ for (TupleTag<?> additionalOutputTag : additionalOutputTags) {
+ outputTags.add(additionalOutputTag);
}
this.stepContext = stepContext;
@@ -355,16 +355,16 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
}
}
- private <T> void sideOutputWindowedValue(
+ private <T> void outputWindowedValue(
TupleTag<T> tag,
T output,
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo pane) {
- sideOutputWindowedValue(tag, makeWindowedValue(output, timestamp, windows, pane));
+ outputWindowedValue(tag, makeWindowedValue(output, timestamp, windows, pane));
}
- private <T> void sideOutputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedElem) {
+ private <T> void outputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedElem) {
if (!outputTags.contains(tag)) {
// This tag wasn't declared nor was it seen before during this execution.
// Thus, this must be a new, undeclared and unconsumed output.
@@ -372,18 +372,18 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
// outputs.
if (outputTags.size() >= MAX_SIDE_OUTPUTS) {
throw new IllegalArgumentException(
- "the number of side outputs has exceeded a limit of " + MAX_SIDE_OUTPUTS);
+ "the number of outputs has exceeded a limit of " + MAX_SIDE_OUTPUTS);
}
outputTags.add(tag);
}
outputManager.output(tag, windowedElem);
if (stepContext != null) {
- stepContext.noteSideOutput(tag, windowedElem);
+ stepContext.noteOutput(tag, windowedElem);
}
}
- // Following implementations of output, outputWithTimestamp, and sideOutput
+ // Following implementations of output and outputWithTimestamp (with or without a TupleTag)
// are only accessible in DoFn.startBundle and DoFn.finishBundle, and will be shadowed by
// ProcessContext's versions in DoFn.processElement.
@Override
@@ -397,15 +397,15 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
}
@Override
- public <T> void sideOutput(TupleTag<T> tag, T output) {
- checkNotNull(tag, "TupleTag passed to sideOutput cannot be null");
- sideOutputWindowedValue(tag, output, null, null, PaneInfo.NO_FIRING);
+ public <T> void output(TupleTag<T> tag, T output) {
+ checkNotNull(tag, "TupleTag passed to output cannot be null");
+ outputWindowedValue(tag, output, null, null, PaneInfo.NO_FIRING);
}
@Override
- public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
- checkNotNull(tag, "TupleTag passed to sideOutputWithTimestamp cannot be null");
- sideOutputWindowedValue(tag, output, timestamp, null, PaneInfo.NO_FIRING);
+ public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+ checkNotNull(tag, "TupleTag passed to outputWithTimestamp cannot be null");
+ outputWindowedValue(tag, output, timestamp, null, PaneInfo.NO_FIRING);
}
@Override
@@ -559,16 +559,16 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
}
@Override
- public <T> void sideOutput(TupleTag<T> tag, T output) {
- checkNotNull(tag, "Tag passed to sideOutput cannot be null");
- context.sideOutputWindowedValue(tag, windowedValue.withValue(output));
+ public <T> void output(TupleTag<T> tag, T output) {
+ checkNotNull(tag, "Tag passed to output cannot be null");
+ context.outputWindowedValue(tag, windowedValue.withValue(output));
}
@Override
- public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
- checkNotNull(tag, "Tag passed to sideOutputWithTimestamp cannot be null");
+ public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+ checkNotNull(tag, "Tag passed to outputWithTimestamp cannot be null");
checkTimestamp(timestamp);
- context.sideOutputWindowedValue(
+ context.outputWindowedValue(
tag, output, timestamp, windowedValue.getWindows(), windowedValue.getPane());
}
@@ -787,14 +787,14 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
}
@Override
- public <T> void sideOutput(TupleTag<T> tag, T output) {
- context.sideOutputWindowedValue(
+ public <T> void output(TupleTag<T> tag, T output) {
+ context.outputWindowedValue(
tag, output, timestamp, Collections.singleton(window()), PaneInfo.NO_FIRING);
}
@Override
- public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
- context.sideOutputWindowedValue(
+ public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+ context.outputWindowedValue(
tag, output, timestamp, Collections.singleton(window()), PaneInfo.NO_FIRING);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
index c88f1c9..6320a3a 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
@@ -60,11 +60,16 @@ class SimpleOldDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT
/** The context used for running the {@link OldDoFn}. */
private final DoFnContext<InputT, OutputT> context;
- public SimpleOldDoFnRunner(PipelineOptions options, OldDoFn<InputT, OutputT> fn,
+ public SimpleOldDoFnRunner(
+ PipelineOptions options,
+ OldDoFn<InputT, OutputT> fn,
SideInputReader sideInputReader,
OutputManager outputManager,
- TupleTag<OutputT> mainOutputTag, List<TupleTag<?>> sideOutputTags, StepContext stepContext,
- AggregatorFactory aggregatorFactory, WindowingStrategy<?, ?> windowingStrategy) {
+ TupleTag<OutputT> mainOutputTag,
+ List<TupleTag<?>> additionalOutputTags,
+ StepContext stepContext,
+ AggregatorFactory aggregatorFactory,
+ WindowingStrategy<?, ?> windowingStrategy) {
this.fn = fn;
this.context = new DoFnContext<>(
options,
@@ -72,7 +77,7 @@ class SimpleOldDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT
sideInputReader,
outputManager,
mainOutputTag,
- sideOutputTags,
+ additionalOutputTags,
stepContext,
aggregatorFactory,
windowingStrategy == null ? null : windowingStrategy.getWindowFn());
@@ -177,7 +182,7 @@ class SimpleOldDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT
SideInputReader sideInputReader,
OutputManager outputManager,
TupleTag<OutputT> mainOutputTag,
- List<TupleTag<?>> sideOutputTags,
+ List<TupleTag<?>> additionalOutputTags,
StepContext stepContext,
AggregatorFactory aggregatorFactory,
WindowFn<?, ?> windowFn) {
@@ -190,8 +195,8 @@ class SimpleOldDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT
this.outputTags = Sets.newHashSet();
outputTags.add(mainOutputTag);
- for (TupleTag<?> sideOutputTag : sideOutputTags) {
- outputTags.add(sideOutputTag);
+ for (TupleTag<?> additionalOutputTag : additionalOutputTags) {
+ outputTags.add(additionalOutputTag);
}
this.stepContext = stepContext;
@@ -273,15 +278,15 @@ class SimpleOldDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT
}
}
- private <T> void sideOutputWindowedValue(TupleTag<T> tag,
+ private <T> void outputWindowedValue(TupleTag<T> tag,
T output,
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo pane) {
- sideOutputWindowedValue(tag, makeWindowedValue(output, timestamp, windows, pane));
+ outputWindowedValue(tag, makeWindowedValue(output, timestamp, windows, pane));
}
- private <T> void sideOutputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedElem) {
+ private <T> void outputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedElem) {
if (!outputTags.contains(tag)) {
// This tag wasn't declared nor was it seen before during this execution.
// Thus, this must be a new, undeclared and unconsumed output.
@@ -289,18 +294,18 @@ class SimpleOldDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT
// outputs.
if (outputTags.size() >= MAX_SIDE_OUTPUTS) {
throw new IllegalArgumentException(
- "the number of side outputs has exceeded a limit of " + MAX_SIDE_OUTPUTS);
+ "the number of outputs has exceeded a limit of " + MAX_SIDE_OUTPUTS);
}
outputTags.add(tag);
}
outputManager.output(tag, windowedElem);
if (stepContext != null) {
- stepContext.noteSideOutput(tag, windowedElem);
+ stepContext.noteOutput(tag, windowedElem);
}
}
- // Following implementations of output, outputWithTimestamp, and sideOutput
+ // Following implementations of output and outputWithTimestamp (with or without a TupleTag)
// are only accessible in OldDoFn.startBundle and OldDoFn.finishBundle, and will be shadowed by
// ProcessContext's versions in OldDoFn.processElement.
@Override
@@ -314,15 +319,15 @@ class SimpleOldDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT
}
@Override
- public <T> void sideOutput(TupleTag<T> tag, T output) {
- checkNotNull(tag, "TupleTag passed to sideOutput cannot be null");
- sideOutputWindowedValue(tag, output, null, null, PaneInfo.NO_FIRING);
+ public <T> void output(TupleTag<T> tag, T output) {
+ checkNotNull(tag, "TupleTag passed to output cannot be null");
+ outputWindowedValue(tag, output, null, null, PaneInfo.NO_FIRING);
}
@Override
- public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
- checkNotNull(tag, "TupleTag passed to sideOutputWithTimestamp cannot be null");
- sideOutputWindowedValue(tag, output, timestamp, null, PaneInfo.NO_FIRING);
+ public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+ checkNotNull(tag, "TupleTag passed to outputWithTimestamp cannot be null");
+ outputWindowedValue(tag, output, timestamp, null, PaneInfo.NO_FIRING);
}
@Override
@@ -428,16 +433,16 @@ class SimpleOldDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT
}
@Override
- public <T> void sideOutput(TupleTag<T> tag, T output) {
- checkNotNull(tag, "Tag passed to sideOutput cannot be null");
- context.sideOutputWindowedValue(tag, windowedValue.withValue(output));
+ public <T> void output(TupleTag<T> tag, T output) {
+ checkNotNull(tag, "Tag passed to output cannot be null");
+ context.outputWindowedValue(tag, windowedValue.withValue(output));
}
@Override
- public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
- checkNotNull(tag, "Tag passed to sideOutputWithTimestamp cannot be null");
+ public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+ checkNotNull(tag, "Tag passed to outputWithTimestamp cannot be null");
checkTimestamp(timestamp);
- context.sideOutputWindowedValue(
+ context.outputWindowedValue(
tag, output, timestamp, windowedValue.getWindows(), windowedValue.getPane());
}
@@ -471,13 +476,13 @@ class SimpleOldDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT
}
@Override
- public <SideOutputT> void sideOutputWindowedValue(
- TupleTag<SideOutputT> tag,
- SideOutputT output,
+ public <AdditionalOutputT> void outputWindowedValue(
+ TupleTag<AdditionalOutputT> tag,
+ AdditionalOutputT output,
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo pane) {
- context.sideOutputWindowedValue(tag, output, timestamp, windows, pane);
+ context.outputWindowedValue(tag, output, timestamp, windows, pane);
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
index c16bf44..9cc965a 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
@@ -118,7 +118,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
input.getWindowingStrategy(),
parDo.getSideInputs(),
parDo.getMainOutputTag(),
- parDo.getSideOutputTags()));
+ parDo.getAdditionalOutputTags()));
}
private static <InputT, OutputT, RestrictionT>
@@ -188,14 +188,15 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
private final WindowingStrategy<?, ?> windowingStrategy;
private final List<PCollectionView<?>> sideInputs;
private final TupleTag<OutputT> mainOutputTag;
- private final TupleTagList sideOutputTags;
+ private final TupleTagList additionalOutputTags;
/**
* @param fn the splittable {@link DoFn}.
* @param windowingStrategy the {@link WindowingStrategy} of the input collection.
* @param sideInputs list of side inputs that should be available to the {@link DoFn}.
* @param mainOutputTag {@link TupleTag Tag} of the {@link DoFn DoFn's} main output.
- * @param sideOutputTags {@link TupleTagList Tags} of the {@link DoFn DoFn's} side outputs.
+ * @param additionalOutputTags {@link TupleTagList Tags} of the {@link DoFn DoFn's} additional
+ * outputs.
*/
public ProcessElements(
DoFn<InputT, OutputT> fn,
@@ -204,14 +205,14 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
WindowingStrategy<?, ?> windowingStrategy,
List<PCollectionView<?>> sideInputs,
TupleTag<OutputT> mainOutputTag,
- TupleTagList sideOutputTags) {
+ TupleTagList additionalOutputTags) {
this.fn = fn;
this.elementCoder = elementCoder;
this.restrictionCoder = restrictionCoder;
this.windowingStrategy = windowingStrategy;
this.sideInputs = sideInputs;
this.mainOutputTag = mainOutputTag;
- this.sideOutputTags = sideOutputTags;
+ this.additionalOutputTags = additionalOutputTags;
}
public DoFn<InputT, OutputT> getFn() {
@@ -226,8 +227,8 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
return mainOutputTag;
}
- public TupleTagList getSideOutputTags() {
- return sideOutputTags;
+ public TupleTagList getAdditionalOutputTags() {
+ return additionalOutputTags;
}
public ProcessFn<InputT, OutputT, RestrictionT, TrackerT> newProcessFn(
@@ -244,7 +245,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
PCollectionTuple outputs =
PCollectionTuple.ofPrimitiveOutputsInternal(
input.getPipeline(),
- TupleTagList.of(mainOutputTag).and(sideOutputTags.getAll()),
+ TupleTagList.of(mainOutputTag).and(additionalOutputTags.getAll()),
windowingStrategy,
input.isBounded().and(signature.isBoundedPerElement()));
@@ -522,12 +523,12 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
}
@Override
- public <T> void sideOutput(TupleTag<T> tag, T output) {
+ public <T> void output(TupleTag<T> tag, T output) {
throwUnsupportedOutput();
}
@Override
- public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+ public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
throwUnsupportedOutput();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternals.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternals.java
index 8dc0bfc..5005065 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternals.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternals.java
@@ -49,11 +49,11 @@ public interface WindowingInternals<InputT, OutputT> {
Collection<? extends BoundedWindow> windows, PaneInfo pane);
/**
- * Output the value to a side output at the specified timestamp in the listed windows.
+ * Output the value to a tagged output at the specified timestamp in the listed windows.
*/
- <SideOutputT> void sideOutputWindowedValue(
- TupleTag<SideOutputT> tag,
- SideOutputT output,
+ <AdditionalOutputT> void outputWindowedValue(
+ TupleTag<AdditionalOutputT> tag,
+ AdditionalOutputT output,
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo pane);
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternalsAdapters.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternalsAdapters.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternalsAdapters.java
index 48a39d6..1b36bf9 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternalsAdapters.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternalsAdapters.java
@@ -62,13 +62,13 @@ public class WindowingInternalsAdapters {
}
@Override
- public <SideOutputT> void sideOutputWindowedValue(
- TupleTag<SideOutputT> tag,
- SideOutputT output,
+ public <AdditionalOutputT> void outputWindowedValue(
+ TupleTag<AdditionalOutputT> tag,
+ AdditionalOutputT output,
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo pane) {
- windowingInternals.sideOutputWindowedValue(tag, output, timestamp, windows, pane);
+ windowingInternals.outputWindowedValue(tag, output, timestamp, windows, pane);
}
};
}
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
index 6c7c4e0..d0a8923 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
@@ -677,9 +677,9 @@ public class GroupAlsoByWindowsProperties {
}
@Override
- public <SideOutputT> void sideOutputWindowedValue(
- TupleTag<SideOutputT> tag,
- SideOutputT output,
+ public <AdditionalOutputT> void outputWindowedValue(
+ TupleTag<AdditionalOutputT> tag,
+ AdditionalOutputT output,
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo pane) {
@@ -729,12 +729,12 @@ public class GroupAlsoByWindowsProperties {
}
@Override
- public <T> void sideOutput(TupleTag<T> tag, T output) {
+ public <T> void output(TupleTag<T> tag, T output) {
throw new UnsupportedOperationException();
}
@Override
- public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+ public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/core-java/src/test/java/org/apache/beam/runners/core/NoOpOldDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/NoOpOldDoFn.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/NoOpOldDoFn.java
index 5cbea8c..2e5cd6d 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/NoOpOldDoFn.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/NoOpOldDoFn.java
@@ -57,10 +57,10 @@ class NoOpOldDoFn<InputT, OutputT> extends OldDoFn<InputT, OutputT> {
public void outputWithTimestamp(OutputT output, Instant timestamp) {
}
@Override
- public <T> void sideOutput(TupleTag<T> tag, T output) {
+ public <T> void output(TupleTag<T> tag, T output) {
}
@Override
- public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output,
+ public <T> void outputWithTimestamp(TupleTag<T> tag, T output,
Instant timestamp) {
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java
index 651bc72..425de07 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java
@@ -160,12 +160,12 @@ public class OldDoFnTest implements Serializable {
}
@Override
- public <T> void sideOutput(TupleTag<T> tag, T output) {
+ public <T> void output(TupleTag<T> tag, T output) {
throw new UnsupportedOperationException();
}
@Override
- public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+ public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java
index 965380b..541e238 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java
@@ -80,9 +80,9 @@ public class OutputAndTimeBoundedSplittableProcessElementInvokerTest {
PaneInfo pane) {}
@Override
- public <SideOutputT> void sideOutputWindowedValue(
- TupleTag<SideOutputT> tag,
- SideOutputT output,
+ public <AdditionalOutputT> void outputWindowedValue(
+ TupleTag<AdditionalOutputT> tag,
+ AdditionalOutputT output,
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo pane) {}
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
index 512420f..914550e 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
@@ -574,13 +574,13 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
}
@Override
- public <SideOutputT> void sideOutputWindowedValue(
- TupleTag<SideOutputT> tag,
- SideOutputT output,
+ public <AdditionalOutputT> void outputWindowedValue(
+ TupleTag<AdditionalOutputT> tag,
+ AdditionalOutputT output,
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo pane) {
- throw new UnsupportedOperationException("GroupAlsoByWindow should not use side outputs");
+ throw new UnsupportedOperationException("GroupAlsoByWindow should not use tagged outputs");
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java
index 28698ca..8ded2dc 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java
@@ -64,10 +64,10 @@ public class SimpleOldDoFnRunnerTest {
private DoFnRunner<String, String> createRunner(OldDoFn<String, String> fn) {
// Pass in only necessary parameters for the test
- List<TupleTag<?>> sideOutputTags = Arrays.asList();
+ List<TupleTag<?>> additionalOutputTags = Arrays.asList();
StepContext context = mock(StepContext.class);
return new SimpleOldDoFnRunner<>(
- null, fn, null, null, null, sideOutputTags, context, null, null);
+ null, fn, null, null, null, additionalOutputTags, context, null, null);
}
static class ThrowingDoFn extends OldDoFn<String, String> {
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
index d301113..2c89543 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
@@ -356,13 +356,13 @@ public class SplittableParDoTest {
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo pane) {
- sideOutputWindowedValue(tester.getMainOutputTag(), output, timestamp, windows, pane);
+ outputWindowedValue(tester.getMainOutputTag(), output, timestamp, windows, pane);
}
@Override
- public <SideOutputT> void sideOutputWindowedValue(
- TupleTag<SideOutputT> tag,
- SideOutputT output,
+ public <AdditionalOutputT> void outputWindowedValue(
+ TupleTag<AdditionalOutputT> tag,
+ AdditionalOutputT output,
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo pane) {
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
index b4ca998..ce7b12a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
@@ -264,13 +264,13 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
}
@Override
- public <SideOutputT> void sideOutputWindowedValue(
- TupleTag<SideOutputT> tag,
- SideOutputT output,
+ public <AdditionalOutputT> void outputWindowedValue(
+ TupleTag<AdditionalOutputT> tag,
+ AdditionalOutputT output,
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo pane) {
- throw new UnsupportedOperationException("GroupAlsoByWindow should not use side outputs");
+ throw new UnsupportedOperationException("GroupAlsoByWindow should not use tagged outputs");
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
index 328d139..49d0723 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
@@ -51,7 +51,7 @@ class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> {
StructuralKey<?> key,
List<PCollectionView<?>> sideInputs,
TupleTag<OutputT> mainOutputTag,
- List<TupleTag<?>> sideOutputTags,
+ List<TupleTag<?>> additionalOutputTags,
Map<TupleTag<?>, PCollection<?>> outputs) {
AggregatorContainer.Mutator aggregatorChanges = evaluationContext.getAggregatorMutator();
@@ -80,7 +80,7 @@ class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> {
sideInputReader,
outputManager,
mainOutputTag,
- sideOutputTags,
+ additionalOutputTags,
stepContext,
aggregatorChanges,
windowingStrategy);
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
index b8a13e2..0372295 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
@@ -80,7 +80,7 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
doFn,
transform.getSideInputs(),
transform.getMainOutputTag(),
- transform.getSideOutputTags().getAll());
+ transform.getAdditionalOutputTags().getAll());
return evaluator;
}
@@ -103,7 +103,7 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
DoFn<InputT, OutputT> doFn,
List<PCollectionView<?>> sideInputs,
TupleTag<OutputT> mainOutputTag,
- List<TupleTag<?>> sideOutputTags)
+ List<TupleTag<?>> additionalOutputTags)
throws Exception {
String stepName = evaluationContext.getStepName(application);
DirectStepContext stepContext =
@@ -119,7 +119,7 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
inputBundleKey,
sideInputs,
mainOutputTag,
- sideOutputTags,
+ additionalOutputTags,
stepContext,
fnManager.<InputT, OutputT>get(),
fnManager),
@@ -131,7 +131,7 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
StructuralKey<?> key,
List<PCollectionView<?>> sideInputs,
TupleTag<OutputT> mainOutputTag,
- List<TupleTag<?>> sideOutputTags,
+ List<TupleTag<?>> additionalOutputTags,
DirectStepContext stepContext,
DoFn<InputT, OutputT> fn,
DoFnLifecycleManager fnManager)
@@ -147,7 +147,7 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
key,
sideInputs,
mainOutputTag,
- sideOutputTags,
+ additionalOutputTags,
pcollections(application.getOutputs()));
} catch (Exception e) {
try {
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
index 00c0d6a..366777b 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
@@ -192,7 +192,7 @@ class ParDoMultiOverrideFactory<InputT, OutputT>
PCollectionTuple.ofPrimitiveOutputsInternal(
input.getPipeline(),
TupleTagList.of(underlyingParDo.getMainOutputTag())
- .and(underlyingParDo.getSideOutputTags().getAll()),
+ .and(underlyingParDo.getAdditionalOutputTags().getAll()),
input.getWindowingStrategy(),
input.isBounded());
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
index 07affd8..64cef35 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
@@ -105,7 +105,7 @@ class SplittableProcessElementsEvaluatorFactory<
inputBundle.getKey(),
transform.getSideInputs(),
transform.getMainOutputTag(),
- transform.getSideOutputTags().getAll(),
+ transform.getAdditionalOutputTags().getAll(),
stepContext,
processFn,
fnManager);
@@ -146,9 +146,9 @@ class SplittableProcessElementsEvaluatorFactory<
}
@Override
- public <SideOutputT> void sideOutputWindowedValue(
- TupleTag<SideOutputT> tag,
- SideOutputT output,
+ public <AdditionalOutputT> void outputWindowedValue(
+ TupleTag<AdditionalOutputT> tag,
+ AdditionalOutputT output,
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo pane) {
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
index f8fe3d6..be77ea1 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
@@ -120,7 +120,7 @@ final class StatefulParDoEvaluatorFactory<K, InputT, OutputT> implements Transfo
doFn,
application.getTransform().getUnderlyingParDo().getSideInputs(),
application.getTransform().getUnderlyingParDo().getMainOutputTag(),
- application.getTransform().getUnderlyingParDo().getSideOutputTags().getAll());
+ application.getTransform().getUnderlyingParDo().getAdditionalOutputTags().getAll());
return new StatefulParDoEvaluator<>(delegateEvaluator);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
index 2a94d48..65a1248 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
@@ -70,7 +70,7 @@ public class ParDoEvaluatorTest {
@Mock private EvaluationContext evaluationContext;
private PCollection<Integer> inputPc;
private TupleTag<Integer> mainOutputTag;
- private List<TupleTag<?>> sideOutputTags;
+ private List<TupleTag<?>> additionalOutputTags;
private BundleFactory bundleFactory;
@Rule
@@ -81,7 +81,7 @@ public class ParDoEvaluatorTest {
MockitoAnnotations.initMocks(this);
inputPc = p.apply(Create.of(1, 2, 3));
mainOutputTag = new TupleTag<Integer>() {};
- sideOutputTags = TupleTagList.empty().getAll();
+ additionalOutputTags = TupleTagList.empty().getAll();
bundleFactory = ImmutableListBundleFactory.create();
}
@@ -168,7 +168,7 @@ public class ParDoEvaluatorTest {
null /* key */,
ImmutableList.<PCollectionView<?>>of(singletonView),
mainOutputTag,
- sideOutputTags,
+ additionalOutputTags,
ImmutableMap.<TupleTag<?>, PCollection<?>>of(mainOutputTag, output));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index af157f0..fbd7620 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -406,7 +406,7 @@ class FlinkStreamingTransformTranslators {
DoFn<InputT, OutputT> doFn,
List<PCollectionView<?>> sideInputs,
TupleTag<OutputT> mainOutputTag,
- List<TupleTag<?>> sideOutputTags,
+ List<TupleTag<?>> additionalOutputTags,
FlinkStreamingTranslationContext context,
WindowingStrategy<?, ?> windowingStrategy,
Map<TupleTag<?>, Integer> tagsToLabels,
@@ -422,7 +422,7 @@ class FlinkStreamingTransformTranslators {
List<PCollectionView<?>> sideInputs,
Map<TupleTag<?>, PValue> outputs,
TupleTag<OutputT> mainOutputTag,
- List<TupleTag<?>> sideOutputTags,
+ List<TupleTag<?>> additionalOutputTags,
FlinkStreamingTranslationContext context,
DoFnOperatorFactory<InputT, OutputT> doFnOperatorFactory) {
@@ -460,7 +460,7 @@ class FlinkStreamingTransformTranslators {
doFn,
sideInputs,
mainOutputTag,
- sideOutputTags,
+ additionalOutputTags,
context,
windowingStrategy,
tagsToLabels,
@@ -485,7 +485,7 @@ class FlinkStreamingTransformTranslators {
doFn,
sideInputs,
mainOutputTag,
- sideOutputTags,
+ additionalOutputTags,
context,
windowingStrategy,
tagsToLabels,
@@ -605,7 +605,7 @@ class FlinkStreamingTransformTranslators {
transform.getSideInputs(),
context.getOutputs(transform),
transform.getMainOutputTag(),
- transform.getSideOutputTags().getAll(),
+ transform.getAdditionalOutputTags().getAll(),
context,
new ParDoTranslationHelper.DoFnOperatorFactory<InputT, OutputT>() {
@Override
@@ -613,7 +613,7 @@ class FlinkStreamingTransformTranslators {
DoFn<InputT, OutputT> doFn,
List<PCollectionView<?>> sideInputs,
TupleTag<OutputT> mainOutputTag,
- List<TupleTag<?>> sideOutputTags,
+ List<TupleTag<?>> additionalOutputTags,
FlinkStreamingTranslationContext context,
WindowingStrategy<?, ?> windowingStrategy,
Map<TupleTag<?>, Integer> tagsToLabels,
@@ -624,7 +624,7 @@ class FlinkStreamingTransformTranslators {
doFn,
inputCoder,
mainOutputTag,
- sideOutputTags,
+ additionalOutputTags,
new DoFnOperator.MultiOutputOutputManagerFactory(tagsToLabels),
windowingStrategy,
transformedSideInputs,
@@ -654,7 +654,7 @@ class FlinkStreamingTransformTranslators {
transform.getSideInputs(),
context.getOutputs(transform),
transform.getMainOutputTag(),
- transform.getSideOutputTags().getAll(),
+ transform.getAdditionalOutputTags().getAll(),
context,
new ParDoTranslationHelper.DoFnOperatorFactory<
KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>() {
@@ -668,7 +668,7 @@ class FlinkStreamingTransformTranslators {
OutputT> doFn,
List<PCollectionView<?>> sideInputs,
TupleTag<OutputT> mainOutputTag,
- List<TupleTag<?>> sideOutputTags,
+ List<TupleTag<?>> additionalOutputTags,
FlinkStreamingTranslationContext context,
WindowingStrategy<?, ?> windowingStrategy,
Map<TupleTag<?>, Integer> tagsToLabels,
@@ -683,7 +683,7 @@ class FlinkStreamingTransformTranslators {
doFn,
inputCoder,
mainOutputTag,
- sideOutputTags,
+ additionalOutputTags,
new DoFnOperator.MultiOutputOutputManagerFactory(tagsToLabels),
windowingStrategy,
transformedSideInputs,
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
index 9687478..51582af 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
@@ -87,7 +87,7 @@ public class FlinkDoFnFunction<InputT, OutputT>
if (outputMap == null) {
outputManager = new FlinkDoFnFunction.DoFnOutputManager(out);
} else {
- // it has some sideOutputs
+ // it has some additional outputs
outputManager =
new FlinkDoFnFunction.MultiDoFnOutputManager((Collector) out, outputMap);
}
@@ -97,7 +97,7 @@ public class FlinkDoFnFunction<InputT, OutputT>
new FlinkSideInputReader(sideInputs, runtimeContext),
outputManager,
mainOutputTag,
- // see SimpleDoFnRunner, just use it to limit number of side outputs
+ // see SimpleDoFnRunner, just use it to limit number of additional outputs
Collections.<TupleTag<?>>emptyList(),
new FlinkNoOpStepContext(),
new FlinkAggregatorFactory(runtimeContext),
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java
index d901d8e..847a00a 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java
@@ -47,7 +47,7 @@ public class FlinkNoOpStepContext implements StepContext {
}
@Override
- public void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output) {
+ public void noteOutput(TupleTag<?> tag, WindowedValue<?> output) {
}
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
index 0d8399e..c8193d2 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
@@ -88,7 +88,7 @@ public class FlinkStatefulDoFnFunction<K, V, OutputT>
if (outputMap == null) {
outputManager = new FlinkDoFnFunction.DoFnOutputManager(out);
} else {
- // it has some sideOutputs
+ // it has some additional outputs
outputManager =
new FlinkDoFnFunction.MultiDoFnOutputManager((Collector) out, outputMap);
}
@@ -114,7 +114,7 @@ public class FlinkStatefulDoFnFunction<K, V, OutputT>
new FlinkSideInputReader(sideInputs, runtimeContext),
outputManager,
mainOutputTag,
- // see SimpleDoFnRunner, just use it to limit number of side outputs
+ // see SimpleDoFnRunner, just use it to limit number of additional outputs
Collections.<TupleTag<?>>emptyList(),
new FlinkNoOpStepContext() {
@Override