You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by ta...@apache.org on 2018/11/07 04:49:45 UTC
[incubator-nemo] branch master updated: [NEMO-260] Beam
Accumulator-based Partial Aggregation (#148)
This is an automated email from the ASF dual-hosted git repository.
taegeonum pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new 61207a5 [NEMO-260] Beam Accumulator-based Partial Aggregation (#148)
61207a5 is described below
commit 61207a5936045d6fe550234e47b539d2af4fc04f
Author: John Yang <jo...@gmail.com>
AuthorDate: Wed Nov 7 13:49:40 2018 +0900
[NEMO-260] Beam Accumulator-based Partial Aggregation (#148)
JIRA: [NEMO-260: Beam Accumulator-based Partial Aggregation](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-260)
**Major changes:**
- Accumulator-based combining for Combine.PerKey PTransforms
- CombineFnPartialTransform, and CombineFnFinalTransform
**Minor changes to note:**
- Re-enable the optimization pass tests
**Tests for the changes:**
- Existing tests that use Combine.PerKey continue to pass
**Other comments:**
- This change applies to batch operations only (for now)
Closes #148
---
.../compiler/frontend/beam/PipelineTranslator.java | 50 +++++++++-
.../beam/transform/CombineFnFinalTransform.java | 106 +++++++++++++++++++++
.../beam/transform/CombineFnPartialTransform.java | 104 ++++++++++++++++++++
.../frontend/beam/BeamFrontendALSTest.java | 3 +-
.../frontend/beam/BeamFrontendMLRTest.java | 3 +-
.../TransientResourceCompositePassTest.java | 3 +-
.../reshaping/LoopExtractionPassTest.java | 3 +-
.../LoopInvariantCodeMotionALSInefficientTest.java | 3 +-
.../reshaping/LoopInvariantCodeMotionPassTest.java | 3 +-
9 files changed, 264 insertions(+), 14 deletions(-)
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
index 9118d98..7a22ba8 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
@@ -26,6 +26,8 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.nemo.common.ir.edge.IREdge;
+import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
import org.apache.nemo.common.ir.vertex.IRVertex;
import org.apache.nemo.common.ir.vertex.OperatorVertex;
import org.apache.nemo.common.ir.vertex.transform.Transform;
@@ -279,8 +281,52 @@ final class PipelineTranslator {
final PipelineTranslationContext ctx,
final TransformHierarchy.Node beamNode,
final PTransform<?, ?> transform) {
- // TODO #260: Beam Accumulator-based Partial Aggregation
- return Pipeline.PipelineVisitor.CompositeBehavior.ENTER_TRANSFORM;
+
+ // Check if the partial combining optimization can be applied.
+ // If not, simply use the default Combine implementation by entering into it.
+ if (!isGlobalWindow(beamNode, ctx.getPipeline())) {
+ // TODO #263: Partial Combining for Beam Streaming
+ return Pipeline.PipelineVisitor.CompositeBehavior.ENTER_TRANSFORM;
+ }
+ final Combine.PerKey perKey = (Combine.PerKey) transform;
+ if (!perKey.getSideInputs().isEmpty()) {
+ // TODO #264: Partial Combining with Beam SideInputs
+ return Pipeline.PipelineVisitor.CompositeBehavior.ENTER_TRANSFORM;
+ }
+
+ // This Combine can be optimized as the following sequence of Nemo IRVertices.
+ // Combine Input -> Combine(Partial Combine -> KV<KeyT, AccumT> -> Final Combine) -> Combine Output
+ final CombineFnBase.GlobalCombineFn combineFn = perKey.getFn();
+
+ // (Step 1) To Partial Combine
+ final IRVertex partialCombine = new OperatorVertex(new CombineFnPartialTransform<>(combineFn));
+ ctx.addVertex(partialCombine);
+ beamNode.getInputs().values().forEach(input -> ctx.addEdgeTo(partialCombine, input));
+
+ // (Step 2) To Final Combine
+ final PCollection input = (PCollection) Iterables.getOnlyElement(
+ TransformInputs.nonAdditionalInputs(beamNode.toAppliedPTransform(ctx.getPipeline())));
+ final KvCoder inputCoder = (KvCoder) input.getCoder();
+ final Coder accumulatorCoder;
+ try {
+ accumulatorCoder =
+ combineFn.getAccumulatorCoder(ctx.getPipeline().getCoderRegistry(), inputCoder.getValueCoder());
+ } catch (CannotProvideCoderException e) {
+ throw new RuntimeException(e);
+ }
+ final IRVertex finalCombine = new OperatorVertex(new CombineFnFinalTransform<>(combineFn));
+ ctx.addVertex(finalCombine);
+ final IREdge edge = new IREdge(CommunicationPatternProperty.Value.Shuffle, partialCombine, finalCombine);
+ ctx.addEdgeTo(
+ edge,
+ KvCoder.of(inputCoder.getKeyCoder(), accumulatorCoder),
+ input.getWindowingStrategy().getWindowFn().windowCoder());
+
+ // (Step 3) To Combine Output
+ beamNode.getOutputs().values().forEach(output -> ctx.registerMainOutputFrom(beamNode, finalCombine, output));
+
+ // This composite transform has been translated in its entirety.
+ return Pipeline.PipelineVisitor.CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
}
/**
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CombineFnFinalTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CombineFnFinalTransform.java
new file mode 100644
index 0000000..d7f0e3e
--- /dev/null
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CombineFnFinalTransform.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.runners.core.GlobalCombineFnRunner;
+import org.apache.beam.runners.core.GlobalCombineFnRunners;
+import org.apache.beam.sdk.transforms.CombineFnBase;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.ir.vertex.transform.NoWatermarkEmitTransform;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Merges the partially combined KVs(Key, Accum) of each key into a single KV(Key, Output).
+ * Outputs are emitted only in close(), in the global window (batch-style global windows only).
+ * TODO #263: Partial Combining for Beam Streaming
+ * TODO #264: Partial Combining with Beam SideInputs
+ * @param <K> Key type.
+ * @param <A> Accum type.
+ * @param <O> Output type.
+ */
+public final class CombineFnFinalTransform<K, A, O>
+    extends NoWatermarkEmitTransform<WindowedValue<KV<K, A>>, WindowedValue<KV<K, O>>> {
+  private static final Logger LOG = LoggerFactory.getLogger(CombineFnFinalTransform.class.getName());
+  private final Map<K, A> keyToAccumulator;
+  private OutputCollector<WindowedValue<KV<K, O>>> outputCollector;
+
+  // Side inputs are not supported yet, so null is passed for the extra arguments of its methods.
+  private final GlobalCombineFnRunner<?, A, O> combineFnRunner;
+
+  /**
+   * Creates the transform for the given Beam combine function.
+   */
+  public CombineFnFinalTransform(final CombineFnBase.GlobalCombineFn<?, A, O> combineFn) {
+    this.combineFnRunner = GlobalCombineFnRunners.create(combineFn);
+    this.keyToAccumulator = new HashMap<>();
+  }
+
+  @Override
+  public void prepare(final Context context, final OutputCollector<WindowedValue<KV<K, O>>> oc) {
+    this.outputCollector = oc;
+  }
+
+  @Override
+  public void onData(final WindowedValue<KV<K, A>> element) {
+    final K key = element.getValue().getKey();
+    final A accum = element.getValue().getValue();
+
+    // Look up the running accumulator of this key.
+    // The initial accumulator is created lazily, i.e., only when the key has no
+    // accumulator yet, instead of being allocated (and discarded) for every element.
+    final A accumulatorForThisKey = keyToAccumulator.computeIfAbsent(
+        key, k -> combineFnRunner.createAccumulator(null, null, null));
+
+    // Merge the incoming partial accumulator into the running one, and store the
+    // returned accumulator, as it is not guaranteed to be the same object as the inputs.
+    keyToAccumulator.put(
+        key,
+        combineFnRunner.mergeAccumulators(
+            Arrays.asList(accumulatorForThisKey, accum), null, null, null));
+  }
+
+  @Override
+  public void close() {
+    // An explicit iterator is used so that each entry can be removed right after it is emitted.
+    final Iterator<Map.Entry<K, A>> iterator = keyToAccumulator.entrySet().iterator();
+    while (iterator.hasNext()) {
+      final Map.Entry<K, A> entry = iterator.next();
+      final K key = entry.getKey();
+      final O output = combineFnRunner.extractOutput(entry.getValue(), null, null, null);
+      outputCollector.emit(WindowedValue.valueInGlobalWindow(KV.of(key, output)));
+      iterator.remove(); // for eager garbage collection
+    }
+  }
+
+  /**
+   * @return the name of this transform, followed by the string form of the parent class.
+   */
+  @Override
+  public String toString() {
+    return "CombineFnFinalTransform:" + super.toString();
+  }
+}
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CombineFnPartialTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CombineFnPartialTransform.java
new file mode 100644
index 0000000..dbe5acd
--- /dev/null
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CombineFnPartialTransform.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.runners.core.GlobalCombineFnRunner;
+import org.apache.beam.runners.core.GlobalCombineFnRunners;
+import org.apache.beam.sdk.transforms.CombineFnBase;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.ir.vertex.transform.NoWatermarkEmitTransform;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Partially accumulates the given KVs(Key, Input) into KVs(Key, Accum), one accumulator per key.
+ * Accumulators are emitted only in close(), in the global window (batch-style global windows only).
+ * TODO #263: Partial Combining for Beam Streaming
+ * TODO #264: Partial Combining with Beam SideInputs
+ * @param <K> Key type.
+ * @param <I> Input type.
+ * @param <A> Accum type.
+ */
+public final class CombineFnPartialTransform<K, I, A>
+    extends NoWatermarkEmitTransform<WindowedValue<KV<K, I>>, WindowedValue<KV<K, A>>> {
+  private static final Logger LOG = LoggerFactory.getLogger(CombineFnPartialTransform.class.getName());
+  private final Map<K, A> keyToAccumulator;
+  private OutputCollector<WindowedValue<KV<K, A>>> outputCollector;
+
+  // Side inputs are not supported yet, so null is passed for the extra arguments of its methods.
+  private final GlobalCombineFnRunner<I, A, ?> combineFnRunner;
+
+  /**
+   * Creates the transform for the given Beam combine function.
+   */
+  public CombineFnPartialTransform(final CombineFnBase.GlobalCombineFn<I, A, ?> combineFn) {
+    this.combineFnRunner = GlobalCombineFnRunners.create(combineFn);
+    this.keyToAccumulator = new HashMap<>();
+  }
+
+  @Override
+  public void prepare(final Context context, final OutputCollector<WindowedValue<KV<K, A>>> oc) {
+    this.outputCollector = oc;
+  }
+
+  @Override
+  public void onData(final WindowedValue<KV<K, I>> element) {
+    final K key = element.getValue().getKey();
+    final I val = element.getValue().getValue();
+
+    // Look up the running accumulator of this key, creating the initial accumulator
+    // lazily, i.e., only when the key has no accumulator yet.
+    final A accumulatorForThisKey = keyToAccumulator.computeIfAbsent(
+        key, k -> combineFnRunner.createAccumulator(null, null, null));
+
+    // Always store the accumulator returned by addInput with put().
+    // putIfAbsent() would be a no-op here since the key is already present, silently
+    // dropping the update whenever addInput returns a new object rather than mutating.
+    keyToAccumulator.put(
+        key,
+        combineFnRunner.addInput(accumulatorForThisKey, val, null, null, null));
+  }
+
+  @Override
+  public void close() {
+    // An explicit iterator is used so that each entry can be removed right after it is emitted.
+    final Iterator<Map.Entry<K, A>> iterator = keyToAccumulator.entrySet().iterator();
+    while (iterator.hasNext()) {
+      final Map.Entry<K, A> entry = iterator.next();
+      final K key = entry.getKey();
+      final A compactAccum = combineFnRunner.compact(entry.getValue(), null, null, null);
+      outputCollector.emit(WindowedValue.valueInGlobalWindow(KV.of(key, compactAccum)));
+      iterator.remove(); // for eager garbage collection
+    }
+  }
+
+  /**
+   * @return the name of this transform, followed by the string form of the parent class.
+   */
+  @Override
+  public String toString() {
+    return "CombineFnPartialTransform:" + super.toString();
+  }
+}
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendALSTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendALSTest.java
index fde90a9..be3f008 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendALSTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendALSTest.java
@@ -36,8 +36,7 @@ import static org.junit.Assert.assertEquals;
@RunWith(PowerMockRunner.class)
@PrepareForTest(JobLauncher.class)
public final class BeamFrontendALSTest {
- // TODO #260: Beam Accumulator-based Partial Aggregation
- // @Test
+ @Test
public void testALSDAG() throws Exception {
final DAG<IRVertex, IREdge> producedDAG = CompilerTestUtil.compileALSDAG();
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendMLRTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendMLRTest.java
index d52d13c..632c30a 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendMLRTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendMLRTest.java
@@ -36,8 +36,7 @@ import static org.junit.Assert.assertEquals;
@RunWith(PowerMockRunner.class)
@PrepareForTest(JobLauncher.class)
public class BeamFrontendMLRTest {
- // TODO #260: Beam Accumulator-based Partial Aggregation
- // @Test
+ @Test
public void testMLRDAG() throws Exception {
final DAG<IRVertex, IREdge> producedDAG = CompilerTestUtil.compileMLRDAG();
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/composite/TransientResourceCompositePassTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/composite/TransientResourceCompositePassTest.java
index 8b23349..426976d 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/composite/TransientResourceCompositePassTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/composite/TransientResourceCompositePassTest.java
@@ -49,8 +49,7 @@ public class TransientResourceCompositePassTest {
compiledDAG = CompilerTestUtil.compileALSDAG();
}
- // TODO #260: Beam Accumulator-based Partial Aggregation
- // @Test
+ @Test
public void testTransientResourcePass() throws Exception {
final DAG<IRVertex, IREdge> processedDAG = new TransientResourceCompositePass().apply(compiledDAG);
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopExtractionPassTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopExtractionPassTest.java
index 6f8ed04..4af6214 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopExtractionPassTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopExtractionPassTest.java
@@ -44,8 +44,7 @@ public class LoopExtractionPassTest {
compiledDAG = CompilerTestUtil.compileALSDAG();
}
- // TODO #260: Beam Accumulator-based Partial Aggregation
- // @Test
+ @Test
public void testLoopGrouping() {
final DAG<IRVertex, IREdge> processedDAG = new LoopExtractionPass().apply(compiledDAG);
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionALSInefficientTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionALSInefficientTest.java
index 29ef1d7..d1a66b8 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionALSInefficientTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionALSInefficientTest.java
@@ -46,8 +46,7 @@ public class LoopInvariantCodeMotionALSInefficientTest {
groupedDAG = new LoopExtractionPass().apply(inefficientALSDAG);
}
- // TODO #260: Beam Accumulator-based Partial Aggregation
- // @Test
+ @Test
public void testForInefficientALSDAG() throws Exception {
final long expectedNumOfVertices = groupedDAG.getVertices().size() + 3;
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionPassTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionPassTest.java
index d6b3085..bdc2a85 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionPassTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionPassTest.java
@@ -105,8 +105,7 @@ public class LoopInvariantCodeMotionPassTest {
dagToBeRefactored = builder.build();
}
- // TODO #260: Beam Accumulator-based Partial Aggregation
- // @Test
+ @Test
public void testLoopInvariantCodeMotionPass() throws Exception {
final long numberOfGroupedVertices = groupedDAG.getVertices().size();