You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by ta...@apache.org on 2020/10/18 05:05:43 UTC
[incubator-nemo] branch master updated: [NEMO-460] Setting coders in CombinePerKey transformation (#303)
This is an automated email from the ASF dual-hosted git repository.
taegeonum pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new 3d46caf [NEMO-460] Setting coders in CombinePerKey transformation (#303)
3d46caf is described below
commit 3d46caf02c84c739e448d8cc6220099fa8df58d7
Author: jaehwan0214 <60...@users.noreply.github.com>
AuthorDate: Sun Oct 18 14:05:34 2020 +0900
[NEMO-460] Setting coders in CombinePerKey transformation (#303)
JIRA: [NEMO-460: Setting coders in CombinePerKey transformation](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-460)
**Major changes:**
- Added an additional parameter, "inputCoder", to the GBKTransform constructor.
- Fixed the input coder and the output coder for the partial combine transform and the final combine transform.
**Minor changes to note:**
- Fixed the main output TupleTags for the partial combine transform and the final combine transform.
**Tests for the changes:**
- Current tests suffice.
**Other comments:**
- This needs to be merged after #302 is merged.
Closes #303
---
.../nemo/compiler/frontend/beam/PipelineTranslator.java | 14 ++++++++------
.../compiler/frontend/beam/transform/GBKTransform.java | 11 ++++++-----
.../frontend/beam/transform/GBKTransformTest.java | 16 +++++++++-------
3 files changed, 23 insertions(+), 18 deletions(-)
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
index 21ded1c..cd9d7ad 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
@@ -406,10 +406,11 @@ final class PipelineTranslator {
KvCoder.of(inputCoder.getKeyCoder(),
accumulatorCoder),
null, mainInput.getWindowingStrategy()));
+ final TupleTag<?> partialMainOutputTag = new TupleTag<>();
final GBKTransform partialCombineStreamTransform =
- new GBKTransform(
- getOutputCoders(pTransform),
- new TupleTag<>(),
+ new GBKTransform(inputCoder,
+ Collections.singletonMap(partialMainOutputTag, KvCoder.of(inputCoder.getKeyCoder(), accumulatorCoder)),
+ partialMainOutputTag,
mainInput.getWindowingStrategy(),
ctx.getPipelineOptions(),
partialSystemReduceFn,
@@ -418,9 +419,9 @@ final class PipelineTranslator {
true);
final GBKTransform finalCombineStreamTransform =
- new GBKTransform(
+ new GBKTransform(KvCoder.of(inputCoder.getKeyCoder(), accumulatorCoder),
getOutputCoders(pTransform),
- new TupleTag<>(),
+ Iterables.getOnlyElement(beamNode.getOutputs().keySet()),
mainInput.getWindowingStrategy(),
ctx.getPipelineOptions(),
finalSystemReduceFn,
@@ -556,7 +557,7 @@ final class PipelineTranslator {
final AppliedPTransform<?, ?, ?> pTransform = beamNode.toAppliedPTransform(ctx.getPipeline());
final PCollection<?> mainInput = (PCollection<?>)
Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(pTransform));
- final TupleTag mainOutputTag = new TupleTag<>();
+ final TupleTag mainOutputTag = Iterables.getOnlyElement(beamNode.getOutputs().keySet());
if (isGlobalWindow(beamNode, ctx.getPipeline())) {
// GroupByKey Transform when using a global windowing strategy.
@@ -564,6 +565,7 @@ final class PipelineTranslator {
} else {
// GroupByKey Transform when using a non-global windowing strategy.
return new GBKTransform<>(
+ (KvCoder) mainInput.getCoder(),
getOutputCoders(pTransform),
mainOutputTag,
mainInput.getWindowingStrategy(),
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransform.java
index 9dd2e5a..1bf6cb8 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransform.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransform.java
@@ -58,7 +58,8 @@ public final class GBKTransform<K, InputT, OutputT>
private transient OutputCollector originOc;
private final boolean isPartialCombining;
- public GBKTransform(final Map<TupleTag<?>, Coder<?>> outputCoders,
+ public GBKTransform(final Coder<KV<K, InputT>> inputCoder,
+ final Map<TupleTag<?>, Coder<?>> outputCoders,
final TupleTag<KV<K, OutputT>> mainOutputTag,
final WindowingStrategy<?, ?> windowingStrategy,
final PipelineOptions options,
@@ -67,7 +68,7 @@ public final class GBKTransform<K, InputT, OutputT>
final DisplayData displayData,
final boolean isPartialCombining) {
super(null,
- null,
+ inputCoder,
outputCoders,
mainOutputTag,
Collections.emptyList(), /* no additional outputs */
@@ -278,7 +279,7 @@ public final class GBKTransform<K, InputT, OutputT>
/** Emit output. If {@param output} is emitted on-time, save its timestamp in the output watermark map. */
@Override
- public void emit(final WindowedValue<KV<K, OutputT>> output) {
+ public final void emit(final WindowedValue<KV<K, OutputT>> output) {
// The watermark advances only in ON_TIME
if (output.getPane().getTiming().equals(PaneInfo.Timing.ON_TIME)) {
KV<K, OutputT> value = output.getValue();
@@ -296,13 +297,13 @@ public final class GBKTransform<K, InputT, OutputT>
/** Emit watermark. */
@Override
- public void emitWatermark(final Watermark watermark) {
+ public final void emitWatermark(final Watermark watermark) {
oc.emitWatermark(watermark);
}
/** Emit output value to {@param dstVertexId}. */
@Override
- public <T> void emit(final String dstVertexId, final T output) {
+ public final <T> void emit(final String dstVertexId, final T output) {
oc.emit(dstVertexId, output);
}
}
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransformTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransformTest.java
index 45933b0..3c08c50 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransformTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransformTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.nemo.compiler.frontend.beam.transform;
+import com.google.common.collect.Iterables;
import junit.framework.TestCase;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.sdk.coders.*;
@@ -41,15 +42,12 @@ import java.util.*;
import static org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing.*;
import static org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES;
-import static org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode.DISCARDING_FIRED_PANES;
-import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
public class GBKTransformTest extends TestCase {
private static final Logger LOG = LoggerFactory.getLogger(GBKTransformTest.class.getName());
private final static Coder STRING_CODER = StringUtf8Coder.of();
private final static Coder INTEGER_CODER = BigEndianIntegerCoder.of();
- private final static Map<TupleTag<?>, Coder<?>> NULL_OUTPUT_CODERS = null;
private void checkOutput(final KV<String, Integer> expected, final KV<String, Integer> result) {
// check key
@@ -155,7 +153,8 @@ public class GBKTransformTest extends TestCase {
final GBKTransform<String, Integer, Integer> combine_transform =
new GBKTransform(
- NULL_OUTPUT_CODERS,
+ KvCoder.of(STRING_CODER, INTEGER_CODER),
+ Collections.singletonMap(outputTag, KvCoder.of(STRING_CODER, INTEGER_CODER)),
outputTag,
WindowingStrategy.of(slidingWindows).withMode(ACCUMULATING_FIRED_PANES),
PipelineOptionsFactory.as(NemoPipelineOptions.class),
@@ -283,7 +282,8 @@ public class GBKTransformTest extends TestCase {
final GBKTransform<String, Integer, Integer> combine_transform =
new GBKTransform(
- NULL_OUTPUT_CODERS,
+ KvCoder.of(STRING_CODER, INTEGER_CODER),
+ Collections.singletonMap(outputTag, KvCoder.of(STRING_CODER, INTEGER_CODER)),
outputTag,
WindowingStrategy.of(slidingWindows).withMode(ACCUMULATING_FIRED_PANES).withAllowedLateness(lateness),
PipelineOptionsFactory.as(NemoPipelineOptions.class),
@@ -377,7 +377,8 @@ public class GBKTransformTest extends TestCase {
final GBKTransform<String, String, Iterable<String>> doFnTransform =
new GBKTransform(
- NULL_OUTPUT_CODERS,
+ KvCoder.of(STRING_CODER, STRING_CODER),
+ Collections.singletonMap(outputTag, KvCoder.of(STRING_CODER, IterableCoder.of(STRING_CODER))),
outputTag,
WindowingStrategy.of(slidingWindows),
PipelineOptionsFactory.as(NemoPipelineOptions.class),
@@ -562,7 +563,8 @@ public class GBKTransformTest extends TestCase {
final GBKTransform<String, String, Iterable<String>> doFnTransform =
new GBKTransform(
- NULL_OUTPUT_CODERS,
+ KvCoder.of(STRING_CODER, STRING_CODER),
+ Collections.singletonMap(outputTag, KvCoder.of(STRING_CODER, IterableCoder.of(STRING_CODER))),
outputTag,
WindowingStrategy.of(window).withTrigger(trigger)
.withMode(ACCUMULATING_FIRED_PANES)