You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by ta...@apache.org on 2018/11/07 04:49:45 UTC

[incubator-nemo] branch master updated: [NEMO-260] Beam Accumulator-based Partial Aggregation (#148)

This is an automated email from the ASF dual-hosted git repository.

taegeonum pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new 61207a5  [NEMO-260] Beam Accumulator-based Partial Aggregation (#148)
61207a5 is described below

commit 61207a5936045d6fe550234e47b539d2af4fc04f
Author: John Yang <jo...@gmail.com>
AuthorDate: Wed Nov 7 13:49:40 2018 +0900

    [NEMO-260] Beam Accumulator-based Partial Aggregation (#148)
    
    JIRA: [NEMO-260: Beam Accumulator-based Partial Aggregation](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-260)
    
    **Major changes:**
    - Accumulator-based combining for Combine.PerKey PTransforms
    - Add CombineFnPartialTransform and CombineFnFinalTransform
    
    **Minor changes to note:**
    - Re-enable the optimization pass tests
    
    **Tests for the changes:**
    - Existing tests that use Combine.PerKey continue to pass
    
    **Other comments:**
    - This change applies to batch operations only (for now)
    
    Closes #148
---
 .../compiler/frontend/beam/PipelineTranslator.java |  50 +++++++++-
 .../beam/transform/CombineFnFinalTransform.java    | 106 +++++++++++++++++++++
 .../beam/transform/CombineFnPartialTransform.java  | 104 ++++++++++++++++++++
 .../frontend/beam/BeamFrontendALSTest.java         |   3 +-
 .../frontend/beam/BeamFrontendMLRTest.java         |   3 +-
 .../TransientResourceCompositePassTest.java        |   3 +-
 .../reshaping/LoopExtractionPassTest.java          |   3 +-
 .../LoopInvariantCodeMotionALSInefficientTest.java |   3 +-
 .../reshaping/LoopInvariantCodeMotionPassTest.java |   3 +-
 9 files changed, 264 insertions(+), 14 deletions(-)

diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
index 9118d98..7a22ba8 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
@@ -26,6 +26,8 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.nemo.common.ir.edge.IREdge;
+import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
 import org.apache.nemo.common.ir.vertex.IRVertex;
 import org.apache.nemo.common.ir.vertex.OperatorVertex;
 import org.apache.nemo.common.ir.vertex.transform.Transform;
@@ -279,8 +281,52 @@ final class PipelineTranslator {
     final PipelineTranslationContext ctx,
     final TransformHierarchy.Node beamNode,
     final PTransform<?, ?> transform) {
-    // TODO #260: Beam Accumulator-based Partial Aggregation
-    return Pipeline.PipelineVisitor.CompositeBehavior.ENTER_TRANSFORM;
+
+    // Check if the partial combining optimization can be applied.
+    // If not, simply use the default Combine implementation by entering into it.
+    if (!isGlobalWindow(beamNode, ctx.getPipeline())) {
+      // TODO #263: Partial Combining for Beam Streaming
+      return Pipeline.PipelineVisitor.CompositeBehavior.ENTER_TRANSFORM;
+    }
+    final Combine.PerKey perKey = (Combine.PerKey) transform;
+    if (!perKey.getSideInputs().isEmpty()) {
+      // TODO #264: Partial Combining with Beam SideInputs
+      return Pipeline.PipelineVisitor.CompositeBehavior.ENTER_TRANSFORM;
+    }
+
+    // This Combine can be optimized as the following sequence of Nemo IRVertices.
+    // Combine Input -> Combine(Partial Combine -> KV<InputT, AccumT> -> Final Combine) -> Combine Output
+    final CombineFnBase.GlobalCombineFn combineFn = perKey.getFn();
+
+    // (Step 1) To Partial Combine
+    final IRVertex partialCombine = new OperatorVertex(new CombineFnPartialTransform<>(combineFn));
+    ctx.addVertex(partialCombine);
+    beamNode.getInputs().values().forEach(input -> ctx.addEdgeTo(partialCombine, input));
+
+    // (Step 2) To Final Combine
+    final PCollection input = (PCollection) Iterables.getOnlyElement(
+      TransformInputs.nonAdditionalInputs(beamNode.toAppliedPTransform(ctx.getPipeline())));
+    final KvCoder inputCoder = (KvCoder) input.getCoder();
+    final Coder accumulatorCoder;
+    try {
+      accumulatorCoder =
+        combineFn.getAccumulatorCoder(ctx.getPipeline().getCoderRegistry(), inputCoder.getValueCoder());
+    } catch (CannotProvideCoderException e) {
+      throw new RuntimeException(e);
+    }
+    final IRVertex finalCombine = new OperatorVertex(new CombineFnFinalTransform<>(combineFn));
+    ctx.addVertex(finalCombine);
+    final IREdge edge = new IREdge(CommunicationPatternProperty.Value.Shuffle, partialCombine, finalCombine);
+    ctx.addEdgeTo(
+      edge,
+      KvCoder.of(inputCoder.getKeyCoder(), accumulatorCoder),
+      input.getWindowingStrategy().getWindowFn().windowCoder());
+
+    // (Step 3) To Combine Output
+    beamNode.getOutputs().values().forEach(output -> ctx.registerMainOutputFrom(beamNode, finalCombine, output));
+
+    // This composite transform has been translated in its entirety.
+    return Pipeline.PipelineVisitor.CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
   }
 
   /**
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CombineFnFinalTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CombineFnFinalTransform.java
new file mode 100644
index 0000000..d7f0e3e
--- /dev/null
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CombineFnFinalTransform.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.runners.core.GlobalCombineFnRunner;
+import org.apache.beam.runners.core.GlobalCombineFnRunners;
+import org.apache.beam.sdk.transforms.CombineFnBase;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.ir.vertex.transform.NoWatermarkEmitTransform;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Accumulates all of the partially accumulated KVs(Key, Accum) into KVs(Key, Output).
+ * Accumulators are merged per key as elements arrive, and the final outputs are
+ * extracted and emitted only when {@link #close()} is called.
+ * (Currently supports batch-style global windows only)
+ * TODO #263: Partial Combining for Beam Streaming
+ * TODO #264: Partial Combining with Beam SideInputs
+ * @param <K> Key type.
+ * @param <A> Accum type.
+ * @param <O> Output type.
+ */
+public final class CombineFnFinalTransform<K, A, O>
+  extends NoWatermarkEmitTransform<WindowedValue<KV<K, A>>, WindowedValue<KV<K, O>>> {
+  private static final Logger LOG = LoggerFactory.getLogger(CombineFnFinalTransform.class.getName());
+
+  // Merged accumulator per key; held in memory until close() emits the outputs.
+  private final Map<K, A> keyToAccumulator;
+  private OutputCollector<WindowedValue<KV<K, O>>> outputCollector;
+
+  // null arguments when calling methods of this variable, since we don't support sideinputs yet.
+  private final GlobalCombineFnRunner<?, A, O> combineFnRunner;
+
+  /**
+   * Constructor.
+   * @param combineFn the Beam combine function used to create and merge accumulators
+   *                  and to extract the final output from them.
+   */
+  public CombineFnFinalTransform(final CombineFnBase.GlobalCombineFn<?, A, O> combineFn) {
+    this.combineFnRunner = GlobalCombineFnRunners.create(combineFn);
+    this.keyToAccumulator = new HashMap<>();
+  }
+
+  @Override
+  public void prepare(final Context context, final OutputCollector<WindowedValue<KV<K, O>>> oc) {
+    this.outputCollector = oc;
+  }
+
+  /**
+   * Merges an incoming (key, partial accumulator) pair into the accumulator kept for that key.
+   * @param element the partially accumulated value, e.g. as emitted by CombineFnPartialTransform.
+   */
+  @Override
+  public void onData(final WindowedValue<KV<K, A>> element) {
+    final K key = element.getValue().getKey();
+    final A accum = element.getValue().getValue();
+
+    // Create the initial accumulator only the first time this key is seen
+    // (putIfAbsent would build and discard a fresh accumulator for every element).
+    final A accumulatorForThisKey = keyToAccumulator.computeIfAbsent(
+      key, k -> combineFnRunner.createAccumulator(null, null, null));
+
+    // Update the accumulator (merge). mergeAccumulators returns the merged result,
+    // which must be stored back because it may be a different object.
+    keyToAccumulator.put(
+      key,
+      combineFnRunner.mergeAccumulators(
+        Arrays.asList(accumulatorForThisKey, accum), null, null, null));
+  }
+
+  /**
+   * Extracts the final output for every key, emits it in the global window, and
+   * releases the per-key state.
+   */
+  @Override
+  public void close() {
+    final Iterator<Map.Entry<K, A>> iterator = keyToAccumulator.entrySet().iterator();
+    while (iterator.hasNext()) {
+      final Map.Entry<K, A> entry = iterator.next();
+      final K key = entry.getKey();
+      final A accum = entry.getValue();
+      final O output = combineFnRunner.extractOutput(accum, null, null, null);
+      outputCollector.emit(WindowedValue.valueInGlobalWindow(KV.of(key, output)));
+      iterator.remove(); // for eager garbage collection
+    }
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder();
+    sb.append("CombineFnFinalTransform:");
+    sb.append(super.toString());
+    return sb.toString();
+  }
+}
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CombineFnPartialTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CombineFnPartialTransform.java
new file mode 100644
index 0000000..dbe5acd
--- /dev/null
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CombineFnPartialTransform.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.runners.core.GlobalCombineFnRunner;
+import org.apache.beam.runners.core.GlobalCombineFnRunners;
+import org.apache.beam.sdk.transforms.CombineFnBase;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.ir.vertex.transform.NoWatermarkEmitTransform;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Partially accumulates the given KVs(Key, Input) into KVs(Key, Accum).
+ * Inputs are added to a per-key accumulator as they arrive, and the (compacted)
+ * accumulators are emitted only when {@link #close()} is called.
+ * (Currently supports batch-style global windows only)
+ * TODO #263: Partial Combining for Beam Streaming
+ * TODO #264: Partial Combining with Beam SideInputs
+ * @param <K> Key type.
+ * @param <I> Input type.
+ * @param <A> Accum type.
+ */
+public final class CombineFnPartialTransform<K, I, A>
+  extends NoWatermarkEmitTransform<WindowedValue<KV<K, I>>, WindowedValue<KV<K, A>>> {
+  private static final Logger LOG = LoggerFactory.getLogger(CombineFnPartialTransform.class.getName());
+
+  // Partial accumulator per key; held in memory until close() emits them.
+  private final Map<K, A> keyToAccumulator;
+  private OutputCollector<WindowedValue<KV<K, A>>> outputCollector;
+
+  // null arguments when calling methods of this variable, since we don't support sideinputs yet.
+  private final GlobalCombineFnRunner<I, A, ?> combineFnRunner;
+
+  /**
+   * Constructor.
+   * @param combineFn the Beam combine function used to create accumulators,
+   *                  add inputs to them, and compact them before emission.
+   */
+  public CombineFnPartialTransform(final CombineFnBase.GlobalCombineFn<I, A, ?> combineFn) {
+    this.combineFnRunner = GlobalCombineFnRunners.create(combineFn);
+    this.keyToAccumulator = new HashMap<>();
+  }
+
+  @Override
+  public void prepare(final Context context, final OutputCollector<WindowedValue<KV<K, A>>> oc) {
+    this.outputCollector = oc;
+  }
+
+  /**
+   * Adds the value of an incoming (key, input) pair to the accumulator kept for that key.
+   * @param element the input element.
+   */
+  @Override
+  public void onData(final WindowedValue<KV<K, I>> element) {
+    final K key = element.getValue().getKey();
+    final I val = element.getValue().getValue();
+
+    // Create the initial accumulator only the first time this key is seen.
+    final A accumulatorForThisKey = keyToAccumulator.computeIfAbsent(
+      key, k -> combineFnRunner.createAccumulator(null, null, null));
+
+    // Update the accumulator. addInput returns the updated accumulator, which may be a
+    // new object, so it must always overwrite the existing entry. putIfAbsent would
+    // silently drop it because the key is already present.
+    keyToAccumulator.put(
+      key,
+      combineFnRunner.addInput(accumulatorForThisKey, val, null, null, null));
+  }
+
+  /**
+   * Compacts and emits the accumulator of every key in the global window, releasing
+   * the per-key state.
+   */
+  @Override
+  public void close() {
+    final Iterator<Map.Entry<K, A>> iterator = keyToAccumulator.entrySet().iterator();
+    while (iterator.hasNext()) {
+      final Map.Entry<K, A> entry = iterator.next();
+      final K key = entry.getKey();
+      final A accum = entry.getValue();
+      final A compactAccum = combineFnRunner.compact(accum, null, null, null);
+      outputCollector.emit(WindowedValue.valueInGlobalWindow(KV.of(key, compactAccum)));
+      iterator.remove(); // for eager garbage collection
+    }
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder();
+    sb.append("CombineFnPartialTransform:");
+    sb.append(super.toString());
+    return sb.toString();
+  }
+}
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendALSTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendALSTest.java
index fde90a9..be3f008 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendALSTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendALSTest.java
@@ -36,8 +36,7 @@ import static org.junit.Assert.assertEquals;
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(JobLauncher.class)
 public final class BeamFrontendALSTest {
-  // TODO #260: Beam Accumulator-based Partial Aggregation
-  // @Test
+  @Test
   public void testALSDAG() throws Exception {
     final DAG<IRVertex, IREdge> producedDAG = CompilerTestUtil.compileALSDAG();
 
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendMLRTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendMLRTest.java
index d52d13c..632c30a 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendMLRTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendMLRTest.java
@@ -36,8 +36,7 @@ import static org.junit.Assert.assertEquals;
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(JobLauncher.class)
 public class BeamFrontendMLRTest {
-  // TODO #260: Beam Accumulator-based Partial Aggregation
-  // @Test
+  @Test
   public void testMLRDAG() throws Exception {
     final DAG<IRVertex, IREdge> producedDAG = CompilerTestUtil.compileMLRDAG();
 
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/composite/TransientResourceCompositePassTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/composite/TransientResourceCompositePassTest.java
index 8b23349..426976d 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/composite/TransientResourceCompositePassTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/composite/TransientResourceCompositePassTest.java
@@ -49,8 +49,7 @@ public class TransientResourceCompositePassTest {
     compiledDAG = CompilerTestUtil.compileALSDAG();
   }
 
-  // TODO #260: Beam Accumulator-based Partial Aggregation
-  // @Test
+  @Test
   public void testTransientResourcePass() throws Exception {
     final DAG<IRVertex, IREdge> processedDAG = new TransientResourceCompositePass().apply(compiledDAG);
 
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopExtractionPassTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopExtractionPassTest.java
index 6f8ed04..4af6214 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopExtractionPassTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopExtractionPassTest.java
@@ -44,8 +44,7 @@ public class LoopExtractionPassTest {
     compiledDAG = CompilerTestUtil.compileALSDAG();
   }
 
-  // TODO #260: Beam Accumulator-based Partial Aggregation
-  // @Test
+  @Test
   public void testLoopGrouping() {
     final DAG<IRVertex, IREdge> processedDAG = new LoopExtractionPass().apply(compiledDAG);
 
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionALSInefficientTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionALSInefficientTest.java
index 29ef1d7..d1a66b8 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionALSInefficientTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionALSInefficientTest.java
@@ -46,8 +46,7 @@ public class LoopInvariantCodeMotionALSInefficientTest {
     groupedDAG = new LoopExtractionPass().apply(inefficientALSDAG);
   }
 
-  // TODO #260: Beam Accumulator-based Partial Aggregation
-  // @Test
+  @Test
   public void testForInefficientALSDAG() throws Exception {
     final long expectedNumOfVertices = groupedDAG.getVertices().size() + 3;
 
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionPassTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionPassTest.java
index d6b3085..bdc2a85 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionPassTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionPassTest.java
@@ -105,8 +105,7 @@ public class LoopInvariantCodeMotionPassTest {
     dagToBeRefactored = builder.build();
   }
 
-  // TODO #260: Beam Accumulator-based Partial Aggregation
-  // @Test
+  @Test
   public void testLoopInvariantCodeMotionPass() throws Exception {
     final long numberOfGroupedVertices = groupedDAG.getVertices().size();