You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by ta...@apache.org on 2020/10/18 05:05:43 UTC

[incubator-nemo] branch master updated: [NEMO-460] Setting coders in CombinePerKey transformation (#303)

This is an automated email from the ASF dual-hosted git repository.

taegeonum pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d46caf  [NEMO-460] Setting coders in CombinePerKey transformation (#303)
3d46caf is described below

commit 3d46caf02c84c739e448d8cc6220099fa8df58d7
Author: jaehwan0214 <60...@users.noreply.github.com>
AuthorDate: Sun Oct 18 14:05:34 2020 +0900

    [NEMO-460] Setting coders in CombinePerKey transformation (#303)
    
    JIRA: [NEMO-460: Setting coders in CombinePerKey transformation](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-460)
    
    **Major changes:**
    - Added an `inputCoder` parameter to the GBKTransform constructor.
    - Set the correct input and output coders for the partial and final combine transforms (previously the input coder was null and the partial transform reused the final transform's output coders).
    
    **Minor changes to note:**
    - Set the correct main output TupleTags for the partial and final combine transforms (the final transform now uses the Beam node's actual output tag instead of a freshly created one).
    
    **Tests for the changes:**
    - Current tests suffice.
    
    **Other comments:**
    - This must be merged after #302.
    
    Closes #303
---
 .../nemo/compiler/frontend/beam/PipelineTranslator.java  | 14 ++++++++------
 .../compiler/frontend/beam/transform/GBKTransform.java   | 11 ++++++-----
 .../frontend/beam/transform/GBKTransformTest.java        | 16 +++++++++-------
 3 files changed, 23 insertions(+), 18 deletions(-)

diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
index 21ded1c..cd9d7ad 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
@@ -406,10 +406,11 @@ final class PipelineTranslator {
             KvCoder.of(inputCoder.getKeyCoder(),
               accumulatorCoder),
             null, mainInput.getWindowingStrategy()));
+      final TupleTag<?> partialMainOutputTag = new TupleTag<>();
       final GBKTransform partialCombineStreamTransform =
-        new GBKTransform(
-          getOutputCoders(pTransform),
-          new TupleTag<>(),
+        new GBKTransform(inputCoder,
+          Collections.singletonMap(partialMainOutputTag, KvCoder.of(inputCoder.getKeyCoder(), accumulatorCoder)),
+          partialMainOutputTag,
           mainInput.getWindowingStrategy(),
           ctx.getPipelineOptions(),
           partialSystemReduceFn,
@@ -418,9 +419,9 @@ final class PipelineTranslator {
           true);
 
       final GBKTransform finalCombineStreamTransform =
-        new GBKTransform(
+        new GBKTransform(KvCoder.of(inputCoder.getKeyCoder(), accumulatorCoder),
           getOutputCoders(pTransform),
-          new TupleTag<>(),
+          Iterables.getOnlyElement(beamNode.getOutputs().keySet()),
           mainInput.getWindowingStrategy(),
           ctx.getPipelineOptions(),
           finalSystemReduceFn,
@@ -556,7 +557,7 @@ final class PipelineTranslator {
     final AppliedPTransform<?, ?, ?> pTransform = beamNode.toAppliedPTransform(ctx.getPipeline());
     final PCollection<?> mainInput = (PCollection<?>)
       Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(pTransform));
-    final TupleTag mainOutputTag = new TupleTag<>();
+    final TupleTag mainOutputTag = Iterables.getOnlyElement(beamNode.getOutputs().keySet());
 
     if (isGlobalWindow(beamNode, ctx.getPipeline())) {
       // GroupByKey Transform when using a global windowing strategy.
@@ -564,6 +565,7 @@ final class PipelineTranslator {
     } else {
       // GroupByKey Transform when using a non-global windowing strategy.
       return new GBKTransform<>(
+        (KvCoder) mainInput.getCoder(),
         getOutputCoders(pTransform),
         mainOutputTag,
         mainInput.getWindowingStrategy(),
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransform.java
index 9dd2e5a..1bf6cb8 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransform.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransform.java
@@ -58,7 +58,8 @@ public final class GBKTransform<K, InputT, OutputT>
   private transient OutputCollector originOc;
   private final boolean isPartialCombining;
 
-  public GBKTransform(final Map<TupleTag<?>, Coder<?>> outputCoders,
+  public GBKTransform(final Coder<KV<K, InputT>> inputCoder,
+                      final Map<TupleTag<?>, Coder<?>> outputCoders,
                       final TupleTag<KV<K, OutputT>> mainOutputTag,
                       final WindowingStrategy<?, ?> windowingStrategy,
                       final PipelineOptions options,
@@ -67,7 +68,7 @@ public final class GBKTransform<K, InputT, OutputT>
                       final DisplayData displayData,
                       final boolean isPartialCombining) {
     super(null,
-      null,
+      inputCoder,
       outputCoders,
       mainOutputTag,
       Collections.emptyList(),  /* no additional outputs */
@@ -278,7 +279,7 @@ public final class GBKTransform<K, InputT, OutputT>
 
     /** Emit output. If {@param output} is emitted on-time, save its timestamp in the output watermark map. */
     @Override
-    public void emit(final WindowedValue<KV<K, OutputT>> output) {
+    public final void emit(final WindowedValue<KV<K, OutputT>> output) {
       // The watermark advances only in ON_TIME
       if (output.getPane().getTiming().equals(PaneInfo.Timing.ON_TIME)) {
         KV<K, OutputT> value = output.getValue();
@@ -296,13 +297,13 @@ public final class GBKTransform<K, InputT, OutputT>
 
     /** Emit watermark. */
     @Override
-    public void emitWatermark(final Watermark watermark) {
+    public final void emitWatermark(final Watermark watermark) {
       oc.emitWatermark(watermark);
     }
 
     /** Emit output value to {@param dstVertexId}. */
     @Override
-    public <T> void emit(final String dstVertexId, final T output) {
+    public final <T> void emit(final String dstVertexId, final T output) {
       oc.emit(dstVertexId, output);
     }
   }
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransformTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransformTest.java
index 45933b0..3c08c50 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransformTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransformTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.nemo.compiler.frontend.beam.transform;
 
+import com.google.common.collect.Iterables;
 import junit.framework.TestCase;
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.sdk.coders.*;
@@ -41,15 +42,12 @@ import java.util.*;
 
 import static org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing.*;
 import static org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES;
-import static org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode.DISCARDING_FIRED_PANES;
-import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
 
 public class GBKTransformTest extends TestCase {
   private static final Logger LOG = LoggerFactory.getLogger(GBKTransformTest.class.getName());
   private final static Coder STRING_CODER = StringUtf8Coder.of();
   private final static Coder INTEGER_CODER = BigEndianIntegerCoder.of();
-  private final static Map<TupleTag<?>, Coder<?>> NULL_OUTPUT_CODERS = null;
 
   private void checkOutput(final KV<String, Integer> expected, final KV<String, Integer> result) {
     // check key
@@ -155,7 +153,8 @@ public class GBKTransformTest extends TestCase {
 
     final GBKTransform<String, Integer, Integer> combine_transform =
       new GBKTransform(
-        NULL_OUTPUT_CODERS,
+        KvCoder.of(STRING_CODER, INTEGER_CODER),
+        Collections.singletonMap(outputTag, KvCoder.of(STRING_CODER, INTEGER_CODER)),
         outputTag,
         WindowingStrategy.of(slidingWindows).withMode(ACCUMULATING_FIRED_PANES),
         PipelineOptionsFactory.as(NemoPipelineOptions.class),
@@ -283,7 +282,8 @@ public class GBKTransformTest extends TestCase {
 
     final GBKTransform<String, Integer, Integer> combine_transform =
       new GBKTransform(
-        NULL_OUTPUT_CODERS,
+        KvCoder.of(STRING_CODER, INTEGER_CODER),
+        Collections.singletonMap(outputTag, KvCoder.of(STRING_CODER, INTEGER_CODER)),
         outputTag,
         WindowingStrategy.of(slidingWindows).withMode(ACCUMULATING_FIRED_PANES).withAllowedLateness(lateness),
         PipelineOptionsFactory.as(NemoPipelineOptions.class),
@@ -377,7 +377,8 @@ public class GBKTransformTest extends TestCase {
 
     final GBKTransform<String, String, Iterable<String>> doFnTransform =
       new GBKTransform(
-        NULL_OUTPUT_CODERS,
+        KvCoder.of(STRING_CODER, STRING_CODER),
+        Collections.singletonMap(outputTag, KvCoder.of(STRING_CODER, IterableCoder.of(STRING_CODER))),
         outputTag,
         WindowingStrategy.of(slidingWindows),
         PipelineOptionsFactory.as(NemoPipelineOptions.class),
@@ -562,7 +563,8 @@ public class GBKTransformTest extends TestCase {
 
     final GBKTransform<String, String, Iterable<String>> doFnTransform =
       new GBKTransform(
-        NULL_OUTPUT_CODERS,
+        KvCoder.of(STRING_CODER, STRING_CODER),
+        Collections.singletonMap(outputTag, KvCoder.of(STRING_CODER, IterableCoder.of(STRING_CODER))),
         outputTag,
         WindowingStrategy.of(window).withTrigger(trigger)
           .withMode(ACCUMULATING_FIRED_PANES)