You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/04/14 21:41:11 UTC
[3/4] beam git commit: Rename DoFn.Context#sideOutput to output
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 9a66a2f..5496f71 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -97,7 +97,7 @@ import org.joda.time.Instant;
* @param <InputT> the input type of the {@link DoFn}
* @param <FnOutputT> the output type of the {@link DoFn}
* @param <OutputT> the output type of the operator, this can be different from the fn output
- * type when we have side outputs
+ * type when we have additional tagged outputs
*/
public class DoFnOperator<InputT, FnOutputT, OutputT>
extends AbstractStreamOperator<OutputT>
@@ -110,7 +110,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
protected final SerializedPipelineOptions serializedOptions;
protected final TupleTag<FnOutputT> mainOutputTag;
- protected final List<TupleTag<?>> sideOutputTags;
+ protected final List<TupleTag<?>> additionalOutputTags;
protected final Collection<PCollectionView<?>> sideInputs;
protected final Map<Integer, PCollectionView<?>> sideInputTagMapping;
@@ -155,7 +155,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
DoFn<InputT, FnOutputT> doFn,
Coder<WindowedValue<InputT>> inputCoder,
TupleTag<FnOutputT> mainOutputTag,
- List<TupleTag<?>> sideOutputTags,
+ List<TupleTag<?>> additionalOutputTags,
OutputManagerFactory<OutputT> outputManagerFactory,
WindowingStrategy<?, ?> windowingStrategy,
Map<Integer, PCollectionView<?>> sideInputTagMapping,
@@ -165,7 +165,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
this.doFn = doFn;
this.inputCoder = inputCoder;
this.mainOutputTag = mainOutputTag;
- this.sideOutputTags = sideOutputTags;
+ this.additionalOutputTags = additionalOutputTags;
this.sideInputTagMapping = sideInputTagMapping;
this.sideInputs = sideInputs;
this.serializedOptions = new SerializedPipelineOptions(options);
@@ -275,7 +275,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
sideInputReader,
outputManager,
mainOutputTag,
- sideOutputTags,
+ additionalOutputTags,
stepContext,
aggregatorFactory,
windowingStrategy);
@@ -619,7 +619,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
return new DoFnRunners.OutputManager() {
@Override
public <T> void output(TupleTag<T> tag, WindowedValue<T> value) {
- // with side outputs we can't get around this because we don't
+ // with tagged outputs we can't get around this because we don't
// know our own output type...
@SuppressWarnings("unchecked")
OutputT castValue = (OutputT) value;
@@ -675,7 +675,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
public void noteOutput(WindowedValue<?> output) {}
@Override
- public void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output) {}
+ public void noteOutput(TupleTag<?> tag, WindowedValue<?> output) {}
@Override
public <T, W extends BoundedWindow> void writePCollectionViewData(
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
index 0724ac2..1a636c9 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
@@ -63,7 +63,7 @@ public class SplittableDoFnOperator<
WindowedValue<
KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>> inputCoder,
TupleTag<FnOutputT> mainOutputTag,
- List<TupleTag<?>> sideOutputTags,
+ List<TupleTag<?>> additionalOutputTags,
OutputManagerFactory<OutputT> outputManagerFactory,
WindowingStrategy<?, ?> windowingStrategy,
Map<Integer, PCollectionView<?>> sideInputTagMapping,
@@ -74,7 +74,7 @@ public class SplittableDoFnOperator<
doFn,
inputCoder,
mainOutputTag,
- sideOutputTags,
+ additionalOutputTags,
outputManagerFactory,
windowingStrategy,
sideInputTagMapping,
@@ -125,9 +125,9 @@ public class SplittableDoFnOperator<
}
@Override
- public <SideOutputT> void sideOutputWindowedValue(
- TupleTag<SideOutputT> tag,
- SideOutputT output,
+ public <AdditionalOutputT> void outputWindowedValue(
+ TupleTag<AdditionalOutputT> tag,
+ AdditionalOutputT output,
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo pane) {
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
index b015f66..8bbc6ef 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
@@ -57,7 +57,7 @@ public class WindowDoFnOperator<K, InputT, OutputT>
SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> systemReduceFn,
Coder<WindowedValue<KeyedWorkItem<K, InputT>>> inputCoder,
TupleTag<KV<K, OutputT>> mainOutputTag,
- List<TupleTag<?>> sideOutputTags,
+ List<TupleTag<?>> additionalOutputTags,
OutputManagerFactory<WindowedValue<KV<K, OutputT>>> outputManagerFactory,
WindowingStrategy<?, ?> windowingStrategy,
Map<Integer, PCollectionView<?>> sideInputTagMapping,
@@ -68,7 +68,7 @@ public class WindowDoFnOperator<K, InputT, OutputT>
null,
inputCoder,
mainOutputTag,
- sideOutputTags,
+ additionalOutputTags,
outputManagerFactory,
windowingStrategy,
sideInputTagMapping,
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
index c1fdea3..4c826d1 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
@@ -144,19 +144,19 @@ public class DoFnOperatorTest {
WindowedValue.getValueOnlyCoder(StringUtf8Coder.of());
TupleTag<String> mainOutput = new TupleTag<>("main-output");
- TupleTag<String> sideOutput1 = new TupleTag<>("side-output-1");
- TupleTag<String> sideOutput2 = new TupleTag<>("side-output-2");
+ TupleTag<String> additionalOutput1 = new TupleTag<>("output-1");
+ TupleTag<String> additionalOutput2 = new TupleTag<>("output-2");
ImmutableMap<TupleTag<?>, Integer> outputMapping = ImmutableMap.<TupleTag<?>, Integer>builder()
.put(mainOutput, 1)
- .put(sideOutput1, 2)
- .put(sideOutput2, 3)
+ .put(additionalOutput1, 2)
+ .put(additionalOutput2, 3)
.build();
DoFnOperator<String, String, RawUnionValue> doFnOperator = new DoFnOperator<>(
- new MultiOutputDoFn(sideOutput1, sideOutput2),
+ new MultiOutputDoFn(additionalOutput1, additionalOutput2),
windowedValueCoder,
mainOutput,
- ImmutableList.<TupleTag<?>>of(sideOutput1, sideOutput2),
+ ImmutableList.<TupleTag<?>>of(additionalOutput1, additionalOutput2),
new DoFnOperator.MultiOutputOutputManagerFactory(outputMapping),
WindowingStrategy.globalDefault(),
new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
@@ -176,8 +176,8 @@ public class DoFnOperatorTest {
assertThat(
this.stripStreamRecordFromRawUnion(testHarness.getOutput()),
contains(
- new RawUnionValue(2, WindowedValue.valueInGlobalWindow("side: one")),
- new RawUnionValue(3, WindowedValue.valueInGlobalWindow("side: two")),
+ new RawUnionValue(2, WindowedValue.valueInGlobalWindow("extra: one")),
+ new RawUnionValue(3, WindowedValue.valueInGlobalWindow("extra: two")),
new RawUnionValue(1, WindowedValue.valueInGlobalWindow("got: hello")),
new RawUnionValue(2, WindowedValue.valueInGlobalWindow("got: hello")),
new RawUnionValue(3, WindowedValue.valueInGlobalWindow("got: hello"))));
@@ -542,24 +542,24 @@ public class DoFnOperatorTest {
}
private static class MultiOutputDoFn extends DoFn<String, String> {
- private TupleTag<String> sideOutput1;
- private TupleTag<String> sideOutput2;
+ private TupleTag<String> additionalOutput1;
+ private TupleTag<String> additionalOutput2;
- public MultiOutputDoFn(TupleTag<String> sideOutput1, TupleTag<String> sideOutput2) {
- this.sideOutput1 = sideOutput1;
- this.sideOutput2 = sideOutput2;
+ public MultiOutputDoFn(TupleTag<String> additionalOutput1, TupleTag<String> additionalOutput2) {
+ this.additionalOutput1 = additionalOutput1;
+ this.additionalOutput2 = additionalOutput2;
}
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
if (c.element().equals("one")) {
- c.sideOutput(sideOutput1, "side: one");
+ c.output(additionalOutput1, "extra: one");
} else if (c.element().equals("two")) {
- c.sideOutput(sideOutput2, "side: two");
+ c.output(additionalOutput2, "extra: two");
} else {
c.output("got: " + c.element());
- c.sideOutput(sideOutput1, "got: " + c.element());
- c.sideOutput(sideOutput2, "got: " + c.element());
+ c.output(additionalOutput1, "got: " + c.element());
+ c.output(additionalOutput2, "got: " + c.element());
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java
index 3ded079..73f3728 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java
@@ -175,7 +175,7 @@ public class BatchStatefulParDoOverrides {
ParDo.of(new BatchStatefulDoFn<K, InputT, OutputT>(fn))
.withSideInputs(originalParDo.getSideInputs())
.withOutputTags(
- originalParDo.getMainOutputTag(), originalParDo.getSideOutputTags());
+ originalParDo.getMainOutputTag(), originalParDo.getAdditionalOutputTags());
return input.apply(new GbkBeforeStatefulParDo<K, InputT>()).apply(statefulParDo);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
index 86bfeb6..ead2712 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
@@ -494,7 +494,7 @@ class BatchViewOverrides {
*/
private void outputMetadataRecordForSize(
ProcessContext c, KV<KV<K, W>, WindowedValue<V>> value, long uniqueKeyCount) {
- c.sideOutput(outputForSize,
+ c.output(outputForSize,
KV.of(ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(),
value.getKey().getValue())),
KV.of(value.getKey().getValue(), uniqueKeyCount)));
@@ -503,7 +503,7 @@ class BatchViewOverrides {
/** This outputs records which will be used to construct the entry set. */
private void outputMetadataRecordForEntrySet(
ProcessContext c, KV<KV<K, W>, WindowedValue<V>> value) {
- c.sideOutput(outputForEntrySet,
+ c.output(outputForEntrySet,
KV.of(ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(),
value.getKey().getValue())),
KV.of(value.getKey().getValue(), value.getKey().getKey())));
@@ -773,7 +773,7 @@ class BatchViewOverrides {
coderForMapLike(windowCoder, inputCoder.getKeyCoder(), inputCoder.getValueCoder());
// Create the various output tags representing the main output containing the data stream
- // and the side outputs containing the metadata about the size and entry set.
+ // and the additional outputs containing the metadata about the size and entry set.
TupleTag<IsmRecord<WindowedValue<V>>> mainOutputTag = new TupleTag<>();
TupleTag<KV<Integer, KV<W, Long>>> outputForSizeTag = new TupleTag<>();
TupleTag<KV<Integer, KV<W, K>>> outputForEntrySetTag = new TupleTag<>();
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchViewOverridesTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchViewOverridesTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchViewOverridesTest.java
index cd12c92..87395e6 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchViewOverridesTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchViewOverridesTest.java
@@ -280,7 +280,7 @@ public class BatchViewOverridesTest {
// Verify the number of unique keys per window.
assertThat(
- doFnTester.takeSideOutputElements(outputForSizeTag),
+ doFnTester.takeOutputElements(outputForSizeTag),
contains(
KV.of(
ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowA)),
@@ -294,7 +294,7 @@ public class BatchViewOverridesTest {
// Verify the output for the unique keys.
assertThat(
- doFnTester.takeSideOutputElements(outputForEntrySetTag),
+ doFnTester.takeOutputElements(outputForEntrySetTag),
contains(
KV.of(
ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowA)),
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
index 0e74fa2..029c28a 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
@@ -412,12 +412,14 @@ public class SparkGroupAlsoByWindowViaWindowSet {
}
@Override
- public <SideOutputT> void sideOutputWindowedValue(
- TupleTag<SideOutputT> tag,
- SideOutputT output, Instant timestamp,
+ public <AdditionalOutputT> void outputWindowedValue(
+ TupleTag<AdditionalOutputT> tag,
+ AdditionalOutputT output,
+ Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo pane) {
- throw new UnsupportedOperationException("Side outputs are not allowed in GroupAlsoByWindow.");
+ throw new UnsupportedOperationException(
+ "Tagged outputs are not allowed in GroupAlsoByWindow.");
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
index a761954..4cd1683 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
@@ -43,8 +43,8 @@ import scala.Tuple2;
/**
- * DoFunctions ignore side outputs. MultiDoFunctions deal with side outputs by enriching the
- * underlying data with multiple TupleTags.
+ * DoFunctions ignore outputs that are not the main output. MultiDoFunctions deal with additional
+ * outputs by enriching the underlying data with multiple TupleTags.
*
* @param <InputT> Input type for DoFunction.
* @param <OutputT> Output type for DoFunction.
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
index d19c4a9..ccc0fa3 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
@@ -164,12 +164,12 @@ public class SparkGroupAlsoByWindowViaOutputBufferFn<K, InputT, W extends Bounde
}
@Override
- public <SideOutputT> void sideOutputWindowedValue(
- TupleTag<SideOutputT> tag,
- SideOutputT output,
+ public <AdditionalOutputT> void outputWindowedValue(
+ TupleTag<AdditionalOutputT> tag,
+ AdditionalOutputT output,
Instant timestamp,
Collection<? extends BoundedWindow> windows, PaneInfo pane) {
- throw new UnsupportedOperationException("GroupAlsoByWindow should not use side outputs.");
+ throw new UnsupportedOperationException("GroupAlsoByWindow should not use tagged outputs.");
}
Iterable<WindowedValue<KV<K, Iterable<V>>>> getOutputs() {
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
index 4f8a1a5..3e8dde5 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
@@ -113,7 +113,7 @@ class SparkProcessContext<FnInputT, FnOutputT, OutputT> {
public void noteOutput(WindowedValue<?> output) { }
@Override
- public void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output) { }
+ public void noteOutput(TupleTag<?> tag, WindowedValue<?> output) { }
@Override
public <T, W extends BoundedWindow> void writePCollectionViewData(
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index 65892d2..000eada 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -385,7 +385,7 @@ public final class StreamingTransformTranslator {
JavaDStream<WindowedValue<InputT>> dStream = unboundedDataset.getDStream();
final String stepName = context.getCurrentTransform().getFullName();
- if (transform.getSideOutputTags().size() == 0) {
+ if (transform.getAdditionalOutputTags().size() == 0) {
JavaPairDStream<TupleTag<?>, WindowedValue<?>> all =
dStream.transformToPair(
new Function<
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index 8fe4831..58d65d0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -2191,7 +2191,7 @@ public class Combine {
c.output(kv);
} else {
int nonce = counter++ % spread;
- c.sideOutput(hot, KV.of(KV.of(kv.getKey(), nonce), kv.getValue()));
+ c.output(hot, KV.of(KV.of(kv.getKey(), nonce), kv.getValue()));
}
}
})
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index 74a1348..d3da251 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -153,14 +153,14 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
public abstract void outputWithTimestamp(OutputT output, Instant timestamp);
/**
- * Adds the given element to the side output {@code PCollection} with the
+ * Adds the given element to the output {@code PCollection} with the
* given tag.
*
- * <p>Once passed to {@code sideOutput} the element should not be modified
+ * <p>Once passed to {@code output} the element should not be modified
* in any way.
*
* <p>The caller of {@code ParDo} uses {@link ParDo.SingleOutput#withOutputTags} to
- * specify the tags of side outputs that it consumes. Non-consumed side
+ * specify the tags of outputs that it consumes. Non-consumed
* outputs, e.g., outputs for monitoring purposes only, don't necessarily
* need to be specified.
*
@@ -180,13 +180,13 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
*
* @see ParDo.SingleOutput#withOutputTags
*/
- public abstract <T> void sideOutput(TupleTag<T> tag, T output);
+ public abstract <T> void output(TupleTag<T> tag, T output);
/**
- * Adds the given element to the specified side output {@code PCollection},
+ * Adds the given element to the specified output {@code PCollection},
* with the given timestamp.
*
- * <p>Once passed to {@code sideOutputWithTimestamp} the element should not be
+ * <p>Once passed to {@code outputWithTimestamp} the element should not be
* modified in any way.
*
* <p>If invoked from {@link ProcessElement}, the timestamp
@@ -207,7 +207,7 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
*
* @see ParDo.SingleOutput#withOutputTags
*/
- public abstract <T> void sideOutputWithTimestamp(
+ public abstract <T> void outputWithTimestamp(
TupleTag<T> tag, T output, Instant timestamp);
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index 88f4035..5446431 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -65,7 +65,7 @@ import org.joda.time.Instant;
*
* // Set arguments shared across all bundles:
* fnTester.setSideInputs(...); // If fn takes side inputs.
- * fnTester.setSideOutputTags(...); // If fn writes to side outputs.
+ * fnTester.setOutputTags(...); // If fn writes to more than one output.
*
* // Process a bundle containing a single input element:
* Input testInput = ...;
@@ -464,14 +464,14 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
}
/**
- * Returns the elements output so far to the side output with the
+ * Returns the elements output so far to the output with the
* given tag. Does not clear them, so subsequent calls will
* continue to include these elements.
*
- * @see #takeSideOutputElements
- * @see #clearSideOutputElements
+ * @see #takeOutputElements
+ * @see #clearOutputElements
*/
- public <T> List<T> peekSideOutputElements(TupleTag<T> tag) {
+ public <T> List<T> peekOutputElements(TupleTag<T> tag) {
// TODO: Should we return an unmodifiable list?
return Lists.transform(getImmutableOutput(tag),
new Function<ValueInSingleWindow<T>, T>() {
@@ -483,24 +483,23 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
}
/**
- * Clears the record of the elements output so far to the side
- * output with the given tag.
+ * Clears the record of the elements output so far to the output with the given tag.
*
- * @see #peekSideOutputElements
+ * @see #peekOutputElements
*/
- public <T> void clearSideOutputElements(TupleTag<T> tag) {
+ public <T> void clearOutputElements(TupleTag<T> tag) {
getMutableOutput(tag).clear();
}
/**
- * Returns the elements output so far to the side output with the given tag.
+ * Returns the elements output so far to the output with the given tag.
* Clears the list so these elements don't appear in future calls.
*
- * @see #peekSideOutputElements
+ * @see #peekOutputElements
*/
- public <T> List<T> takeSideOutputElements(TupleTag<T> tag) {
- List<T> resultElems = new ArrayList<>(peekSideOutputElements(tag));
- clearSideOutputElements(tag);
+ public <T> List<T> takeOutputElements(TupleTag<T> tag) {
+ List<T> resultElems = new ArrayList<>(peekOutputElements(tag));
+ clearOutputElements(tag);
return resultElems;
}
@@ -563,12 +562,12 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
}
@Override
- public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+ public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
throwUnsupportedOutputFromBundleMethods();
}
@Override
- public <T> void sideOutput(TupleTag<T> tag, T output) {
+ public <T> void output(TupleTag<T> tag, T output) {
throwUnsupportedOutputFromBundleMethods();
}
@@ -683,21 +682,21 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
@Override
public void output(OutputT output) {
- sideOutput(mainOutputTag, output);
+ output(mainOutputTag, output);
}
@Override
public void outputWithTimestamp(OutputT output, Instant timestamp) {
- sideOutputWithTimestamp(mainOutputTag, output, timestamp);
+ outputWithTimestamp(mainOutputTag, output, timestamp);
}
@Override
- public <T> void sideOutput(TupleTag<T> tag, T output) {
- sideOutputWithTimestamp(tag, output, element.getTimestamp());
+ public <T> void output(TupleTag<T> tag, T output) {
+ outputWithTimestamp(tag, output, element.getTimestamp());
}
@Override
- public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+ public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
getMutableOutput(tag)
.add(ValueInSingleWindow.of(output, timestamp, element.getWindow(), element.getPane()));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index 3de845b..e3777ac 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -35,6 +35,7 @@ import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
+import org.apache.beam.sdk.transforms.display.DisplayData.ItemSpec;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.MethodWithExtraParameters;
@@ -103,7 +104,7 @@ import org.apache.beam.sdk.values.TypedPValue;
* <p>Each of the calls to any of the {@link DoFn DoFn's} processing
* methods can produce zero or more output elements. All of the
* output elements from all of the {@link DoFn} instances
- * are included in the output {@link PCollection}.
+ * are included in an output {@link PCollection}.
*
* <p>For example:
*
@@ -180,20 +181,20 @@ import org.apache.beam.sdk.values.TypedPValue;
* }}));
* }</pre>
*
- * <h2>Side Outputs</h2>
+ * <h2>Additional Outputs</h2>
*
* <p>Optionally, a {@link ParDo} transform can produce multiple
* output {@link PCollection PCollections}, both a "main output"
- * {@code PCollection<OutputT>} plus any number of "side output"
+ * {@code PCollection<OutputT>} plus any number of additional output
* {@link PCollection PCollections}, each keyed by a distinct {@link TupleTag},
* and bundled in a {@link PCollectionTuple}. The {@link TupleTag TupleTags}
* to be used for the output {@link PCollectionTuple} are specified by
- * invoking {@link SingleOutput#withOutputTags}. Unconsumed side outputs do not
+ * invoking {@link SingleOutput#withOutputTags}. Unconsumed outputs do not
* necessarily need to be explicitly specified, even if the {@link DoFn}
* generates them. Within the {@link DoFn}, an element is added to the
* main output {@link PCollection} as normal, using
- * {@link DoFn.Context#output}, while an element is added to a side output
- * {@link PCollection} using {@link DoFn.Context#sideOutput}. For example:
+ * {@link DoFn.Context#output(Object)}, while an element is added to any additional output
+ * {@link PCollection} using {@link DoFn.Context#output(TupleTag, Object)}. For example:
*
* <pre>{@code
* PCollection<String> words = ...;
@@ -201,7 +202,7 @@ import org.apache.beam.sdk.values.TypedPValue;
* // plus the lengths of words that are above the cut off.
* // Also select words starting with "MARKER".
* final int wordLengthCutOff = 10;
- * // Create tags to use for the main and side outputs.
+ * // Create tags to use for the main and additional outputs.
* final TupleTag<String> wordsBelowCutOffTag =
* new TupleTag<String>(){};
* final TupleTag<Integer> wordLengthsAboveCutOffTag =
@@ -212,7 +213,7 @@ import org.apache.beam.sdk.values.TypedPValue;
* words.apply(
* ParDo
* .of(new DoFn<String, String>() {
- * // Create a tag for the unconsumed side output.
+ * // Create a tag for the unconsumed output.
* final TupleTag<String> specialWordsTag =
* new TupleTag<String>(){};
* {@literal @}ProcessElement
@@ -222,19 +223,19 @@ import org.apache.beam.sdk.values.TypedPValue;
* // Emit this short word to the main output.
* c.output(word);
* } else {
- * // Emit this long word's length to a side output.
- * c.sideOutput(wordLengthsAboveCutOffTag, word.length());
+ * // Emit this long word's length to a specified output.
+ * c.output(wordLengthsAboveCutOffTag, word.length());
* }
* if (word.startsWith("MARKER")) {
- * // Emit this word to a different side output.
- * c.sideOutput(markedWordsTag, word);
+ * // Emit this word to a different specified output.
+ * c.output(markedWordsTag, word);
* }
* if (word.startsWith("SPECIAL")) {
- * // Emit this word to the unconsumed side output.
- * c.sideOutput(specialWordsTag, word);
+ * // Emit this word to the unconsumed output.
+ * c.output(specialWordsTag, word);
* }
* }})
- * // Specify the main and consumed side output tags of the
+ * // Specify the main and consumed output tags of the
* // PCollectionTuple result:
* .withOutputTags(wordsBelowCutOffTag,
* TupleTagList.of(wordLengthsAboveCutOffTag)
@@ -254,9 +255,9 @@ import org.apache.beam.sdk.values.TypedPValue;
* elements of the main output {@link PCollection PCollection<OutputT>} is
* inferred from the concrete type of the {@link DoFn DoFn<InputT, OutputT>}.
*
- * <p>By default, the {@link Coder Coder<SideOutputT>} for the elements of
- * a side output {@link PCollection PCollection<SideOutputT>} is inferred
- * from the concrete type of the corresponding {@link TupleTag TupleTag<SideOutputT>}.
+ * <p>By default, the {@link Coder Coder<AdditionalOutputT>} for the elements of
+ * an output {@link PCollection PCollection<AdditionalOutputT>} is inferred
+ * from the concrete type of the corresponding {@link TupleTag TupleTag<AdditionalOutputT>}.
* To be successful, the {@link TupleTag} should be created as an instance
* of a trivial anonymous subclass, with {@code {}} suffixed to the
* constructor call. Such uses block Java's generic type parameter
@@ -265,12 +266,12 @@ import org.apache.beam.sdk.values.TypedPValue;
* <pre> {@code
* // A TupleTag to use for a side input can be written concisely:
* final TupleTag<Integer> sideInputTag = new TupleTag<>();
- * // A TupleTag to use for a side output should be written with "{}",
+ * // A TupleTag to use for an output should be written with "{}",
* // and explicit generic parameter type:
- * final TupleTag<String> sideOutputTag = new TupleTag<String>(){};
+ * final TupleTag<String> additionalOutputTag = new TupleTag<String>(){};
* } </pre>
* This style of {@code TupleTag} instantiation is used in the example of
- * multiple side outputs, above.
+ * {@link ParDo ParDos} that produce multiple outputs, above.
*
* <h2>Serializability of {@link DoFn DoFns}</h2>
*
@@ -358,7 +359,7 @@ import org.apache.beam.sdk.values.TypedPValue;
* that state across Java processes. All information should be
* communicated to {@link DoFn} instances via main and side inputs and
* serialized state, and all output should be communicated from a
- * {@link DoFn} instance via main and side outputs, in the absence of
+ * {@link DoFn} instance via output {@link PCollection PCollections}, in the absence of
* external communication mechanisms written by user code.
*
* <h2>Fault Tolerance</h2>
@@ -602,14 +603,14 @@ public class ParDo {
/**
* Returns a new multi-output {@link ParDo} {@link PTransform} that's like this {@link
- * PTransform} but with the specified main and side output tags. Does not modify this {@link
+ * PTransform} but with the specified output tags. Does not modify this {@link
* PTransform}.
*
- * <p>See the discussion of Side Outputs above for more explanation.
+ * <p>See the discussion of Additional Outputs above for more explanation.
*/
public MultiOutput<InputT, OutputT> withOutputTags(
- TupleTag<OutputT> mainOutputTag, TupleTagList sideOutputTags) {
- return new MultiOutput<>(fn, sideInputs, mainOutputTag, sideOutputTags, fnDisplayData);
+ TupleTag<OutputT> mainOutputTag, TupleTagList additionalOutputTags) {
+ return new MultiOutput<>(fn, sideInputs, mainOutputTag, additionalOutputTags, fnDisplayData);
}
@Override
@@ -671,11 +672,9 @@ public class ParDo {
}
/**
- * A {@link PTransform} that, when applied to a
- * {@code PCollection<InputT>}, invokes a user-specified
- * {@code DoFn<InputT, OutputT>} on all its elements, which can emit elements
- * to any of the {@link PTransform}'s main and side output
- * {@code PCollection}s, which are bundled into a result
+ * A {@link PTransform} that, when applied to a {@code PCollection<InputT>}, invokes a
+ * user-specified {@code DoFn<InputT, OutputT>} on all its elements, which can emit elements to
+ * any of the {@link PTransform}'s output {@code PCollection}s, which are bundled into a result
* {@code PCollectionTuple}.
*
* @param <InputT> the type of the (main) input {@code PCollection} elements
@@ -685,7 +684,7 @@ public class ParDo {
extends PTransform<PCollection<? extends InputT>, PCollectionTuple> {
private final List<PCollectionView<?>> sideInputs;
private final TupleTag<OutputT> mainOutputTag;
- private final TupleTagList sideOutputTags;
+ private final TupleTagList additionalOutputTags;
private final DisplayData.ItemSpec<? extends Class<?>> fnDisplayData;
private final DoFn<InputT, OutputT> fn;
@@ -693,11 +692,11 @@ public class ParDo {
DoFn<InputT, OutputT> fn,
List<PCollectionView<?>> sideInputs,
TupleTag<OutputT> mainOutputTag,
- TupleTagList sideOutputTags,
- DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
+ TupleTagList additionalOutputTags,
+ ItemSpec<? extends Class<?>> fnDisplayData) {
this.sideInputs = sideInputs;
this.mainOutputTag = mainOutputTag;
- this.sideOutputTags = sideOutputTags;
+ this.additionalOutputTags = additionalOutputTags;
this.fn = SerializableUtils.clone(fn);
this.fnDisplayData = fnDisplayData;
}
@@ -730,7 +729,7 @@ public class ParDo {
.addAll(sideInputs)
.build(),
mainOutputTag,
- sideOutputTags,
+ additionalOutputTags,
fnDisplayData);
}
@@ -745,7 +744,7 @@ public class ParDo {
PCollectionTuple outputs = PCollectionTuple.ofPrimitiveOutputsInternal(
input.getPipeline(),
- TupleTagList.of(mainOutputTag).and(sideOutputTags.getAll()),
+ TupleTagList.of(mainOutputTag).and(additionalOutputTags.getAll()),
input.getWindowingStrategy(),
input.isBounded());
@@ -794,8 +793,8 @@ public class ParDo {
return mainOutputTag;
}
- public TupleTagList getSideOutputTags() {
- return sideOutputTags;
+ public TupleTagList getAdditionalOutputTags() {
+ return additionalOutputTags;
}
public List<PCollectionView<?>> getSideInputs() {
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
index 2031bc9..595d18c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
@@ -169,7 +169,7 @@ public class Partition<T> extends PTransform<PCollection<T>, PCollectionList<T>>
if (0 <= partition && partition < numPartitions) {
@SuppressWarnings("unchecked")
TupleTag<X> typedTag = (TupleTag<X>) outputTags.get(partition);
- c.sideOutput(typedTag, input);
+ c.output(typedTag, input);
} else {
throw new IndexOutOfBoundsException(
"Partition function returned out of bounds index: "
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java
index 0ab26ca..ce67e94 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java
@@ -37,8 +37,7 @@ import org.apache.beam.sdk.values.PCollection.IsBounded;
* {@link PTransform} taking
* or producing multiple PCollection inputs or outputs that can be of
* different types, for instance a
- * {@link ParDo} with side
- * outputs.
+ * {@link ParDo} with multiple outputs.
*
* <p>A {@link PCollectionTuple} can be created and accessed as follows:
* <pre> {@code
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TupleTag.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TupleTag.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TupleTag.java
index a6b63ab..37d41f7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TupleTag.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TupleTag.java
@@ -31,25 +31,23 @@ import org.apache.beam.sdk.util.CloudObject;
import org.apache.beam.sdk.util.PropertyNames;
/**
- * A {@link TupleTag} is a typed tag to use as the key of a
- * heterogeneously typed tuple, like {@link PCollectionTuple}.
- * Its generic type parameter allows tracking
- * the static type of things stored in tuples.
+ * A {@link TupleTag} is a typed tag to use as the key of a heterogeneously typed tuple, like {@link
+ * PCollectionTuple}. Its generic type parameter allows tracking the static type of things stored in
+ * tuples.
*
- * <p>To aid in assigning default {@link org.apache.beam.sdk.coders.Coder Coders} for results of
- * side outputs of {@link ParDo}, an output
- * {@link TupleTag} should be instantiated with an extra {@code {}} so
- * it is an instance of an anonymous subclass without generic type
- * parameters. Input {@link TupleTag TupleTags} require no such extra
- * instantiation (although it doesn't hurt). For example:
+ * <p>To aid in assigning default {@link org.apache.beam.sdk.coders.Coder Coders} for results of a
+ * {@link ParDo}, an output {@link TupleTag} should be instantiated with an extra {@code {}} so it
+ * is an instance of an anonymous subclass without generic type parameters. Input {@link TupleTag
+ * TupleTags} require no such extra instantiation (although it doesn't hurt). For example:
*
- * <pre> {@code
+ * <pre>{@code
* TupleTag<SomeType> inputTag = new TupleTag<>();
* TupleTag<SomeOtherType> outputTag = new TupleTag<SomeOtherType>(){};
- * } </pre>
+ * }
+ * </pre>
*
- * @param <V> the type of the elements or values of the tagged thing,
- * e.g., a {@code PCollection<V>}.
+ * @param <V> the type of the elements or values of the tagged thing, e.g., a {@code
+ * PCollection<V>}.
*/
public class TupleTag<V> implements Serializable {
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TupleTagList.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TupleTagList.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TupleTagList.java
index b4ce941..5aeff5e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TupleTagList.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TupleTagList.java
@@ -28,7 +28,7 @@ import org.apache.beam.sdk.transforms.ParDo;
/**
* A {@link TupleTagList} is an immutable list of heterogeneously
* typed {@link TupleTag TupleTags}. A {@link TupleTagList} is used, for instance, to
- * specify the tags of the side outputs of a
+ * specify the tags of the additional outputs of a
* {@link ParDo}.
*
* <p>A {@link TupleTagList} can be created and accessed as follows:
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypedPValue.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypedPValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypedPValue.java
index d353835..54af747 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypedPValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypedPValue.java
@@ -148,14 +148,14 @@ public abstract class TypedPValue<T> extends PValueBase implements PValue {
return new CoderOrFailure<>(registry.getDefaultCoder(token), null);
} catch (CannotProvideCoderException exc) {
inferFromTokenException = exc;
- // Attempt to detect when the token came from a TupleTag used for a ParDo side output,
+ // Attempt to detect when the token came from a TupleTag used for a ParDo output,
// and provide a better error message if so. Unfortunately, this information is not
// directly available from the TypeDescriptor, so infer based on the type of the PTransform
// and the error message itself.
if (transform instanceof ParDo.MultiOutput
&& exc.getReason() == ReasonCode.TYPE_ERASURE) {
inferFromTokenException = new CannotProvideCoderException(exc.getMessage()
- + " If this error occurs for a side output of the producing ParDo, verify that the "
+ + " If this error occurs for an output of the producing ParDo, verify that the "
+ "TupleTag for this output is constructed with proper type information (see "
+ "TupleTag Javadoc) or explicitly set the Coder to use if this is not possible.");
}
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
index 3555db3..afe384d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
@@ -221,7 +221,7 @@ public class MetricsTest implements Serializable {
values.update(element);
gauge.set(12L);
c.output(element);
- c.sideOutput(output2, element);
+ c.output(output2, element);
}
})
.withOutputTags(output1, TupleTagList.of(output2)));
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index b429eab..589c744 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -153,15 +153,15 @@ public class ParDoTest implements Serializable {
State state = State.NOT_SET_UP;
final List<PCollectionView<Integer>> sideInputViews = new ArrayList<>();
- final List<TupleTag<String>> sideOutputTupleTags = new ArrayList<>();
+ final List<TupleTag<String>> additionalOutputTupleTags = new ArrayList<>();
public TestDoFn() {
}
public TestDoFn(List<PCollectionView<Integer>> sideInputViews,
- List<TupleTag<String>> sideOutputTupleTags) {
+ List<TupleTag<String>> additionalOutputTupleTags) {
this.sideInputViews.addAll(sideInputViews);
- this.sideOutputTupleTags.addAll(sideOutputTupleTags);
+ this.additionalOutputTupleTags.addAll(additionalOutputTupleTags);
}
@Setup
@@ -197,9 +197,9 @@ public class ParDoTest implements Serializable {
private void outputToAll(Context c, String value) {
c.output(value);
- for (TupleTag<String> sideOutputTupleTag : sideOutputTupleTags) {
- c.sideOutput(sideOutputTupleTag,
- sideOutputTupleTag.getId() + ": " + value);
+ for (TupleTag<String> additionalOutputTupleTag : additionalOutputTupleTags) {
+ c.output(additionalOutputTupleTag,
+ additionalOutputTupleTag.getId() + ": " + value);
}
}
@@ -212,9 +212,9 @@ public class ParDoTest implements Serializable {
value += ": " + sideInputValues;
}
c.output(value);
- for (TupleTag<String> sideOutputTupleTag : sideOutputTupleTags) {
- c.sideOutput(sideOutputTupleTag,
- sideOutputTupleTag.getId() + ": " + value);
+ for (TupleTag<String> additionalOutputTupleTag : additionalOutputTupleTags) {
+ c.output(additionalOutputTupleTag,
+ additionalOutputTupleTag.getId() + ": " + value);
}
}
}
@@ -389,90 +389,90 @@ public class ParDoTest implements Serializable {
@Test
@Category(ValidatesRunner.class)
- public void testParDoWithSideOutputs() {
+ public void testParDoWithTaggedOutput() {
List<Integer> inputs = Arrays.asList(3, -42, 666);
TupleTag<String> mainOutputTag = new TupleTag<String>("main"){};
- TupleTag<String> sideOutputTag1 = new TupleTag<String>("side1"){};
- TupleTag<String> sideOutputTag2 = new TupleTag<String>("side2"){};
- TupleTag<String> sideOutputTag3 = new TupleTag<String>("side3"){};
- TupleTag<String> sideOutputTagUnwritten = new TupleTag<String>("sideUnwritten"){};
+ TupleTag<String> additionalOutputTag1 = new TupleTag<String>("additional1"){};
+ TupleTag<String> additionalOutputTag2 = new TupleTag<String>("additional2"){};
+ TupleTag<String> additionalOutputTag3 = new TupleTag<String>("additional3"){};
+ TupleTag<String> additionalOutputTagUnwritten = new TupleTag<String>("unwrittenOutput"){};
PCollectionTuple outputs = pipeline
.apply(Create.of(inputs))
.apply(ParDo
.of(new TestDoFn(
Arrays.<PCollectionView<Integer>>asList(),
- Arrays.asList(sideOutputTag1, sideOutputTag2, sideOutputTag3)))
+ Arrays.asList(additionalOutputTag1, additionalOutputTag2, additionalOutputTag3)))
.withOutputTags(
mainOutputTag,
- TupleTagList.of(sideOutputTag3)
- .and(sideOutputTag1)
- .and(sideOutputTagUnwritten)
- .and(sideOutputTag2)));
+ TupleTagList.of(additionalOutputTag3)
+ .and(additionalOutputTag1)
+ .and(additionalOutputTagUnwritten)
+ .and(additionalOutputTag2)));
PAssert.that(outputs.get(mainOutputTag))
.satisfies(ParDoTest.HasExpectedOutput.forInput(inputs));
- PAssert.that(outputs.get(sideOutputTag1))
+ PAssert.that(outputs.get(additionalOutputTag1))
.satisfies(ParDoTest.HasExpectedOutput.forInput(inputs)
- .fromSideOutput(sideOutputTag1));
- PAssert.that(outputs.get(sideOutputTag2))
+ .fromOutput(additionalOutputTag1));
+ PAssert.that(outputs.get(additionalOutputTag2))
.satisfies(ParDoTest.HasExpectedOutput.forInput(inputs)
- .fromSideOutput(sideOutputTag2));
- PAssert.that(outputs.get(sideOutputTag3))
+ .fromOutput(additionalOutputTag2));
+ PAssert.that(outputs.get(additionalOutputTag3))
.satisfies(ParDoTest.HasExpectedOutput.forInput(inputs)
- .fromSideOutput(sideOutputTag3));
- PAssert.that(outputs.get(sideOutputTagUnwritten)).empty();
+ .fromOutput(additionalOutputTag3));
+ PAssert.that(outputs.get(additionalOutputTagUnwritten)).empty();
pipeline.run();
}
@Test
@Category(ValidatesRunner.class)
- public void testParDoEmptyWithSideOutputs() {
+ public void testParDoEmptyWithTaggedOutput() {
TupleTag<String> mainOutputTag = new TupleTag<String>("main"){};
- TupleTag<String> sideOutputTag1 = new TupleTag<String>("side1"){};
- TupleTag<String> sideOutputTag2 = new TupleTag<String>("side2"){};
- TupleTag<String> sideOutputTag3 = new TupleTag<String>("side3"){};
- TupleTag<String> sideOutputTagUnwritten = new TupleTag<String>("sideUnwritten"){};
+ TupleTag<String> additionalOutputTag1 = new TupleTag<String>("additional1"){};
+ TupleTag<String> additionalOutputTag2 = new TupleTag<String>("additional2"){};
+ TupleTag<String> additionalOutputTag3 = new TupleTag<String>("additional3"){};
+ TupleTag<String> additionalOutputTagUnwritten = new TupleTag<String>("unwrittenOutput"){};
PCollectionTuple outputs = pipeline
.apply(Create.empty(VarIntCoder.of()))
.apply(ParDo
.of(new TestDoFn(
Arrays.<PCollectionView<Integer>>asList(),
- Arrays.asList(sideOutputTag1, sideOutputTag2, sideOutputTag3)))
+ Arrays.asList(additionalOutputTag1, additionalOutputTag2, additionalOutputTag3)))
.withOutputTags(
mainOutputTag,
- TupleTagList.of(sideOutputTag3).and(sideOutputTag1)
- .and(sideOutputTagUnwritten).and(sideOutputTag2)));
+ TupleTagList.of(additionalOutputTag3).and(additionalOutputTag1)
+ .and(additionalOutputTagUnwritten).and(additionalOutputTag2)));
List<Integer> inputs = Collections.emptyList();
PAssert.that(outputs.get(mainOutputTag))
.satisfies(ParDoTest.HasExpectedOutput.forInput(inputs));
- PAssert.that(outputs.get(sideOutputTag1))
+ PAssert.that(outputs.get(additionalOutputTag1))
.satisfies(ParDoTest.HasExpectedOutput.forInput(inputs)
- .fromSideOutput(sideOutputTag1));
- PAssert.that(outputs.get(sideOutputTag2))
+ .fromOutput(additionalOutputTag1));
+ PAssert.that(outputs.get(additionalOutputTag2))
.satisfies(ParDoTest.HasExpectedOutput.forInput(inputs)
- .fromSideOutput(sideOutputTag2));
- PAssert.that(outputs.get(sideOutputTag3))
+ .fromOutput(additionalOutputTag2));
+ PAssert.that(outputs.get(additionalOutputTag3))
.satisfies(ParDoTest.HasExpectedOutput.forInput(inputs)
- .fromSideOutput(sideOutputTag3));
- PAssert.that(outputs.get(sideOutputTagUnwritten)).empty();
+ .fromOutput(additionalOutputTag3));
+ PAssert.that(outputs.get(additionalOutputTagUnwritten)).empty();
pipeline.run();
}
@Test
@Category(ValidatesRunner.class)
- public void testParDoWithEmptySideOutputs() {
+ public void testParDoWithEmptyTaggedOutput() {
TupleTag<String> mainOutputTag = new TupleTag<String>("main"){};
- TupleTag<String> sideOutputTag1 = new TupleTag<String>("side1"){};
- TupleTag<String> sideOutputTag2 = new TupleTag<String>("side2"){};
+ TupleTag<String> additionalOutputTag1 = new TupleTag<String>("additional1"){};
+ TupleTag<String> additionalOutputTag2 = new TupleTag<String>("additional2"){};
PCollectionTuple outputs = pipeline
.apply(Create.empty(VarIntCoder.of()))
@@ -480,12 +480,12 @@ public class ParDoTest implements Serializable {
.of(new TestNoOutputDoFn())
.withOutputTags(
mainOutputTag,
- TupleTagList.of(sideOutputTag1).and(sideOutputTag2)));
+ TupleTagList.of(additionalOutputTag1).and(additionalOutputTag2)));
PAssert.that(outputs.get(mainOutputTag)).empty();
- PAssert.that(outputs.get(sideOutputTag1)).empty();
- PAssert.that(outputs.get(sideOutputTag2)).empty();
+ PAssert.that(outputs.get(additionalOutputTag1)).empty();
+ PAssert.that(outputs.get(additionalOutputTag2)).empty();
pipeline.run();
}
@@ -493,12 +493,12 @@ public class ParDoTest implements Serializable {
@Test
@Category(ValidatesRunner.class)
- public void testParDoWithOnlySideOutputs() {
+ public void testParDoWithOnlyTaggedOutput() {
List<Integer> inputs = Arrays.asList(3, -42, 666);
final TupleTag<Void> mainOutputTag = new TupleTag<Void>("main"){};
- final TupleTag<Integer> sideOutputTag = new TupleTag<Integer>("side"){};
+ final TupleTag<Integer> additionalOutputTag = new TupleTag<Integer>("additional"){};
PCollectionTuple outputs = pipeline
.apply(Create.of(inputs))
@@ -506,29 +506,29 @@ public class ParDoTest implements Serializable {
.of(new DoFn<Integer, Void>(){
@ProcessElement
public void processElement(ProcessContext c) {
- c.sideOutput(sideOutputTag, c.element());
+ c.output(additionalOutputTag, c.element());
}})
- .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)));
+ .withOutputTags(mainOutputTag, TupleTagList.of(additionalOutputTag)));
PAssert.that(outputs.get(mainOutputTag)).empty();
- PAssert.that(outputs.get(sideOutputTag)).containsInAnyOrder(inputs);
+ PAssert.that(outputs.get(additionalOutputTag)).containsInAnyOrder(inputs);
pipeline.run();
}
@Test
@Category(NeedsRunner.class)
- public void testParDoWritingToUndeclaredSideOutput() {
+ public void testParDoWritingToUndeclaredTag() {
List<Integer> inputs = Arrays.asList(3, -42, 666);
- TupleTag<String> sideTag = new TupleTag<String>("side"){};
+ TupleTag<String> notOutputTag = new TupleTag<String>("additional"){};
PCollection<String> output = pipeline
.apply(Create.of(inputs))
.apply(ParDo.of(new TestDoFn(
Arrays.<PCollectionView<Integer>>asList(),
- Arrays.asList(sideTag))));
+ Arrays.asList(notOutputTag))));
PAssert.that(output)
.satisfies(ParDoTest.HasExpectedOutput.forInput(inputs));
@@ -539,7 +539,7 @@ public class ParDoTest implements Serializable {
@Test
// TODO: The exception thrown is runner-specific, even if the behavior is general
@Category(NeedsRunner.class)
- public void testParDoUndeclaredSideOutputLimit() {
+ public void testParDoUndeclaredTagLimit() {
PCollection<Integer> input = pipeline.apply(Create.of(Arrays.asList(3)));
@@ -548,13 +548,13 @@ public class ParDoTest implements Serializable {
.apply("Success1000", ParDo.of(new DoFn<Integer, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
- TupleTag<String> specialSideTag = new TupleTag<String>(){};
- c.sideOutput(specialSideTag, "side");
- c.sideOutput(specialSideTag, "side");
- c.sideOutput(specialSideTag, "side");
+ TupleTag<String> specialOutputTag = new TupleTag<String>(){};
+ c.output(specialOutputTag, "special");
+ c.output(specialOutputTag, "special");
+ c.output(specialOutputTag, "special");
for (int i = 0; i < 998; i++) {
- c.sideOutput(new TupleTag<String>(){}, "side");
+ c.output(new TupleTag<String>(){}, "tag" + i);
}
}}));
pipeline.run();
@@ -565,12 +565,12 @@ public class ParDoTest implements Serializable {
@ProcessElement
public void processElement(ProcessContext c) {
for (int i = 0; i < 1000; i++) {
- c.sideOutput(new TupleTag<String>(){}, "side");
+ c.output(new TupleTag<String>(){}, "output" + i);
}
}}));
thrown.expect(RuntimeException.class);
- thrown.expectMessage("the number of side outputs has exceeded a limit");
+ thrown.expectMessage("the number of outputs has exceeded a limit");
pipeline.run();
}
@@ -647,7 +647,7 @@ public class ParDoTest implements Serializable {
List<Integer> inputs = Arrays.asList(3, -42, 666);
final TupleTag<String> mainOutputTag = new TupleTag<String>("main"){};
- final TupleTag<Void> sideOutputTag = new TupleTag<Void>("sideOutput"){};
+ final TupleTag<Void> additionalOutputTag = new TupleTag<Void>("output"){};
PCollectionView<Integer> sideInput1 = pipeline
.apply("CreateSideInput1", Create.of(11))
@@ -668,7 +668,7 @@ public class ParDoTest implements Serializable {
.withSideInputs(sideInput1)
.withSideInputs(sideInputUnread)
.withSideInputs(sideInput2)
- .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)));
+ .withOutputTags(mainOutputTag, TupleTagList.of(additionalOutputTag)));
PAssert.that(outputs.get(mainOutputTag))
.satisfies(ParDoTest.HasExpectedOutput
@@ -685,7 +685,7 @@ public class ParDoTest implements Serializable {
List<Integer> inputs = Arrays.asList(3, -42, 666);
final TupleTag<String> mainOutputTag = new TupleTag<String>("main"){};
- final TupleTag<Void> sideOutputTag = new TupleTag<Void>("sideOutput"){};
+ final TupleTag<Void> additionalOutputTag = new TupleTag<Void>("output"){};
PCollectionView<Integer> sideInput1 = pipeline
.apply("CreateSideInput1", Create.of(11))
@@ -706,7 +706,7 @@ public class ParDoTest implements Serializable {
.withSideInputs(sideInput1)
.withSideInputs(sideInputUnread)
.withSideInputs(sideInput2)
- .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)));
+ .withOutputTags(mainOutputTag, TupleTagList.of(additionalOutputTag)));
PAssert.that(outputs.get(mainOutputTag))
.satisfies(ParDoTest.HasExpectedOutput
@@ -853,37 +853,37 @@ public class ParDoTest implements Serializable {
@Test
public void testParDoMultiNameBasedDoFnWithTrimmerSuffix() {
assertThat(
- ParDo.of(new SideOutputDummyFn(null)).withOutputTags(null, null).getName(),
- containsString("ParMultiDo(SideOutputDummy)"));
+ ParDo.of(new TaggedOutputDummyFn(null)).withOutputTags(null, null).getName(),
+ containsString("ParMultiDo(TaggedOutputDummy)"));
}
@Test
- public void testParDoWithSideOutputsName() {
+ public void testParDoWithTaggedOutputName() {
pipeline.enableAbandonedNodeEnforcement(false);
TupleTag<String> mainOutputTag = new TupleTag<String>("main"){};
- TupleTag<String> sideOutputTag1 = new TupleTag<String>("side1"){};
- TupleTag<String> sideOutputTag2 = new TupleTag<String>("side2"){};
- TupleTag<String> sideOutputTag3 = new TupleTag<String>("side3"){};
- TupleTag<String> sideOutputTagUnwritten = new TupleTag<String>("sideUnwritten"){};
+ TupleTag<String> additionalOutputTag1 = new TupleTag<String>("output1"){};
+ TupleTag<String> additionalOutputTag2 = new TupleTag<String>("output2"){};
+ TupleTag<String> additionalOutputTag3 = new TupleTag<String>("output3"){};
+ TupleTag<String> additionalOutputTagUnwritten = new TupleTag<String>("unwrittenOutput"){};
PCollectionTuple outputs = pipeline
.apply(Create.of(Arrays.asList(3, -42, 666))).setName("MyInput")
.apply("MyParDo", ParDo
.of(new TestDoFn(
Arrays.<PCollectionView<Integer>>asList(),
- Arrays.asList(sideOutputTag1, sideOutputTag2, sideOutputTag3)))
+ Arrays.asList(additionalOutputTag1, additionalOutputTag2, additionalOutputTag3)))
.withOutputTags(
mainOutputTag,
- TupleTagList.of(sideOutputTag3).and(sideOutputTag1)
- .and(sideOutputTagUnwritten).and(sideOutputTag2)));
+ TupleTagList.of(additionalOutputTag3).and(additionalOutputTag1)
+ .and(additionalOutputTagUnwritten).and(additionalOutputTag2)));
assertEquals("MyParDo.main", outputs.get(mainOutputTag).getName());
- assertEquals("MyParDo.side1", outputs.get(sideOutputTag1).getName());
- assertEquals("MyParDo.side2", outputs.get(sideOutputTag2).getName());
- assertEquals("MyParDo.side3", outputs.get(sideOutputTag3).getName());
- assertEquals("MyParDo.sideUnwritten",
- outputs.get(sideOutputTagUnwritten).getName());
+ assertEquals("MyParDo.output1", outputs.get(additionalOutputTag1).getName());
+ assertEquals("MyParDo.output2", outputs.get(additionalOutputTag2).getName());
+ assertEquals("MyParDo.output3", outputs.get(additionalOutputTag3).getName());
+ assertEquals("MyParDo.unwrittenOutput",
+ outputs.get(additionalOutputTagUnwritten).getName());
}
@Test
@@ -892,29 +892,29 @@ public class ParDoTest implements Serializable {
PCollection<Long> longs = pipeline.apply(CountingInput.unbounded());
TupleTag<Long> mainOut = new TupleTag<>();
- final TupleTag<String> sideOutOne = new TupleTag<>();
- final TupleTag<Integer> sideOutTwo = new TupleTag<>();
+ final TupleTag<String> valueAsString = new TupleTag<>();
+ final TupleTag<Integer> valueAsInt = new TupleTag<>();
DoFn<Long, Long> fn =
new DoFn<Long, Long>() {
@ProcessElement
public void processElement(ProcessContext cxt) {
cxt.output(cxt.element());
- cxt.sideOutput(sideOutOne, Long.toString(cxt.element()));
- cxt.sideOutput(sideOutTwo, Long.valueOf(cxt.element()).intValue());
+ cxt.output(valueAsString, Long.toString(cxt.element()));
+ cxt.output(valueAsInt, Long.valueOf(cxt.element()).intValue());
}
};
ParDo.MultiOutput<Long, Long> parDo =
- ParDo.of(fn).withOutputTags(mainOut, TupleTagList.of(sideOutOne).and(sideOutTwo));
+ ParDo.of(fn).withOutputTags(mainOut, TupleTagList.of(valueAsString).and(valueAsInt));
PCollectionTuple firstApplication = longs.apply("first", parDo);
PCollectionTuple secondApplication = longs.apply("second", parDo);
assertThat(firstApplication, not(equalTo(secondApplication)));
assertThat(
firstApplication.getAll().keySet(),
- Matchers.<TupleTag<?>>containsInAnyOrder(mainOut, sideOutOne, sideOutTwo));
+ Matchers.<TupleTag<?>>containsInAnyOrder(mainOut, valueAsString, valueAsInt));
assertThat(
secondApplication.getAll().keySet(),
- Matchers.<TupleTag<?>>containsInAnyOrder(mainOut, sideOutOne, sideOutTwo));
+ Matchers.<TupleTag<?>>containsInAnyOrder(mainOut, valueAsString, valueAsInt));
}
@Test
@@ -1017,28 +1017,28 @@ public class ParDoTest implements Serializable {
}
}
- private static class SideOutputDummyFn extends DoFn<Integer, Integer> {
- private TupleTag<TestDummy> sideTag;
- public SideOutputDummyFn(TupleTag<TestDummy> sideTag) {
- this.sideTag = sideTag;
+ private static class TaggedOutputDummyFn extends DoFn<Integer, Integer> {
+ private TupleTag<TestDummy> dummyOutputTag;
+ public TaggedOutputDummyFn(TupleTag<TestDummy> dummyOutputTag) {
+ this.dummyOutputTag = dummyOutputTag;
}
@ProcessElement
public void processElement(ProcessContext c) {
c.output(1);
- c.sideOutput(sideTag, new TestDummy());
+ c.output(dummyOutputTag, new TestDummy());
}
}
private static class MainOutputDummyFn extends DoFn<Integer, TestDummy> {
- private TupleTag<Integer> sideTag;
- public MainOutputDummyFn(TupleTag<Integer> sideTag) {
- this.sideTag = sideTag;
+ private TupleTag<Integer> intOutputTag;
+ public MainOutputDummyFn(TupleTag<Integer> intOutputTag) {
+ this.intOutputTag = intOutputTag;
}
@ProcessElement
public void processElement(ProcessContext c) {
c.output(new TestDummy());
- c.sideOutput(sideTag, 1);
+ c.output(intOutputTag, 1);
}
}
@@ -1112,7 +1112,7 @@ public class ParDoTest implements Serializable {
implements SerializableFunction<Iterable<String>, Void>, Serializable {
private final List<Integer> inputs;
private final List<Integer> sideInputs;
- private final String sideOutput;
+ private final String additionalOutput;
private final boolean ordered;
public static HasExpectedOutput forInput(List<Integer> inputs) {
@@ -1125,11 +1125,11 @@ public class ParDoTest implements Serializable {
private HasExpectedOutput(List<Integer> inputs,
List<Integer> sideInputs,
- String sideOutput,
+ String additionalOutput,
boolean ordered) {
this.inputs = inputs;
this.sideInputs = sideInputs;
- this.sideOutput = sideOutput;
+ this.additionalOutput = additionalOutput;
this.ordered = ordered;
}
@@ -1138,18 +1138,18 @@ public class ParDoTest implements Serializable {
for (Integer sideInputValue : sideInputValues) {
sideInputs.add(sideInputValue);
}
- return new HasExpectedOutput(inputs, sideInputs, sideOutput, ordered);
+ return new HasExpectedOutput(inputs, sideInputs, additionalOutput, ordered);
}
- public HasExpectedOutput fromSideOutput(TupleTag<String> sideOutputTag) {
- return fromSideOutput(sideOutputTag.getId());
+ public HasExpectedOutput fromOutput(TupleTag<String> outputTag) {
+ return fromOutput(outputTag.getId());
}
- public HasExpectedOutput fromSideOutput(String sideOutput) {
- return new HasExpectedOutput(inputs, sideInputs, sideOutput, ordered);
+ public HasExpectedOutput fromOutput(String outputId) {
+ return new HasExpectedOutput(inputs, sideInputs, outputId, ordered);
}
public HasExpectedOutput inOrder() {
- return new HasExpectedOutput(inputs, sideInputs, sideOutput, true);
+ return new HasExpectedOutput(inputs, sideInputs, additionalOutput, true);
}
@Override
@@ -1174,17 +1174,17 @@ public class ParDoTest implements Serializable {
sideInputsSuffix = ": " + sideInputs;
}
- String sideOutputPrefix;
- if (sideOutput.isEmpty()) {
- sideOutputPrefix = "";
+ String additionalOutputPrefix;
+ if (additionalOutput.isEmpty()) {
+ additionalOutputPrefix = "";
} else {
- sideOutputPrefix = sideOutput + ": ";
+ additionalOutputPrefix = additionalOutput + ": ";
}
List<String> expectedProcesseds = new ArrayList<>();
for (Integer input : inputs) {
expectedProcesseds.add(
- sideOutputPrefix + "processing: " + input + sideInputsSuffix);
+ additionalOutputPrefix + "processing: " + input + sideInputsSuffix);
}
String[] expectedProcessedsArray =
expectedProcesseds.toArray(new String[expectedProcesseds.size()]);
@@ -1196,10 +1196,10 @@ public class ParDoTest implements Serializable {
assertEquals(starteds.size(), finisheds.size());
for (String started : starteds) {
- assertEquals(sideOutputPrefix + "started", started);
+ assertEquals(additionalOutputPrefix + "started", started);
}
for (String finished : finisheds) {
- assertEquals(sideOutputPrefix + "finished", finished);
+ assertEquals(additionalOutputPrefix + "finished", finished);
}
return null;
@@ -1208,15 +1208,15 @@ public class ParDoTest implements Serializable {
@Test
@Category(NeedsRunner.class)
- public void testSideOutputUnknownCoder() throws Exception {
+ public void testTaggedOutputUnknownCoder() throws Exception {
PCollection<Integer> input = pipeline
.apply(Create.of(Arrays.asList(1, 2, 3)));
final TupleTag<Integer> mainOutputTag = new TupleTag<Integer>("main");
- final TupleTag<TestDummy> sideOutputTag = new TupleTag<TestDummy>("unknownSide");
- input.apply(ParDo.of(new SideOutputDummyFn(sideOutputTag))
- .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)));
+ final TupleTag<TestDummy> additionalOutputTag = new TupleTag<TestDummy>("unknownSide");
+ input.apply(ParDo.of(new TaggedOutputDummyFn(additionalOutputTag))
+ .withOutputTags(mainOutputTag, TupleTagList.of(additionalOutputTag)));
thrown.expect(IllegalStateException.class);
thrown.expectMessage("Unable to return a default Coder");
@@ -1224,26 +1224,27 @@ public class ParDoTest implements Serializable {
}
@Test
- public void testSideOutputUnregisteredExplicitCoder() throws Exception {
+ public void testTaggedOutputUnregisteredExplicitCoder() throws Exception {
pipeline.enableAbandonedNodeEnforcement(false);
PCollection<Integer> input = pipeline
.apply(Create.of(Arrays.asList(1, 2, 3)));
final TupleTag<Integer> mainOutputTag = new TupleTag<Integer>("main");
- final TupleTag<TestDummy> sideOutputTag = new TupleTag<TestDummy>("unregisteredSide");
- ParDo.MultiOutput<Integer, Integer> pardo = ParDo.of(new SideOutputDummyFn(sideOutputTag))
- .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag));
+ final TupleTag<TestDummy> additionalOutputTag = new TupleTag<TestDummy>("unregisteredSide");
+ ParDo.MultiOutput<Integer, Integer> pardo =
+ ParDo.of(new TaggedOutputDummyFn(additionalOutputTag))
+ .withOutputTags(mainOutputTag, TupleTagList.of(additionalOutputTag));
PCollectionTuple outputTuple = input.apply(pardo);
- outputTuple.get(sideOutputTag).setCoder(new TestDummyCoder());
+ outputTuple.get(additionalOutputTag).setCoder(new TestDummyCoder());
- outputTuple.get(sideOutputTag).apply(View.<TestDummy>asSingleton());
+ outputTuple.get(additionalOutputTag).apply(View.<TestDummy>asSingleton());
- assertEquals(new TestDummyCoder(), outputTuple.get(sideOutputTag).getCoder());
- outputTuple.get(sideOutputTag).finishSpecifyingOutput(input, pardo); // Check for crashes
+ assertEquals(new TestDummyCoder(), outputTuple.get(additionalOutputTag).getCoder());
+ outputTuple.get(additionalOutputTag).finishSpecifyingOutput(input, pardo); // Check for crashes
assertEquals(new TestDummyCoder(),
- outputTuple.get(sideOutputTag).getCoder()); // Check for corruption
+ outputTuple.get(additionalOutputTag).getCoder()); // Check for corruption
}
@Test
@@ -1254,9 +1255,11 @@ public class ParDoTest implements Serializable {
.apply(Create.of(Arrays.asList(1, 2, 3)));
final TupleTag<TestDummy> mainOutputTag = new TupleTag<TestDummy>("unregisteredMain");
- final TupleTag<Integer> sideOutputTag = new TupleTag<Integer>("side") {};
- PCollectionTuple outputTuple = input.apply(ParDo.of(new MainOutputDummyFn(sideOutputTag))
- .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)));
+ final TupleTag<Integer> additionalOutputTag = new TupleTag<Integer>("additionalOutput") {};
+ PCollectionTuple outputTuple =
+ input.apply(
+ ParDo.of(new MainOutputDummyFn(additionalOutputTag))
+ .withOutputTags(mainOutputTag, TupleTagList.of(additionalOutputTag)));
outputTuple.get(mainOutputTag).setCoder(new TestDummyCoder());
@@ -1265,13 +1268,13 @@ public class ParDoTest implements Serializable {
@Test
@Category(NeedsRunner.class)
- public void testMainOutputApplySideOutputNoCoder() {
+ public void testMainOutputApplyTaggedOutputNoCoder() {
// Regression test: applying a transform to the main output
// should not cause a crash based on lack of a coder for the
- // side output.
+ // additional output.
final TupleTag<TestDummy> mainOutputTag = new TupleTag<TestDummy>("main");
- final TupleTag<TestDummy> sideOutputTag = new TupleTag<TestDummy>("side");
+ final TupleTag<TestDummy> additionalOutputTag = new TupleTag<TestDummy>("additionalOutput");
PCollectionTuple tuple = pipeline
.apply(Create.of(new TestDummy())
.withCoder(TestDummyCoder.of()))
@@ -1282,14 +1285,14 @@ public class ParDoTest implements Serializable {
public void processElement(ProcessContext context) {
TestDummy element = context.element();
context.output(element);
- context.sideOutput(sideOutputTag, element);
+ context.output(additionalOutputTag, element);
}
})
- .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag))
+ .withOutputTags(mainOutputTag, TupleTagList.of(additionalOutputTag))
);
// Before fix, tuple.get(mainOutputTag).apply(...) would indirectly trigger
- // tuple.get(sideOutputTag).finishSpecifyingOutput(), which would crash
+ // tuple.get(additionalOutputTag).finishSpecifyingOutput(), which would crash
// on a missing coder.
tuple.get(mainOutputTag)
.setCoder(TestDummyCoder.of())
@@ -1300,7 +1303,7 @@ public class ParDoTest implements Serializable {
}
}));
- tuple.get(sideOutputTag).setCoder(TestDummyCoder.of());
+ tuple.get(additionalOutputTag).setCoder(TestDummyCoder.of());
pipeline.run();
}
@@ -1328,13 +1331,13 @@ public class ParDoTest implements Serializable {
@Test
@Category(NeedsRunner.class)
- public void testParDoSideOutputWithTimestamp() {
+ public void testParDoTaggedOutputWithTimestamp() {
PCollection<Integer> input =
pipeline.apply(Create.of(Arrays.asList(3, 42, 6)));
final TupleTag<Integer> mainOutputTag = new TupleTag<Integer>("main"){};
- final TupleTag<Integer> sideOutputTag = new TupleTag<Integer>("side"){};
+ final TupleTag<Integer> additionalOutputTag = new TupleTag<Integer>("additional"){};
PCollection<String> output =
input
@@ -1342,11 +1345,11 @@ public class ParDoTest implements Serializable {
new DoFn<Integer, Integer>() {
@ProcessElement
public void processElement(ProcessContext c) {
- c.sideOutputWithTimestamp(
- sideOutputTag, c.element(), new Instant(c.element().longValue()));
+ c.outputWithTimestamp(
+ additionalOutputTag, c.element(), new Instant(c.element().longValue()));
}
- }).withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)))
- .get(sideOutputTag)
+ }).withOutputTags(mainOutputTag, TupleTagList.of(additionalOutputTag)))
+ .get(additionalOutputTag)
.apply(ParDo.of(new TestShiftTimestampDoFn<Integer>(Duration.ZERO, Duration.ZERO)))
.apply(ParDo.of(new TestFormatTimestampDoFn<Integer>()));
@@ -1914,7 +1917,7 @@ public class ParDoTest implements Serializable {
@Test
@Category({ValidatesRunner.class, UsesStatefulParDo.class})
- public void testValueStateSideOutput() {
+ public void testValueStateTaggedOutput() {
final String stateId = "foo";
final TupleTag<Integer> evenTag = new TupleTag<Integer>() {};
@@ -1934,7 +1937,7 @@ public class ParDoTest implements Serializable {
if (currentValue % 2 == 0) {
c.output(currentValue);
} else {
- c.sideOutput(oddTag, currentValue);
+ c.output(oddTag, currentValue);
}
state.write(currentValue + 1);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
index a122f67..9e8c12e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
@@ -218,12 +218,12 @@ public class SplittableDoFnTest {
private static class SDFWithSideInputsAndOutputs extends DoFn<Integer, String> {
private final PCollectionView<String> sideInput;
- private final TupleTag<String> sideOutput;
+ private final TupleTag<String> additionalOutput;
private SDFWithSideInputsAndOutputs(
- PCollectionView<String> sideInput, TupleTag<String> sideOutput) {
+ PCollectionView<String> sideInput, TupleTag<String> additionalOutput) {
this.sideInput = sideInput;
- this.sideOutput = sideOutput;
+ this.additionalOutput = additionalOutput;
}
@ProcessElement
@@ -231,7 +231,7 @@ public class SplittableDoFnTest {
checkState(tracker.tryClaim(tracker.currentRestriction().getFrom()));
String side = c.sideInput(sideInput);
c.output("main:" + side + ":" + c.element());
- c.sideOutput(sideOutput, "side:" + side + ":" + c.element());
+ c.output(additionalOutput, "additional:" + side + ":" + c.element());
}
@GetInitialRestriction
@@ -247,21 +247,22 @@ public class SplittableDoFnTest {
PCollectionView<String> sideInput =
p.apply("side input", Create.of("foo")).apply(View.<String>asSingleton());
TupleTag<String> mainOutputTag = new TupleTag<>("main");
- TupleTag<String> sideOutputTag = new TupleTag<>("side");
+ TupleTag<String> additionalOutputTag = new TupleTag<>("additional");
PCollectionTuple res =
p.apply("input", Create.of(0, 1, 2))
.apply(
- ParDo.of(new SDFWithSideInputsAndOutputs(sideInput, sideOutputTag))
+ ParDo.of(new SDFWithSideInputsAndOutputs(sideInput, additionalOutputTag))
.withSideInputs(sideInput)
- .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)));
+ .withOutputTags(mainOutputTag, TupleTagList.of(additionalOutputTag)));
res.get(mainOutputTag).setCoder(StringUtf8Coder.of());
- res.get(sideOutputTag).setCoder(StringUtf8Coder.of());
+ res.get(additionalOutputTag).setCoder(StringUtf8Coder.of());
PAssert.that(res.get(mainOutputTag))
.containsInAnyOrder(Arrays.asList("main:foo:0", "main:foo:1", "main:foo:2"));
- PAssert.that(res.get(sideOutputTag))
- .containsInAnyOrder(Arrays.asList("side:foo:0", "side:foo:1", "side:foo:2"));
+ PAssert.that(res.get(additionalOutputTag))
+ .containsInAnyOrder(
+ Arrays.asList("additional:foo:0", "additional:foo:1", "additional:foo:2"));
p.run();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
index 0a0abd6..9df0512 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
@@ -82,7 +82,7 @@ public final class PCollectionTupleTest implements Serializable {
TupleTag<Integer> mainOutputTag = new TupleTag<Integer>("main") {};
TupleTag<Integer> emptyOutputTag = new TupleTag<Integer>("empty") {};
- final TupleTag<Integer> sideOutputTag = new TupleTag<Integer>("side") {};
+ final TupleTag<Integer> additionalOutputTag = new TupleTag<Integer>("extra") {};
PCollection<Integer> mainInput = pipeline
.apply(Create.of(inputs));
@@ -91,14 +91,14 @@ public final class PCollectionTupleTest implements Serializable {
.of(new DoFn<Integer, Integer>() {
@ProcessElement
public void processElement(ProcessContext c) {
- c.sideOutput(sideOutputTag, c.element());
+ c.output(additionalOutputTag, c.element());
}})
- .withOutputTags(emptyOutputTag, TupleTagList.of(sideOutputTag)));
+ .withOutputTags(emptyOutputTag, TupleTagList.of(additionalOutputTag)));
assertNotNull("outputs.getPipeline()", outputs.getPipeline());
outputs = outputs.and(mainOutputTag, mainInput);
PAssert.that(outputs.get(mainOutputTag)).containsInAnyOrder(inputs);
- PAssert.that(outputs.get(sideOutputTag)).containsInAnyOrder(inputs);
+ PAssert.that(outputs.get(additionalOutputTag)).containsInAnyOrder(inputs);
PAssert.that(outputs.get(emptyOutputTag)).empty();
pipeline.run();