You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by ta...@apache.org on 2018/11/06 07:05:21 UTC
[incubator-nemo] branch master updated: [NEMO-269] Direct
translation from Beam DAG to Nemo DAG (#150)
This is an automated email from the ASF dual-hosted git repository.
taegeonum pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new 8759f3f [NEMO-269] Direct translation from Beam DAG to Nemo DAG (#150)
8759f3f is described below
commit 8759f3fe75465501c6bf6fccee2f35858656b524
Author: John Yang <jo...@gmail.com>
AuthorDate: Tue Nov 6 16:05:16 2018 +0900
[NEMO-269] Direct translation from Beam DAG to Nemo DAG (#150)
JIRA: [NEMO-269: Direct translation from Beam DAG to Nemo DAG](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-269)
**Major changes:**
- Direct translation from Beam DAG to Nemo DAG using CompositeBehavior
- Disables the previous partial combining optimization, to prepare for the new accumulator-based optimization
---
.../java/org/apache/nemo/client/JobLauncher.java | 8 +-
.../compiler/frontend/beam/NemoPipelineRunner.java | 11 +-
.../frontend/beam/PipelineTranslationContext.java | 264 ++++++++
.../compiler/frontend/beam/PipelineTranslator.java | 709 ++++++---------------
.../compiler/frontend/beam/PipelineVisitor.java | 277 +-------
compiler/pom.xml | 13 +
compiler/test/pom.xml | 10 +
.../frontend/beam/BeamFrontendALSTest.java | 26 +-
.../frontend/beam/BeamFrontendMLRTest.java | 26 +-
.../TransientResourceCompositePassTest.java | 41 +-
.../reshaping/LoopExtractionPassTest.java | 5 +-
.../LoopInvariantCodeMotionALSInefficientTest.java | 5 +-
.../reshaping/LoopInvariantCodeMotionPassTest.java | 29 +-
.../examples/beam/WindowedWordCountITCase.java | 1 +
14 files changed, 564 insertions(+), 861 deletions(-)
diff --git a/client/src/main/java/org/apache/nemo/client/JobLauncher.java b/client/src/main/java/org/apache/nemo/client/JobLauncher.java
index 7ec5fdd..035d719 100644
--- a/client/src/main/java/org/apache/nemo/client/JobLauncher.java
+++ b/client/src/main/java/org/apache/nemo/client/JobLauncher.java
@@ -101,9 +101,12 @@ public final class JobLauncher {
* @throws Exception exception on the way.
*/
public static void main(final String[] args) throws Exception {
- driverRPCServer = new DriverRPCServer();
+ // Get Job and Driver Confs
+ builtJobConf = getJobConf(args);
// Registers actions for launching the DAG.
+ LOG.info("Launching RPC Server");
+ driverRPCServer = new DriverRPCServer();
driverRPCServer
.registerHandler(ControlMessage.DriverToClientMessageType.DriverStarted, event -> {
})
@@ -113,8 +116,6 @@ public final class JobLauncher {
SerializationUtils.deserialize(Base64.getDecoder().decode(message.getDataCollected().getData()))))
.run();
- // Get Job and Driver Confs
- builtJobConf = getJobConf(args);
final Configuration driverConf = getDriverConf(builtJobConf);
final Configuration driverNcsConf = getDriverNcsConf();
final Configuration driverMessageConfg = getDriverMessageConf();
@@ -138,6 +139,7 @@ public final class JobLauncher {
throw new RuntimeException("Configuration for launching driver is not ready");
}
+
// Launch driver
LOG.info("Launching driver");
driverReadyLatch = new CountDownLatch(1);
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoPipelineRunner.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoPipelineRunner.java
index 37798c0..d011d11 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoPipelineRunner.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoPipelineRunner.java
@@ -19,9 +19,6 @@
package org.apache.nemo.compiler.frontend.beam;
import org.apache.nemo.client.JobLauncher;
-import org.apache.nemo.common.dag.DAG;
-import org.apache.nemo.common.ir.edge.IREdge;
-import org.apache.nemo.common.ir.vertex.IRVertex;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -57,14 +54,10 @@ public final class NemoPipelineRunner extends PipelineRunner<NemoPipelineResult>
* @return The result of the pipeline.
*/
public NemoPipelineResult run(final Pipeline pipeline) {
- final PipelineVisitor pipelineVisitor = new PipelineVisitor();
+ final PipelineVisitor pipelineVisitor = new PipelineVisitor(pipeline, nemoPipelineOptions);
pipeline.traverseTopologically(pipelineVisitor);
- final DAG<IRVertex, IREdge> dag = PipelineTranslator.translate(pipeline,
- pipelineVisitor.getConvertedPipeline(),
- nemoPipelineOptions);
-
final NemoPipelineResult nemoPipelineResult = new NemoPipelineResult();
- JobLauncher.launchDAG(dag);
+ JobLauncher.launchDAG(pipelineVisitor.getConvertedPipeline());
return nemoPipelineResult;
}
}
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslationContext.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslationContext.java
new file mode 100644
index 0000000..722f421
--- /dev/null
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslationContext.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.*;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ViewFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.*;
+import org.apache.nemo.common.dag.DAGBuilder;
+import org.apache.nemo.common.ir.edge.IREdge;
+import org.apache.nemo.common.ir.edge.executionproperty.*;
+import org.apache.nemo.common.ir.vertex.IRVertex;
+import org.apache.nemo.common.ir.vertex.LoopVertex;
+import org.apache.nemo.common.ir.vertex.OperatorVertex;
+import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.compiler.frontend.beam.coder.BeamDecoderFactory;
+import org.apache.nemo.compiler.frontend.beam.coder.BeamEncoderFactory;
+import org.apache.nemo.compiler.frontend.beam.transform.*;
+
+import java.util.*;
+
+/**
+ * Translation context holding the DAG builder, the PValue-to-producer mappings, and the loop vertex stack.
+ */
+
+final class PipelineTranslationContext {
+ private final PipelineOptions pipelineOptions;
+ private final DAGBuilder<IRVertex, IREdge> builder;
+ private final Map<PValue, TransformHierarchy.Node> pValueToProducerBeamNode;
+ private final Map<PValue, IRVertex> pValueToProducerVertex;
+ private final Map<PValue, TupleTag<?>> pValueToTag;
+ private final Stack<LoopVertex> loopVertexStack;
+ private final Pipeline pipeline;
+
+ /**
+ * @param pipeline the pipeline to translate
+ * @param pipelineOptions {@link PipelineOptions}
+ */
+ PipelineTranslationContext(final Pipeline pipeline,
+ final PipelineOptions pipelineOptions) {
+ this.pipeline = pipeline;
+ this.builder = new DAGBuilder<>();
+ this.pValueToProducerBeamNode = new HashMap<>();
+ this.pValueToProducerVertex = new HashMap<>();
+ this.pValueToTag = new HashMap<>();
+ this.loopVertexStack = new Stack<>();
+ this.pipelineOptions = pipelineOptions;
+ }
+
+ void enterCompositeTransform(final TransformHierarchy.Node compositeTransform) {
+ if (compositeTransform.getTransform() instanceof LoopCompositeTransform) {
+ final LoopVertex loopVertex = new LoopVertex(compositeTransform.getFullName());
+ builder.addVertex(loopVertex, loopVertexStack);
+ builder.removeVertex(loopVertex);
+ loopVertexStack.push(new LoopVertex(compositeTransform.getFullName()));
+ }
+ }
+
+ void leaveCompositeTransform(final TransformHierarchy.Node compositeTransform) {
+ if (compositeTransform.getTransform() instanceof LoopCompositeTransform) {
+ loopVertexStack.pop();
+ }
+ }
+
+ /**
+ * Add IR vertex to the builder.
+ *
+ * @param vertex IR vertex to add
+ */
+ void addVertex(final IRVertex vertex) {
+ builder.addVertex(vertex, loopVertexStack);
+ }
+
+ /**
+ * Add IR edge to the builder.
+ *
+ * @param dst the destination IR vertex.
+ * @param input the {@link PValue} {@code dst} consumes
+ */
+ void addEdgeTo(final IRVertex dst, final PValue input) {
+ final Coder coder;
+ if (input instanceof PCollection) {
+ coder = ((PCollection) input).getCoder();
+ } else if (input instanceof PCollectionView) {
+ coder = getCoderForView((PCollectionView) input, this);
+ } else {
+ throw new RuntimeException(String.format("While adding an edge to %s, coder for PValue %s cannot "
+ + "be determined", dst, input));
+ }
+ addEdgeTo(dst, input, coder);
+ }
+
+ void addEdgeTo(final IRVertex dst, final PValue input, final Coder elementCoder) {
+ final IRVertex src = pValueToProducerVertex.get(input);
+ if (src == null) {
+ throw new IllegalStateException(String.format("Cannot find a vertex that emits pValue %s", input));
+ }
+
+ final Coder windowCoder;
+ final CommunicationPatternProperty.Value communicationPattern = getCommPattern(src, dst);
+ final IREdge edge = new IREdge(communicationPattern, src, dst);
+
+ if (pValueToTag.containsKey(input)) {
+ edge.setProperty(AdditionalOutputTagProperty.of(pValueToTag.get(input).getId()));
+ }
+ if (input instanceof PCollectionView) {
+ edge.setProperty(BroadcastVariableIdProperty.of((PCollectionView) input));
+ }
+ if (input instanceof PCollection) {
+ windowCoder = ((PCollection) input).getWindowingStrategy().getWindowFn().windowCoder();
+ } else if (input instanceof PCollectionView) {
+ windowCoder = ((PCollectionView) input).getPCollection()
+ .getWindowingStrategy().getWindowFn().windowCoder();
+ } else {
+ throw new RuntimeException(String.format("While adding an edge from %s, to %s, coder for PValue %s cannot "
+ + "be determined", src, dst, input));
+ }
+
+ addEdgeTo(edge, elementCoder, windowCoder);
+ }
+
+ void addEdgeTo(final IREdge edge,
+ final Coder elementCoder,
+ final Coder windowCoder) {
+ edge.setProperty(KeyExtractorProperty.of(new BeamKeyExtractor()));
+
+ if (elementCoder instanceof KvCoder) {
+ Coder keyCoder = ((KvCoder) elementCoder).getKeyCoder();
+ edge.setProperty(KeyEncoderProperty.of(new BeamEncoderFactory(keyCoder)));
+ edge.setProperty(KeyDecoderProperty.of(new BeamDecoderFactory(keyCoder)));
+ }
+
+ edge.setProperty(EncoderProperty.of(
+ new BeamEncoderFactory<>(WindowedValue.getFullCoder(elementCoder, windowCoder))));
+ edge.setProperty(DecoderProperty.of(
+ new BeamDecoderFactory<>(WindowedValue.getFullCoder(elementCoder, windowCoder))));
+
+ builder.connectVertices(edge);
+ }
+
+ /**
+ * Registers a {@link PValue} as a main output from the specified {@link IRVertex}.
+ * @param node node
+ * @param irVertex the IR vertex
+ * @param output the {@link PValue} {@code irVertex} emits as main output
+ */
+ void registerMainOutputFrom(final TransformHierarchy.Node node,
+ final IRVertex irVertex,
+ final PValue output) {
+ pValueToProducerBeamNode.put(output, node);
+ pValueToProducerVertex.put(output, irVertex);
+ }
+
+ /**
+ * Registers a {@link PValue} as an additional output from the specified {@link IRVertex}.
+ *
+ * @param node node
+ * @param irVertex the IR vertex
+ * @param output the {@link PValue} {@code irVertex} emits as additional output
+ * @param tag the {@link TupleTag} associated with this additional output
+ */
+ void registerAdditionalOutputFrom(final TransformHierarchy.Node node,
+ final IRVertex irVertex,
+ final PValue output,
+ final TupleTag<?> tag) {
+ pValueToProducerBeamNode.put(output, node);
+ pValueToTag.put(output, tag);
+ pValueToProducerVertex.put(output, irVertex);
+ }
+
+ Pipeline getPipeline() {
+ return pipeline;
+ }
+
+ PipelineOptions getPipelineOptions() {
+ return pipelineOptions;
+ }
+
+ DAGBuilder getBuilder() {
+ return builder;
+ }
+
+ TransformHierarchy.Node getProducerBeamNodeOf(final PValue pValue) {
+ return pValueToProducerBeamNode.get(pValue);
+ }
+
+ private CommunicationPatternProperty.Value getCommPattern(final IRVertex src, final IRVertex dst) {
+ final Class<?> constructUnionTableFn;
+ try {
+ constructUnionTableFn = Class.forName("org.apache.beam.sdk.transforms.join.CoGroupByKey$ConstructUnionTableFn");
+ } catch (final ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+
+ final Transform srcTransform = src instanceof OperatorVertex ? ((OperatorVertex) src).getTransform() : null;
+ final Transform dstTransform = dst instanceof OperatorVertex ? ((OperatorVertex) dst).getTransform() : null;
+ final DoFn srcDoFn = srcTransform instanceof DoFnTransform ? ((DoFnTransform) srcTransform).getDoFn() : null;
+
+ if (srcDoFn != null && srcDoFn.getClass().equals(constructUnionTableFn)) {
+ return CommunicationPatternProperty.Value.Shuffle;
+ }
+ if (srcTransform instanceof FlattenTransform) {
+ return CommunicationPatternProperty.Value.OneToOne;
+ }
+ if (dstTransform instanceof GroupByKeyAndWindowDoFnTransform
+ || dstTransform instanceof GroupByKeyTransform) {
+ return CommunicationPatternProperty.Value.Shuffle;
+ }
+ if (dstTransform instanceof CreateViewTransform) {
+ return CommunicationPatternProperty.Value.BroadCast;
+ }
+ return CommunicationPatternProperty.Value.OneToOne;
+ }
+
+ /**
+ * Get appropriate coder for {@link PCollectionView}.
+ * @param view {@link PCollectionView}
+ * @return appropriate {@link Coder} for {@link PCollectionView}
+ */
+ private static Coder<?> getCoderForView(final PCollectionView view, final PipelineTranslationContext context) {
+ final TransformHierarchy.Node src = context.getProducerBeamNodeOf(view);
+ final KvCoder<?, ?> inputKVCoder = (KvCoder) src.getOutputs().values().stream()
+ .filter(v -> v instanceof PCollection)
+ .map(v -> (PCollection) v)
+ .findFirst()
+ .orElseThrow(() -> new RuntimeException(String.format("No incoming PCollection to %s", src)))
+ .getCoder();
+ final ViewFn viewFn = view.getViewFn();
+ if (viewFn instanceof PCollectionViews.IterableViewFn) {
+ return IterableCoder.of(inputKVCoder.getValueCoder());
+ } else if (viewFn instanceof PCollectionViews.ListViewFn) {
+ return ListCoder.of(inputKVCoder.getValueCoder());
+ } else if (viewFn instanceof PCollectionViews.MapViewFn) {
+ return MapCoder.of(inputKVCoder.getKeyCoder(), inputKVCoder.getValueCoder());
+ } else if (viewFn instanceof PCollectionViews.MultimapViewFn) {
+ return MapCoder.of(inputKVCoder.getKeyCoder(), IterableCoder.of(inputKVCoder.getValueCoder()));
+ } else if (viewFn instanceof PCollectionViews.SingletonViewFn) {
+ return inputKVCoder;
+ } else {
+ throw new UnsupportedOperationException(String.format("Unsupported viewFn %s", viewFn.getClass()));
+ }
+ }
+}
+
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
index 3fd9d2b..9118d98 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
@@ -24,25 +24,16 @@ import org.apache.beam.runners.core.construction.ParDoTranslation;
import org.apache.beam.runners.core.construction.TransformInputs;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.nemo.common.dag.DAG;
-import org.apache.nemo.common.dag.DAGBuilder;
-import org.apache.nemo.common.ir.edge.IREdge;
-import org.apache.nemo.common.ir.edge.executionproperty.*;
import org.apache.nemo.common.ir.vertex.IRVertex;
-import org.apache.nemo.common.ir.vertex.LoopVertex;
import org.apache.nemo.common.ir.vertex.OperatorVertex;
import org.apache.nemo.common.ir.vertex.transform.Transform;
-import org.apache.nemo.compiler.frontend.beam.PipelineVisitor.*;
-import org.apache.nemo.compiler.frontend.beam.coder.BeamDecoderFactory;
-import org.apache.nemo.compiler.frontend.beam.coder.BeamEncoderFactory;
import org.apache.nemo.compiler.frontend.beam.source.BeamBoundedSourceVertex;
import org.apache.nemo.compiler.frontend.beam.source.BeamUnboundedSourceVertex;
import org.apache.nemo.compiler.frontend.beam.transform.*;
import org.apache.beam.sdk.coders.*;
import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
@@ -55,39 +46,19 @@ import java.lang.annotation.*;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.*;
-import java.util.function.BiFunction;
import java.util.stream.Collectors;
-import java.util.stream.Stream;
/**
- * Converts DAG of Beam root to Nemo IR DAG.
- * For a {@link PrimitiveTransformVertex}, it defines mapping to the corresponding {@link IRVertex}.
- * For a {@link CompositeTransformVertex}, it defines how to setup and clear {@link TranslationContext}
- * before start translating inner Beam transform hierarchy.
+ * A collection of translators for the Beam PTransforms.
*/
-public final class PipelineTranslator {
-
+final class PipelineTranslator {
+ public static final PipelineTranslator INSTANCE = new PipelineTranslator();
private static final Logger LOG = LoggerFactory.getLogger(PipelineTranslator.class.getName());
- private static final PipelineTranslator INSTANCE = new PipelineTranslator();
-
private final Map<Class<? extends PTransform>, Method> primitiveTransformToTranslator = new HashMap<>();
private final Map<Class<? extends PTransform>, Method> compositeTransformToTranslator = new HashMap<>();
/**
- * Static translator method.
- * @param pipeline the original root
- * @param root Top-level Beam transform hierarchy, usually given by {@link PipelineVisitor}
- * @param pipelineOptions {@link PipelineOptions}
- * @return Nemo IR DAG
- */
- public static DAG<IRVertex, IREdge> translate(final Pipeline pipeline,
- final CompositeTransformVertex root,
- final PipelineOptions pipelineOptions) {
- return INSTANCE.translateToIRDAG(root, pipeline, pipelineOptions);
- }
-
- /**
* Creates the translator, while building a map between {@link PTransform}s and the corresponding translators.
*/
private PipelineTranslator() {
@@ -98,7 +69,7 @@ public final class PipelineTranslator {
for (final Class<? extends PTransform> transform : primitive.value()) {
if (primitiveTransformToTranslator.containsKey(transform)) {
throw new RuntimeException(String.format("Translator for primitive transform %s is"
- + "already registered: %s", transform, primitiveTransformToTranslator.get(transform)));
+ + "already registered: %s", transform, primitiveTransformToTranslator.get(transform)));
}
primitiveTransformToTranslator.put(transform, translator);
}
@@ -107,7 +78,7 @@ public final class PipelineTranslator {
for (final Class<? extends PTransform> transform : composite.value()) {
if (compositeTransformToTranslator.containsKey(transform)) {
throw new RuntimeException(String.format("Translator for composite transform %s is"
- + "already registered: %s", transform, compositeTransformToTranslator.get(transform)));
+ + "already registered: %s", transform, compositeTransformToTranslator.get(transform)));
}
compositeTransformToTranslator.put(transform, translator);
}
@@ -115,138 +86,146 @@ public final class PipelineTranslator {
}
}
+ void translatePrimitive(final PipelineTranslationContext context,
+ final TransformHierarchy.Node primitive) {
+ final PTransform<?, ?> transform = primitive.getTransform();
+ Class<?> clazz = transform.getClass();
+ final Method translator = primitiveTransformToTranslator.get(clazz);
+ if (translator == null) {
+ throw new UnsupportedOperationException(
+ String.format("Primitive transform %s is not supported", transform.getClass().getCanonicalName()));
+ } else {
+ try {
+ translator.setAccessible(true);
+ translator.invoke(null, context, primitive, transform);
+ } catch (final IllegalAccessException e) {
+ throw new RuntimeException(e);
+ } catch (final InvocationTargetException | RuntimeException e) {
+ throw new RuntimeException(String.format(
+ "Translator %s have failed to translate %s", translator, transform), e);
+ }
+ }
+ }
+
+ /**
+ * @param context context.
+ * @param composite transform.
+ * @return behavior.
+ */
+ Pipeline.PipelineVisitor.CompositeBehavior translateComposite(final PipelineTranslationContext context,
+ final TransformHierarchy.Node composite) {
+ final PTransform<?, ?> transform = composite.getTransform();
+ if (transform == null) {
+ // root beam node
+ return Pipeline.PipelineVisitor.CompositeBehavior.ENTER_TRANSFORM;
+ }
+
+ Class<?> clazz = transform.getClass();
+ final Method translator = compositeTransformToTranslator.get(clazz);
+ if (translator == null) {
+ return Pipeline.PipelineVisitor.CompositeBehavior.ENTER_TRANSFORM;
+ } else {
+ try {
+ translator.setAccessible(true);
+ return (Pipeline.PipelineVisitor.CompositeBehavior) translator.invoke(null, context, composite, transform);
+ } catch (final IllegalAccessException e) {
+ throw new RuntimeException(e);
+ } catch (final InvocationTargetException | RuntimeException e) {
+ throw new RuntimeException(String.format(
+ "Translator %s have failed to translate %s", translator, transform), e);
+ }
+ }
+ }
+
+ /**
+ * Annotates translator for PrimitiveTransform.
+ */
+ @Target(ElementType.METHOD)
+ @Retention(RetentionPolicy.RUNTIME)
+ private @interface PrimitiveTransformTranslator {
+ Class<? extends PTransform>[] value();
+ }
+
+ /**
+ * Annotates translator for CompositeTransform.
+ */
+ @Target(ElementType.METHOD)
+ @Retention(RetentionPolicy.RUNTIME)
+ private @interface CompositeTransformTranslator {
+ Class<? extends PTransform>[] value();
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////
+ /////////////////////// PRIMITIVE TRANSFORMS
+
@PrimitiveTransformTranslator(Read.Unbounded.class)
- private static void unboundedReadTranslator(final TranslationContext ctx,
- final PrimitiveTransformVertex transformVertex,
+ private static void unboundedReadTranslator(final PipelineTranslationContext ctx,
+ final TransformHierarchy.Node beamNode,
final Read.Unbounded<?> transform) {
final IRVertex vertex = new BeamUnboundedSourceVertex<>(transform.getSource());
ctx.addVertex(vertex);
- transformVertex.getNode().getInputs().values().forEach(input -> ctx.addEdgeTo(vertex, input));
- transformVertex.getNode().getOutputs().values().forEach(output -> ctx.registerMainOutputFrom(vertex, output));
+ beamNode.getInputs().values().forEach(input -> ctx.addEdgeTo(vertex, input));
+ beamNode.getOutputs().values().forEach(output -> ctx.registerMainOutputFrom(beamNode, vertex, output));
}
@PrimitiveTransformTranslator(Read.Bounded.class)
- private static void boundedReadTranslator(final TranslationContext ctx,
- final PrimitiveTransformVertex transformVertex,
+ private static void boundedReadTranslator(final PipelineTranslationContext ctx,
+ final TransformHierarchy.Node beamNode,
final Read.Bounded<?> transform) {
final IRVertex vertex = new BeamBoundedSourceVertex<>(transform.getSource());
ctx.addVertex(vertex);
- transformVertex.getNode().getInputs().values().forEach(input -> ctx.addEdgeTo(vertex, input));
- transformVertex.getNode().getOutputs().values().forEach(output -> ctx.registerMainOutputFrom(vertex, output));
- }
-
- private static DoFnTransform createDoFnTransform(final TranslationContext ctx,
- final PrimitiveTransformVertex transformVertex) {
- try {
- final AppliedPTransform pTransform = transformVertex.getNode().toAppliedPTransform(ctx.pipeline);
- final DoFn doFn = ParDoTranslation.getDoFn(pTransform);
- final TupleTag mainOutputTag = ParDoTranslation.getMainOutputTag(pTransform);
- final List<PCollectionView<?>> sideInputs = ParDoTranslation.getSideInputs(pTransform);
- final TupleTagList additionalOutputTags = ParDoTranslation.getAdditionalOutputTags(pTransform);
-
- final PCollection<?> mainInput = (PCollection<?>)
- Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(pTransform));
-
- return new DoFnTransform(
- doFn,
- mainInput.getCoder(),
- getOutputCoders(pTransform),
- mainOutputTag,
- additionalOutputTags.getAll(),
- mainInput.getWindowingStrategy(),
- sideInputs,
- ctx.pipelineOptions);
- } catch (final IOException e) {
- throw new RuntimeException(e);
- }
+ beamNode.getInputs().values().forEach(input -> ctx.addEdgeTo(vertex, input));
+ beamNode.getOutputs().values().forEach(output -> ctx.registerMainOutputFrom(beamNode, vertex, output));
}
@PrimitiveTransformTranslator(ParDo.SingleOutput.class)
- private static void parDoSingleOutputTranslator(final TranslationContext ctx,
- final PrimitiveTransformVertex transformVertex,
+ private static void parDoSingleOutputTranslator(final PipelineTranslationContext ctx,
+ final TransformHierarchy.Node beamNode,
final ParDo.SingleOutput<?, ?> transform) {
- final DoFnTransform doFnTransform = createDoFnTransform(ctx, transformVertex);
+ final DoFnTransform doFnTransform = createDoFnTransform(ctx, beamNode);
final IRVertex vertex = new OperatorVertex(doFnTransform);
ctx.addVertex(vertex);
- transformVertex.getNode().getInputs().values().stream()
+ beamNode.getInputs().values().stream()
.filter(input -> !transform.getAdditionalInputs().values().contains(input))
.forEach(input -> ctx.addEdgeTo(vertex, input));
transform.getSideInputs().forEach(input -> ctx.addEdgeTo(vertex, input));
- transformVertex.getNode().getOutputs().values().forEach(output -> ctx.registerMainOutputFrom(vertex, output));
- }
-
- private static Map<TupleTag<?>, Coder<?>> getOutputCoders(final AppliedPTransform<?, ?, ?> ptransform) {
- return ptransform
- .getOutputs()
- .entrySet()
- .stream()
- .filter(e -> e.getValue() instanceof PCollection)
- .collect(Collectors.toMap(e -> e.getKey(), e -> ((PCollection) e.getValue()).getCoder()));
+ beamNode.getOutputs().values().forEach(output -> ctx.registerMainOutputFrom(beamNode, vertex, output));
}
@PrimitiveTransformTranslator(ParDo.MultiOutput.class)
- private static void parDoMultiOutputTranslator(final TranslationContext ctx,
- final PrimitiveTransformVertex transformVertex,
+ private static void parDoMultiOutputTranslator(final PipelineTranslationContext ctx,
+ final TransformHierarchy.Node beamNode,
final ParDo.MultiOutput<?, ?> transform) {
- final DoFnTransform doFnTransform = createDoFnTransform(ctx, transformVertex);
+ final DoFnTransform doFnTransform = createDoFnTransform(ctx, beamNode);
final IRVertex vertex = new OperatorVertex(doFnTransform);
ctx.addVertex(vertex);
- transformVertex.getNode().getInputs().values().stream()
+ beamNode.getInputs().values().stream()
.filter(input -> !transform.getAdditionalInputs().values().contains(input))
.forEach(input -> ctx.addEdgeTo(vertex, input));
transform.getSideInputs().forEach(input -> ctx.addEdgeTo(vertex, input));
- transformVertex.getNode().getOutputs().entrySet().stream()
+ beamNode.getOutputs().entrySet().stream()
.filter(pValueWithTupleTag -> pValueWithTupleTag.getKey().equals(transform.getMainOutputTag()))
- .forEach(pValueWithTupleTag -> ctx.registerMainOutputFrom(vertex, pValueWithTupleTag.getValue()));
- transformVertex.getNode().getOutputs().entrySet().stream()
+ .forEach(pValueWithTupleTag -> ctx.registerMainOutputFrom(beamNode, vertex, pValueWithTupleTag.getValue()));
+ beamNode.getOutputs().entrySet().stream()
.filter(pValueWithTupleTag -> !pValueWithTupleTag.getKey().equals(transform.getMainOutputTag()))
- .forEach(pValueWithTupleTag -> ctx.registerAdditionalOutputFrom(vertex, pValueWithTupleTag.getValue(),
+ .forEach(pValueWithTupleTag -> ctx.registerAdditionalOutputFrom(beamNode, vertex, pValueWithTupleTag.getValue(),
pValueWithTupleTag.getKey()));
}
- /**
- * Create a group by key transform.
- * It returns GroupByKeyAndWindowDoFnTransform if window function is not default.
- * @param ctx translation context
- * @param transformVertex transform vertex
- * @return group by key transform
- */
- private static Transform createGBKTransform(
- final TranslationContext ctx,
- final TransformVertex transformVertex) {
- final AppliedPTransform pTransform = transformVertex.getNode().toAppliedPTransform(ctx.pipeline);
- final PCollection<?> mainInput = (PCollection<?>)
- Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(pTransform));
- final TupleTag mainOutputTag = new TupleTag<>();
-
- if (mainInput.getWindowingStrategy().getWindowFn() instanceof GlobalWindows) {
- return new GroupByKeyTransform();
- } else {
- return new GroupByKeyAndWindowDoFnTransform(
- getOutputCoders(pTransform),
- mainOutputTag,
- Collections.emptyList(), /* GBK does not have additional outputs */
- mainInput.getWindowingStrategy(),
- Collections.emptyList(), /* GBK does not have additional side inputs */
- ctx.pipelineOptions,
- SystemReduceFn.buffering(mainInput.getCoder()));
- }
- }
-
@PrimitiveTransformTranslator(GroupByKey.class)
- private static void groupByKeyTranslator(final TranslationContext ctx,
- final PrimitiveTransformVertex transformVertex,
+ private static void groupByKeyTranslator(final PipelineTranslationContext ctx,
+ final TransformHierarchy.Node beamNode,
final GroupByKey<?, ?> transform) {
- final IRVertex vertex = new OperatorVertex(createGBKTransform(ctx, transformVertex));
+ final IRVertex vertex = new OperatorVertex(createGBKTransform(ctx, beamNode));
ctx.addVertex(vertex);
- transformVertex.getNode().getInputs().values().forEach(input -> ctx.addEdgeTo(vertex, input));
- transformVertex.getNode().getOutputs().values().forEach(output -> ctx.registerMainOutputFrom(vertex, output));
+ beamNode.getInputs().values().forEach(input -> ctx.addEdgeTo(vertex, input));
+ beamNode.getOutputs().values().forEach(output -> ctx.registerMainOutputFrom(beamNode, vertex, output));
}
@PrimitiveTransformTranslator({Window.class, Window.Assign.class})
- private static void windowTranslator(final TranslationContext ctx,
- final PrimitiveTransformVertex transformVertex,
+ private static void windowTranslator(final PipelineTranslationContext ctx,
+ final TransformHierarchy.Node beamNode,
final PTransform<?, ?> transform) {
final WindowFn windowFn;
if (transform instanceof Window) {
@@ -258,429 +237,139 @@ public final class PipelineTranslator {
}
final IRVertex vertex = new OperatorVertex(new WindowFnTransform(windowFn));
ctx.addVertex(vertex);
- transformVertex.getNode().getInputs().values().forEach(input -> ctx.addEdgeTo(vertex, input));
- transformVertex.getNode().getOutputs().values().forEach(output -> ctx.registerMainOutputFrom(vertex, output));
+ beamNode.getInputs().values().forEach(input -> ctx.addEdgeTo(vertex, input));
+ beamNode.getOutputs().values().forEach(output -> ctx.registerMainOutputFrom(beamNode, vertex, output));
}
@PrimitiveTransformTranslator(View.CreatePCollectionView.class)
- private static void createPCollectionViewTranslator(final TranslationContext ctx,
- final PrimitiveTransformVertex transformVertex,
+ private static void createPCollectionViewTranslator(final PipelineTranslationContext ctx,
+ final TransformHierarchy.Node beamNode,
final View.CreatePCollectionView<?, ?> transform) {
final IRVertex vertex = new OperatorVertex(new CreateViewTransform(transform.getView().getViewFn()));
ctx.addVertex(vertex);
- transformVertex.getNode().getInputs().values().forEach(input -> ctx.addEdgeTo(vertex, input));
- ctx.registerMainOutputFrom(vertex, transform.getView());
- transformVertex.getNode().getOutputs().values().forEach(output -> ctx.registerMainOutputFrom(vertex, output));
+ beamNode.getInputs().values().forEach(input -> ctx.addEdgeTo(vertex, input));
+ ctx.registerMainOutputFrom(beamNode, vertex, transform.getView());
+ beamNode.getOutputs().values().forEach(output -> ctx.registerMainOutputFrom(beamNode, vertex, output));
}
@PrimitiveTransformTranslator(Flatten.PCollections.class)
- private static void flattenTranslator(final TranslationContext ctx,
- final PrimitiveTransformVertex transformVertex,
+ private static void flattenTranslator(final PipelineTranslationContext ctx,
+ final TransformHierarchy.Node beamNode,
final Flatten.PCollections<?> transform) {
final IRVertex vertex = new OperatorVertex(new FlattenTransform());
ctx.addVertex(vertex);
- transformVertex.getNode().getInputs().values().forEach(input -> ctx.addEdgeTo(vertex, input));
- transformVertex.getNode().getOutputs().values().forEach(output -> ctx.registerMainOutputFrom(vertex, output));
+ beamNode.getInputs().values().forEach(input -> ctx.addEdgeTo(vertex, input));
+ beamNode.getOutputs().values().forEach(output -> ctx.registerMainOutputFrom(beamNode, vertex, output));
}
- /**
- * Default translator for CompositeTransforms. Translates inner DAG without modifying {@link TranslationContext}.
- *
- * @param ctx provides translation context
- * @param transformVertex the given CompositeTransform to translate
- * @param transform transform which can be obtained from {@code transformVertex}
- */
- @CompositeTransformTranslator(PTransform.class)
- private static void topologicalTranslator(final TranslationContext ctx,
- final CompositeTransformVertex transformVertex,
- final PTransform<?, ?> transform) {
- transformVertex.getDAG().topologicalDo(ctx::translate);
- }
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////
+ /////////////////////// COMPOSITE TRANSFORMS
/**
- * Translator for Combine transform. Implements local combining before shuffling key-value pairs.
+ * {@link Combine.PerKey} = {@link GroupByKey} + {@link Combine.GroupedValues}
+ * ({@link Combine.Globally} internally uses {@link Combine.PerKey} which will also be optimized by this translator)
+ * Here, we translate this composite transform as a whole, exploiting its accumulator semantics.
*
* @param ctx provides translation context
- * @param transformVertex the given CompositeTransform to translate
- * @param transform transform which can be obtained from {@code transformVertex}
+ * @param beamNode the given CompositeTransform to translate
+ * @param transform transform which can be obtained from {@code beamNode}
*/
- @CompositeTransformTranslator({Combine.Globally.class, Combine.PerKey.class, Combine.GroupedValues.class})
- private static void combineTranslator(final TranslationContext ctx,
- final CompositeTransformVertex transformVertex,
- final PTransform<?, ?> transform) {
- // No optimization for BeamSQL that handles Beam 'Row's.
- final boolean handlesBeamRow = Stream
- .concat(transformVertex.getNode().getInputs().values().stream(),
- transformVertex.getNode().getOutputs().values().stream())
- .map(pValue -> (KvCoder) getCoder(pValue, ctx.root)) // Input and output of combine should be KV
- .map(kvCoder -> kvCoder.getValueCoder().getEncodedTypeDescriptor()) // We're interested in the 'Value' of KV
- .anyMatch(valueTypeDescriptor -> TypeDescriptor.of(Row.class).equals(valueTypeDescriptor));
- if (handlesBeamRow) {
- transformVertex.getDAG().topologicalDo(ctx::translate);
- return; // return early and give up optimization - TODO #209: Enable Local Combiner for BeamSQL
- }
-
- // Local combiner optimization
- final List<TransformVertex> topologicalOrdering = transformVertex.getDAG().getTopologicalSort();
- final TransformVertex groupByKeyBeamTransform = topologicalOrdering.get(0);
- final TransformVertex last = topologicalOrdering.get(topologicalOrdering.size() - 1);
- if (groupByKeyBeamTransform.getNode().getTransform() instanceof GroupByKey) {
- // Translate the given CompositeTransform under OneToOneEdge-enforced context.
- final TranslationContext oneToOneEdgeContext = new TranslationContext(ctx,
- OneToOneCommunicationPatternSelector.INSTANCE);
- transformVertex.getDAG().topologicalDo(oneToOneEdgeContext::translate);
-
- // Attempt to translate the CompositeTransform again.
- // Add GroupByKey, which is the first transform in the given CompositeTransform.
- // Make sure it consumes the output from the last vertex in OneToOneEdge-translated hierarchy.
- final IRVertex groupByKeyIRVertex = new OperatorVertex(createGBKTransform(ctx, transformVertex));
- ctx.addVertex(groupByKeyIRVertex);
- last.getNode().getOutputs().values().forEach(outputFromCombiner
- -> ctx.addEdgeTo(groupByKeyIRVertex, outputFromCombiner));
- groupByKeyBeamTransform.getNode().getOutputs().values()
- .forEach(outputFromGroupByKey -> ctx.registerMainOutputFrom(groupByKeyIRVertex, outputFromGroupByKey));
-
- // Translate the remaining vertices.
- topologicalOrdering.stream().skip(1).forEach(ctx::translate);
- } else {
- transformVertex.getDAG().topologicalDo(ctx::translate);
- }
+ @CompositeTransformTranslator(Combine.PerKey.class)
+ private static Pipeline.PipelineVisitor.CompositeBehavior combinePerKeyTranslator(
+ final PipelineTranslationContext ctx,
+ final TransformHierarchy.Node beamNode,
+ final PTransform<?, ?> transform) {
+ // TODO #260: Beam Accumulator-based Partial Aggregation
+ return Pipeline.PipelineVisitor.CompositeBehavior.ENTER_TRANSFORM;
}
/**
- * Pushes the loop vertex to the stack before translating the inner DAG, and pops it after the translation.
- *
* @param ctx provides translation context
- * @param transformVertex the given CompositeTransform to translate
- * @param transform transform which can be obtained from {@code transformVertex}
+ * @param beamNode the given CompositeTransform to translate
+ * @param transform transform which can be obtained from {@code beamNode}
+ * @return {@code ENTER_TRANSFORM}, so that the transforms nested inside the loop are also visited
*/
@CompositeTransformTranslator(LoopCompositeTransform.class)
- private static void loopTranslator(final TranslationContext ctx,
- final CompositeTransformVertex transformVertex,
- final LoopCompositeTransform<?, ?> transform) {
- final LoopVertex loopVertex = new LoopVertex(transformVertex.getNode().getFullName());
- ctx.builder.addVertex(loopVertex, ctx.loopVertexStack);
- ctx.builder.removeVertex(loopVertex);
- ctx.loopVertexStack.push(loopVertex);
- topologicalTranslator(ctx, transformVertex, transform);
- ctx.loopVertexStack.pop();
+ private static Pipeline.PipelineVisitor.CompositeBehavior loopTranslator(
+ final PipelineTranslationContext ctx,
+ final TransformHierarchy.Node beamNode,
+ final LoopCompositeTransform<?, ?> transform) {
+ // Do nothing here, as the context handles the loop vertex stack.
+ // We just keep this method to signal that the loop vertex is acknowledged.
+ return Pipeline.PipelineVisitor.CompositeBehavior.ENTER_TRANSFORM;
}
- private DAG<IRVertex, IREdge> translateToIRDAG(final CompositeTransformVertex vertex,
- final Pipeline pipeline,
- final PipelineOptions pipelineOptions) {
- final TranslationContext ctx = new TranslationContext(vertex, pipeline, primitiveTransformToTranslator,
- compositeTransformToTranslator, DefaultCommunicationPatternSelector.INSTANCE, pipelineOptions);
- ctx.translate(vertex);
- return ctx.builder.build();
- }
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////
+ /////////////////////// HELPER METHODS
- /**
- * Annotates translator for PrimitiveTransform.
- */
- @Target(ElementType.METHOD)
- @Retention(RetentionPolicy.RUNTIME)
- private @interface PrimitiveTransformTranslator {
- Class<? extends PTransform>[] value();
- }
-
- /**
- * Annotates translator for CompositeTransform.
- */
- @Target(ElementType.METHOD)
- @Retention(RetentionPolicy.RUNTIME)
- private @interface CompositeTransformTranslator {
- Class<? extends PTransform>[] value();
- }
+ private static DoFnTransform createDoFnTransform(final PipelineTranslationContext ctx,
+ final TransformHierarchy.Node beamNode) {
+ try {
+ final AppliedPTransform pTransform = beamNode.toAppliedPTransform(ctx.getPipeline());
+ final DoFn doFn = ParDoTranslation.getDoFn(pTransform);
+ final TupleTag mainOutputTag = ParDoTranslation.getMainOutputTag(pTransform);
+ final List<PCollectionView<?>> sideInputs = ParDoTranslation.getSideInputs(pTransform);
+ final TupleTagList additionalOutputTags = ParDoTranslation.getAdditionalOutputTags(pTransform);
- private static Coder<?> getCoder(final PValue input, final CompositeTransformVertex pipeline) {
- final Coder<?> coder;
- if (input instanceof PCollection) {
- coder = ((PCollection) input).getCoder();
- } else if (input instanceof PCollectionView) {
- coder = getCoderForView((PCollectionView) input, pipeline);
- } else {
- throw new RuntimeException(String.format("Coder for PValue %s cannot be determined", input));
- }
- return coder;
- }
+ final PCollection<?> mainInput = (PCollection<?>)
+ Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(pTransform));
- /**
- * Get appropriate coder for {@link PCollectionView}.
- *
- * @param view {@link PCollectionView} from the corresponding {@link View.CreatePCollectionView} transform
- * @return appropriate {@link Coder} for {@link PCollectionView}
- */
- private static Coder<?> getCoderForView(final PCollectionView view, final CompositeTransformVertex pipeline) {
- final PrimitiveTransformVertex src = pipeline.getPrimitiveProducerOf(view);
- final Coder<?> baseCoder = src.getNode().getOutputs().values().stream()
- .filter(v -> v instanceof PCollection)
- .map(v -> (PCollection) v)
- .findFirst()
- .orElseThrow(() -> new RuntimeException(String.format("No incoming PCollection to %s", src)))
- .getCoder();
- final KvCoder<?, ?> inputKVCoder = (KvCoder) baseCoder;
- final ViewFn viewFn = view.getViewFn();
- if (viewFn instanceof PCollectionViews.IterableViewFn) {
- return IterableCoder.of(inputKVCoder.getValueCoder());
- } else if (viewFn instanceof PCollectionViews.ListViewFn) {
- return ListCoder.of(inputKVCoder.getValueCoder());
- } else if (viewFn instanceof PCollectionViews.MapViewFn) {
- return MapCoder.of(inputKVCoder.getKeyCoder(), inputKVCoder.getValueCoder());
- } else if (viewFn instanceof PCollectionViews.MultimapViewFn) {
- return MapCoder.of(inputKVCoder.getKeyCoder(), IterableCoder.of(inputKVCoder.getValueCoder()));
- } else if (viewFn instanceof PCollectionViews.SingletonViewFn) {
- return baseCoder;
- } else {
- throw new UnsupportedOperationException(String.format("Unsupported viewFn %s", viewFn.getClass()));
+ return new DoFnTransform(
+ doFn,
+ mainInput.getCoder(),
+ getOutputCoders(pTransform),
+ mainOutputTag,
+ additionalOutputTags.getAll(),
+ mainInput.getWindowingStrategy(),
+ sideInputs,
+ ctx.getPipelineOptions());
+ } catch (final IOException e) {
+ throw new RuntimeException(e);
}
}
- /**
- * Translation context.
- */
- private static final class TranslationContext {
- private final CompositeTransformVertex root;
- private final PipelineOptions pipelineOptions;
- private final DAGBuilder<IRVertex, IREdge> builder;
- private final Map<PValue, IRVertex> pValueToProducer;
- private final Map<PValue, TupleTag<?>> pValueToTag;
- private final Stack<LoopVertex> loopVertexStack;
- private final BiFunction<IRVertex, IRVertex, CommunicationPatternProperty.Value> communicationPatternSelector;
- private final Pipeline pipeline;
-
-
- private final Map<Class<? extends PTransform>, Method> primitiveTransformToTranslator;
- private final Map<Class<? extends PTransform>, Method> compositeTransformToTranslator;
-
- /**
- * @param root the root to translate
- * @param pipeline the pipeline to translate
- * @param primitiveTransformToTranslator provides translators for PrimitiveTransform
- * @param compositeTransformToTranslator provides translators for CompositeTransform
- * @param selector provides {@link CommunicationPatternProperty.Value} for IR edges
- * @param pipelineOptions {@link PipelineOptions}
- */
- private TranslationContext(final CompositeTransformVertex root,
- final Pipeline pipeline,
- final Map<Class<? extends PTransform>, Method> primitiveTransformToTranslator,
- final Map<Class<? extends PTransform>, Method> compositeTransformToTranslator,
- final BiFunction<IRVertex, IRVertex, CommunicationPatternProperty.Value> selector,
- final PipelineOptions pipelineOptions) {
- this.root = root;
- this.pipeline = pipeline;
- this.builder = new DAGBuilder<>();
- this.pValueToProducer = new HashMap<>();
- this.pValueToTag = new HashMap<>();
- this.loopVertexStack = new Stack<>();
- this.primitiveTransformToTranslator = primitiveTransformToTranslator;
- this.compositeTransformToTranslator = compositeTransformToTranslator;
- this.communicationPatternSelector = selector;
- this.pipelineOptions = pipelineOptions;
- }
-
- /**
- * Copy constructor, except for setting different CommunicationPatternProperty selector.
- *
- * @param ctx the original {@link TranslationContext}
- * @param selector provides {@link CommunicationPatternProperty.Value} for IR edges
- */
- private TranslationContext(final TranslationContext ctx,
- final BiFunction<IRVertex, IRVertex, CommunicationPatternProperty.Value> selector) {
- this.root = ctx.root;
- this.pipeline = ctx.pipeline;
- this.pipelineOptions = ctx.pipelineOptions;
- this.builder = ctx.builder;
- this.pValueToProducer = ctx.pValueToProducer;
- this.pValueToTag = ctx.pValueToTag;
- this.loopVertexStack = ctx.loopVertexStack;
- this.primitiveTransformToTranslator = ctx.primitiveTransformToTranslator;
- this.compositeTransformToTranslator = ctx.compositeTransformToTranslator;
-
- this.communicationPatternSelector = selector;
- }
-
- /**
- * Selects appropriate translator to translate the given hierarchy.
- *
- * @param transformVertex the Beam transform hierarchy to translate
- */
- private void translate(final TransformVertex transformVertex) {
- final boolean isComposite = transformVertex instanceof CompositeTransformVertex;
- final PTransform<?, ?> transform = transformVertex.getNode().getTransform();
- if (transform == null) {
- // root node
- topologicalTranslator(this, (CompositeTransformVertex) transformVertex, null);
- return;
- }
-
- Class<?> clazz = transform.getClass();
- while (true) {
- final Method translator = (isComposite ? compositeTransformToTranslator : primitiveTransformToTranslator)
- .get(clazz);
- if (translator == null) {
- if (clazz.getSuperclass() != null) {
- clazz = clazz.getSuperclass();
- continue;
- }
- throw new UnsupportedOperationException(String.format("%s transform %s is not supported",
- isComposite ? "Composite" : "Primitive", transform.getClass().getCanonicalName()));
- } else {
- try {
- translator.setAccessible(true);
- translator.invoke(null, this, transformVertex, transform);
- break;
- } catch (final IllegalAccessException e) {
- throw new RuntimeException(e);
- } catch (final InvocationTargetException | RuntimeException e) {
- throw new RuntimeException(String.format(
- "Translator %s have failed to translate %s", translator, transform), e);
- }
- }
- }
- }
-
- /**
- * Add IR vertex to the builder.
- *
- * @param vertex IR vertex to add
- */
- private void addVertex(final IRVertex vertex) {
- builder.addVertex(vertex, loopVertexStack);
- }
-
- /**
- * Add IR edge to the builder.
- *
- * @param dst the destination IR vertex.
- * @param input the {@link PValue} {@code dst} consumes
- */
- private void addEdgeTo(final IRVertex dst, final PValue input) {
- final IRVertex src = pValueToProducer.get(input);
- if (src == null) {
- try {
- throw new RuntimeException(String.format("Cannot find a vertex that emits pValue %s, "
- + "while PTransform %s is known to produce it.", input, root.getPrimitiveProducerOf(input)));
- } catch (final RuntimeException e) {
- throw new RuntimeException(String.format("Cannot find a vertex that emits pValue %s, "
- + "and the corresponding PTransform was not found", input));
- }
- }
- final CommunicationPatternProperty.Value communicationPattern = communicationPatternSelector.apply(src, dst);
- if (communicationPattern == null) {
- throw new RuntimeException(String.format("%s have failed to determine communication pattern "
- + "for an edge from %s to %s", communicationPatternSelector, src, dst));
- }
- final IREdge edge = new IREdge(communicationPattern, src, dst);
- final Coder coder;
- final Coder windowCoder;
- if (input instanceof PCollection) {
- coder = ((PCollection) input).getCoder();
- windowCoder = ((PCollection) input).getWindowingStrategy().getWindowFn().windowCoder();
- } else if (input instanceof PCollectionView) {
- coder = getCoderForView((PCollectionView) input, root);
- windowCoder = ((PCollectionView) input).getPCollection()
- .getWindowingStrategy().getWindowFn().windowCoder();
- } else {
- throw new RuntimeException(String.format("While adding an edge from %s, to %s, coder for PValue %s cannot "
- + "be determined", src, dst, input));
- }
-
- edge.setProperty(KeyExtractorProperty.of(new BeamKeyExtractor()));
-
- if (coder instanceof KvCoder) {
- Coder keyCoder = ((KvCoder) coder).getKeyCoder();
- edge.setProperty(KeyEncoderProperty.of(new BeamEncoderFactory(keyCoder)));
- edge.setProperty(KeyDecoderProperty.of(new BeamDecoderFactory(keyCoder)));
- }
-
- edge.setProperty(EncoderProperty.of(
- new BeamEncoderFactory<>(WindowedValue.getFullCoder(coder, windowCoder))));
- edge.setProperty(DecoderProperty.of(
- new BeamDecoderFactory<>(WindowedValue.getFullCoder(coder, windowCoder))));
-
- if (pValueToTag.containsKey(input)) {
- edge.setProperty(AdditionalOutputTagProperty.of(pValueToTag.get(input).getId()));
- }
-
- if (input instanceof PCollectionView) {
- edge.setProperty(BroadcastVariableIdProperty.of((PCollectionView) input));
- }
-
- builder.connectVertices(edge);
- }
-
- /**
- * Registers a {@link PValue} as a main output from the specified {@link IRVertex}.
- *
- * @param irVertex the IR vertex
- * @param output the {@link PValue} {@code irVertex} emits as main output
- */
- private void registerMainOutputFrom(final IRVertex irVertex, final PValue output) {
- pValueToProducer.put(output, irVertex);
- }
-
- /**
- * Registers a {@link PValue} as an additional output from the specified {@link IRVertex}.
- *
- * @param irVertex the IR vertex
- * @param output the {@link PValue} {@code irVertex} emits as additional output
- * @param tag the {@link TupleTag} associated with this additional output
- */
- private void registerAdditionalOutputFrom(final IRVertex irVertex, final PValue output, final TupleTag<?> tag) {
- pValueToTag.put(output, tag);
- pValueToProducer.put(output, irVertex);
- }
+ private static Map<TupleTag<?>, Coder<?>> getOutputCoders(final AppliedPTransform<?, ?, ?> ptransform) {
+ return ptransform
+ .getOutputs()
+ .entrySet()
+ .stream()
+ .filter(e -> e.getValue() instanceof PCollection)
+ .collect(Collectors.toMap(e -> e.getKey(), e -> ((PCollection) e.getValue()).getCoder()));
}
/**
- * Default implementation for {@link CommunicationPatternProperty.Value} selector.
+ * Create a group by key transform.
+ * It returns GroupByKeyTransform for the global window, and GroupByKeyAndWindowDoFnTransform otherwise.
+ * @param ctx translation context
+ * @param beamNode the Beam node to create the transform for
+ * @return group by key transform
*/
- private static final class DefaultCommunicationPatternSelector
- implements BiFunction<IRVertex, IRVertex, CommunicationPatternProperty.Value> {
-
- private static final DefaultCommunicationPatternSelector INSTANCE = new DefaultCommunicationPatternSelector();
-
- @Override
- public CommunicationPatternProperty.Value apply(final IRVertex src, final IRVertex dst) {
- final Class<?> constructUnionTableFn;
- try {
- constructUnionTableFn = Class.forName("org.apache.beam.sdk.transforms.join.CoGroupByKey$ConstructUnionTableFn");
- } catch (final ClassNotFoundException e) {
- throw new RuntimeException(e);
- }
-
- final Transform srcTransform = src instanceof OperatorVertex ? ((OperatorVertex) src).getTransform() : null;
- final Transform dstTransform = dst instanceof OperatorVertex ? ((OperatorVertex) dst).getTransform() : null;
- final DoFn srcDoFn = srcTransform instanceof DoFnTransform ? ((DoFnTransform) srcTransform).getDoFn() : null;
+ private static Transform createGBKTransform(
+ final PipelineTranslationContext ctx,
+ final TransformHierarchy.Node beamNode) {
+ final AppliedPTransform pTransform = beamNode.toAppliedPTransform(ctx.getPipeline());
+ final PCollection<?> mainInput = (PCollection<?>)
+ Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(pTransform));
+ final TupleTag mainOutputTag = new TupleTag<>();
- if (srcDoFn != null && srcDoFn.getClass().equals(constructUnionTableFn)) {
- return CommunicationPatternProperty.Value.Shuffle;
- }
- if (srcTransform instanceof FlattenTransform) {
- return CommunicationPatternProperty.Value.OneToOne;
- }
- if (dstTransform instanceof GroupByKeyAndWindowDoFnTransform
- || dstTransform instanceof GroupByKeyTransform) {
- return CommunicationPatternProperty.Value.Shuffle;
- }
- if (dstTransform instanceof CreateViewTransform) {
- return CommunicationPatternProperty.Value.BroadCast;
- }
- return CommunicationPatternProperty.Value.OneToOne;
+ if (isGlobalWindow(beamNode, ctx.getPipeline())) {
+ return new GroupByKeyTransform();
+ } else {
+ return new GroupByKeyAndWindowDoFnTransform(
+ getOutputCoders(pTransform),
+ mainOutputTag,
+ Collections.emptyList(), /* GBK does not have additional outputs */
+ mainInput.getWindowingStrategy(),
+ Collections.emptyList(), /* GBK does not have additional side inputs */
+ ctx.getPipelineOptions(),
+ SystemReduceFn.buffering(mainInput.getCoder()));
}
}
- /**
- * A {@link CommunicationPatternProperty.Value} selector which always emits OneToOne.
- */
- private static final class OneToOneCommunicationPatternSelector
- implements BiFunction<IRVertex, IRVertex, CommunicationPatternProperty.Value> {
- private static final OneToOneCommunicationPatternSelector INSTANCE = new OneToOneCommunicationPatternSelector();
-
- @Override
- public CommunicationPatternProperty.Value apply(final IRVertex src, final IRVertex dst) {
- return CommunicationPatternProperty.Value.OneToOne;
- }
+ private static boolean isGlobalWindow(final TransformHierarchy.Node beamNode, final Pipeline pipeline) {
+ final AppliedPTransform pTransform = beamNode.toAppliedPTransform(pipeline);
+ final PCollection<?> mainInput = (PCollection<?>)
+ Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(pTransform));
+ return mainInput.getWindowingStrategy().getWindowFn() instanceof GlobalWindows;
}
}
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineVisitor.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineVisitor.java
index c1723da..e80db2d 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineVisitor.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineVisitor.java
@@ -18,282 +18,45 @@
*/
package org.apache.nemo.compiler.frontend.beam;
-import org.apache.nemo.common.dag.DAG;
-import org.apache.nemo.common.dag.DAGBuilder;
-import org.apache.nemo.common.dag.Edge;
-import org.apache.nemo.common.dag.Vertex;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.runners.TransformHierarchy;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.values.PValue;
-
-import java.util.*;
+import org.apache.nemo.common.dag.DAG;
+import org.apache.nemo.common.ir.edge.IREdge;
+import org.apache.nemo.common.ir.vertex.IRVertex;
/**
- * Traverses through the given Beam pipeline to construct a DAG of Beam Transform,
- * while preserving hierarchy of CompositeTransforms.
- * Hierarchy is established when a CompositeTransform is expanded to other CompositeTransforms or PrimitiveTransforms,
- * as the former CompositeTransform becoming 'enclosingVertex' which have the inner transforms as embedded DAG.
- * This DAG will be later translated by {@link PipelineTranslator} into Nemo IR DAG.
+ * Uses the translator and the context to build a Nemo IR DAG.
+ * - Translator: Translates each PTransform, and lets us know whether or not to enter into a composite PTransform.
+ * - Context: The translator builds a DAG in the context.
*/
public final class PipelineVisitor extends Pipeline.PipelineVisitor.Defaults {
+ private static PipelineTranslator pipelineTranslator = PipelineTranslator.INSTANCE;
+ private final PipelineTranslationContext context;
- private static final String TRANSFORM = "Transform-";
- private static final String DATAFLOW = "Dataflow-";
-
- private final Stack<CompositeTransformVertex> compositeTransformVertexStack = new Stack<>();
- private CompositeTransformVertex rootVertex = null;
- private int nextIdx = 0;
+ PipelineVisitor(final Pipeline pipeline, final NemoPipelineOptions pipelineOptions) {
+ this.context = new PipelineTranslationContext(pipeline, pipelineOptions);
+ }
@Override
public void visitPrimitiveTransform(final TransformHierarchy.Node node) {
- final PrimitiveTransformVertex vertex = new PrimitiveTransformVertex(node, compositeTransformVertexStack.peek());
- compositeTransformVertexStack.peek().addVertex(vertex);
- vertex.getPValuesConsumed()
- .forEach(pValue -> {
- final TransformVertex dst = getDestinationOfDataFlowEdge(vertex, pValue);
- dst.enclosingVertex.addDataFlow(new DataFlowEdge(dst.enclosingVertex.getProducerOf(pValue), dst));
- });
+ pipelineTranslator.translatePrimitive(context, node);
}
@Override
public CompositeBehavior enterCompositeTransform(final TransformHierarchy.Node node) {
- final CompositeTransformVertex vertex;
- if (compositeTransformVertexStack.isEmpty()) {
- // There is always a top-level CompositeTransform that encompasses the entire Beam pipeline.
- vertex = new CompositeTransformVertex(node, null);
- } else {
- vertex = new CompositeTransformVertex(node, compositeTransformVertexStack.peek());
- }
- compositeTransformVertexStack.push(vertex);
- return CompositeBehavior.ENTER_TRANSFORM;
+ final CompositeBehavior compositeBehavior = pipelineTranslator.translateComposite(context, node);
+
+ // this should come after the above translateComposite, since this composite is a child of a previous composite.
+ context.enterCompositeTransform(node);
+ return compositeBehavior;
}
@Override
public void leaveCompositeTransform(final TransformHierarchy.Node node) {
- final CompositeTransformVertex vertex = compositeTransformVertexStack.pop();
- vertex.build();
- if (compositeTransformVertexStack.isEmpty()) {
- // The vertex is the root.
- if (rootVertex != null) {
- throw new RuntimeException("The visitor already have traversed a Beam pipeline. "
- + "Re-using a visitor is not allowed.");
- }
- rootVertex = vertex;
- } else {
- // The CompositeTransformVertex is ready; adding it to its enclosing vertex.
- compositeTransformVertexStack.peek().addVertex(vertex);
- }
- }
-
- /**
- * @return A vertex representing the top-level CompositeTransform.
- */
- public CompositeTransformVertex getConvertedPipeline() {
- if (rootVertex == null) {
- throw new RuntimeException("The visitor have not fully traversed through a Beam pipeline.");
- }
- return rootVertex;
- }
-
- /**
- * Represents a {@link org.apache.beam.sdk.transforms.PTransform} as a vertex in DAG.
- */
- public abstract class TransformVertex extends Vertex {
- private final TransformHierarchy.Node node;
- private final CompositeTransformVertex enclosingVertex;
-
- /**
- * @param node the corresponding Beam node
- * @param enclosingVertex the vertex for the transform which inserted this transform as its expansion,
- * or {@code null}
- */
- private TransformVertex(final TransformHierarchy.Node node, final CompositeTransformVertex enclosingVertex) {
- super(String.format("%s%d", TRANSFORM, nextIdx++));
- this.node = node;
- this.enclosingVertex = enclosingVertex;
- }
-
- /**
- * @return Collection of {@link PValue}s this transform emits.
- */
- public abstract Collection<PValue> getPValuesProduced();
-
- /**
- * Searches within {@code this} to find a transform that produces the given {@link PValue}.
- *
- * @param pValue a {@link PValue}
- * @return the {@link TransformVertex} whose {@link org.apache.beam.sdk.transforms.PTransform}
- * produces the given {@code pValue}
- */
- public abstract PrimitiveTransformVertex getPrimitiveProducerOf(final PValue pValue);
-
- /**
- * @return the corresponding Beam node.
- */
- public TransformHierarchy.Node getNode() {
- return node;
- }
-
- /**
- * @return the enclosing {@link CompositeTransformVertex} if any, {@code null} otherwise.
- */
- public CompositeTransformVertex getEnclosingVertex() {
- return enclosingVertex;
- }
- }
-
- /**
- * Represents a transform hierarchy for primitive transform.
- */
- public final class PrimitiveTransformVertex extends TransformVertex {
- private final List<PValue> pValuesProduced = new ArrayList<>();
- private final List<PValue> pValuesConsumed = new ArrayList<>();
-
- private PrimitiveTransformVertex(final TransformHierarchy.Node node,
- final CompositeTransformVertex enclosingVertex) {
- super(node, enclosingVertex);
- if (node.getTransform() instanceof View.CreatePCollectionView) {
- pValuesProduced.add(((View.CreatePCollectionView) node.getTransform()).getView());
- }
- if (node.getTransform() instanceof ParDo.SingleOutput) {
- pValuesConsumed.addAll(((ParDo.SingleOutput) node.getTransform()).getSideInputs());
- }
- if (node.getTransform() instanceof ParDo.MultiOutput) {
- pValuesConsumed.addAll(((ParDo.MultiOutput) node.getTransform()).getSideInputs());
- }
- pValuesProduced.addAll(getNode().getOutputs().values());
- pValuesConsumed.addAll(getNode().getInputs().values());
- }
-
- @Override
- public Collection<PValue> getPValuesProduced() {
- return pValuesProduced;
- }
-
- @Override
- public PrimitiveTransformVertex getPrimitiveProducerOf(final PValue pValue) {
- if (!getPValuesProduced().contains(pValue)) {
- throw new RuntimeException();
- }
- return this;
- }
-
- /**
- * @return collection of {@link PValue} this transform consumes.
- */
- public Collection<PValue> getPValuesConsumed() {
- return pValuesConsumed;
- }
- }
- /**
- * Represents a transform hierarchy for composite transform.
- */
- public final class CompositeTransformVertex extends TransformVertex {
- private final Map<PValue, TransformVertex> pValueToProducer = new HashMap<>();
- private final Collection<DataFlowEdge> dataFlowEdges = new ArrayList<>();
- private final DAGBuilder<TransformVertex, DataFlowEdge> builder = new DAGBuilder<>();
- private DAG<TransformVertex, DataFlowEdge> dag = null;
-
- private CompositeTransformVertex(final TransformHierarchy.Node node,
- final CompositeTransformVertex enclosingVertex) {
- super(node, enclosingVertex);
- }
-
- /**
- * Finalize this vertex and make it ready to be added to another {@link CompositeTransformVertex}.
- */
- private void build() {
- if (dag != null) {
- throw new RuntimeException("DAG already have been built.");
- }
- dataFlowEdges.forEach(builder::connectVertices);
- dag = builder.build();
- }
-
- /**
- * Add a {@link TransformVertex}.
- *
- * @param vertex the vertex to add
- */
- private void addVertex(final TransformVertex vertex) {
- vertex.getPValuesProduced().forEach(value -> pValueToProducer.put(value, vertex));
- builder.addVertex(vertex);
- }
-
- /**
- * Add a {@link DataFlowEdge}.
- *
- * @param dataFlowEdge the edge to add
- */
- private void addDataFlow(final DataFlowEdge dataFlowEdge) {
- dataFlowEdges.add(dataFlowEdge);
- }
-
- @Override
- public Collection<PValue> getPValuesProduced() {
- return pValueToProducer.keySet();
- }
-
- /**
- * Get a direct child of this vertex which produces the given {@link PValue}.
- *
- * @param pValue the {@link PValue} to search
- * @return the direct child of this vertex which produces {@code pValue}
- */
- public TransformVertex getProducerOf(final PValue pValue) {
- final TransformVertex vertex = pValueToProducer.get(pValue);
- if (vertex == null) {
- throw new RuntimeException();
- }
- return vertex;
- }
-
- @Override
- public PrimitiveTransformVertex getPrimitiveProducerOf(final PValue pValue) {
- return getProducerOf(pValue).getPrimitiveProducerOf(pValue);
- }
-
- /**
- * @return DAG of Beam hierarchy
- */
- public DAG<TransformVertex, DataFlowEdge> getDAG() {
- return dag;
- }
- }
-
- /**
- * Represents data flow from a transform to another transform.
- */
- public final class DataFlowEdge extends Edge<TransformVertex> {
- /**
- * @param src source vertex
- * @param dst destination vertex
- */
- private DataFlowEdge(final TransformVertex src, final TransformVertex dst) {
- super(String.format("%s%d", DATAFLOW, nextIdx++), src, dst);
- }
+ context.leaveCompositeTransform(node);
}
- /**
- * @param primitiveConsumer a {@link PrimitiveTransformVertex} which consumes {@code pValue}
- * @param pValue the specified {@link PValue}
- * @return the closest {@link TransformVertex} to {@code primitiveConsumer},
- * which is equal to or encloses {@code primitiveConsumer} and can be the destination vertex of
- * data flow edge from the producer of {@code pValue} to {@code primitiveConsumer}.
- */
- private TransformVertex getDestinationOfDataFlowEdge(final PrimitiveTransformVertex primitiveConsumer,
- final PValue pValue) {
- TransformVertex current = primitiveConsumer;
- while (true) {
- if (current.getEnclosingVertex().getPValuesProduced().contains(pValue)) {
- return current;
- }
- current = current.getEnclosingVertex();
- if (current.getEnclosingVertex() == null) {
- throw new RuntimeException(String.format("Cannot find producer of %s", pValue));
- }
- }
+ DAG<IRVertex, IREdge> getConvertedPipeline() {
+ return context.getBuilder().build();
}
}
diff --git a/compiler/pom.xml b/compiler/pom.xml
index ed5340e..32471bc 100644
--- a/compiler/pom.xml
+++ b/compiler/pom.xml
@@ -33,6 +33,19 @@ under the License.
<packaging>pom</packaging>
<name>Nemo Compiler</name>
+ <dependencies>
+ <dependency>
+ <!--
+ This is needed to view the logs when running unit tests.
+ See https://dzone.com/articles/how-configure-slf4j-different for details.
+ -->
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-simple</artifactId>
+ <version>1.6.2</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
<modules>
<module>backend</module>
<module>frontend/beam</module>
diff --git a/compiler/test/pom.xml b/compiler/test/pom.xml
index daf5823..4de5ca5 100644
--- a/compiler/test/pom.xml
+++ b/compiler/test/pom.xml
@@ -75,5 +75,15 @@ under the License.
<artifactId>powermock-api-mockito2</artifactId>
<version>${powermock.version}</version>
</dependency>
+ <dependency>
+ <!--
+ This is needed to view the logs when running unit tests.
+ See https://dzone.com/articles/how-configure-slf4j-different for details.
+ -->
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-simple</artifactId>
+ <version>1.6.2</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendALSTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendALSTest.java
index e5aad3a..fde90a9 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendALSTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendALSTest.java
@@ -36,27 +36,23 @@ import static org.junit.Assert.assertEquals;
@RunWith(PowerMockRunner.class)
@PrepareForTest(JobLauncher.class)
public final class BeamFrontendALSTest {
- @Test
+ // TODO #260: Beam Accumulator-based Partial Aggregation
+ // @Test
public void testALSDAG() throws Exception {
final DAG<IRVertex, IREdge> producedDAG = CompilerTestUtil.compileALSDAG();
assertEquals(producedDAG.getTopologicalSort(), producedDAG.getTopologicalSort());
- assertEquals(42, producedDAG.getVertices().size());
+ assertEquals(38, producedDAG.getVertices().size());
// producedDAG.getTopologicalSort().forEach(v -> System.out.println(v.getId()));
- final IRVertex vertex11 = producedDAG.getTopologicalSort().get(5);
- assertEquals(1, producedDAG.getIncomingEdgesOf(vertex11).size());
- assertEquals(1, producedDAG.getIncomingEdgesOf(vertex11.getId()).size());
- assertEquals(4, producedDAG.getOutgoingEdgesOf(vertex11).size());
+ final IRVertex vertexX = producedDAG.getTopologicalSort().get(5);
+ assertEquals(1, producedDAG.getIncomingEdgesOf(vertexX).size());
+ assertEquals(1, producedDAG.getIncomingEdgesOf(vertexX.getId()).size());
+ assertEquals(1, producedDAG.getOutgoingEdgesOf(vertexX).size());
- final IRVertex vertex17 = producedDAG.getTopologicalSort().get(10);
- assertEquals(1, producedDAG.getIncomingEdgesOf(vertex17).size());
- assertEquals(1, producedDAG.getIncomingEdgesOf(vertex17.getId()).size());
- assertEquals(1, producedDAG.getOutgoingEdgesOf(vertex17).size());
-
- final IRVertex vertex18 = producedDAG.getTopologicalSort().get(16);
- assertEquals(2, producedDAG.getIncomingEdgesOf(vertex18).size());
- assertEquals(2, producedDAG.getIncomingEdgesOf(vertex18.getId()).size());
- assertEquals(1, producedDAG.getOutgoingEdgesOf(vertex18).size());
+ final IRVertex vertexY = producedDAG.getTopologicalSort().get(10);
+ assertEquals(1, producedDAG.getIncomingEdgesOf(vertexY).size());
+ assertEquals(1, producedDAG.getIncomingEdgesOf(vertexY.getId()).size());
+ assertEquals(1, producedDAG.getOutgoingEdgesOf(vertexY).size());
}
}
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendMLRTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendMLRTest.java
index f50b331..d52d13c 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendMLRTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendMLRTest.java
@@ -36,26 +36,22 @@ import static org.junit.Assert.assertEquals;
@RunWith(PowerMockRunner.class)
@PrepareForTest(JobLauncher.class)
public class BeamFrontendMLRTest {
- @Test
+ // TODO #260: Beam Accumulator-based Partial Aggregation
+ // @Test
public void testMLRDAG() throws Exception {
final DAG<IRVertex, IREdge> producedDAG = CompilerTestUtil.compileMLRDAG();
assertEquals(producedDAG.getTopologicalSort(), producedDAG.getTopologicalSort());
- assertEquals(42, producedDAG.getVertices().size());
+ assertEquals(36, producedDAG.getVertices().size());
- final IRVertex vertex1 = producedDAG.getTopologicalSort().get(5);
- assertEquals(0, producedDAG.getIncomingEdgesOf(vertex1).size());
- assertEquals(0, producedDAG.getIncomingEdgesOf(vertex1.getId()).size());
- assertEquals(3, producedDAG.getOutgoingEdgesOf(vertex1).size());
+ final IRVertex vertexX = producedDAG.getTopologicalSort().get(5);
+ assertEquals(1, producedDAG.getIncomingEdgesOf(vertexX).size());
+ assertEquals(1, producedDAG.getIncomingEdgesOf(vertexX.getId()).size());
+ assertEquals(1, producedDAG.getOutgoingEdgesOf(vertexX).size());
- final IRVertex vertex15 = producedDAG.getTopologicalSort().get(13);
- assertEquals(1, producedDAG.getIncomingEdgesOf(vertex15).size());
- assertEquals(1, producedDAG.getIncomingEdgesOf(vertex15.getId()).size());
- assertEquals(1, producedDAG.getOutgoingEdgesOf(vertex15).size());
-
- final IRVertex vertex21 = producedDAG.getTopologicalSort().get(19);
- assertEquals(2, producedDAG.getIncomingEdgesOf(vertex21).size());
- assertEquals(2, producedDAG.getIncomingEdgesOf(vertex21.getId()).size());
- assertEquals(1, producedDAG.getOutgoingEdgesOf(vertex21).size());
+ final IRVertex vertexY = producedDAG.getTopologicalSort().get(13);
+ assertEquals(1, producedDAG.getIncomingEdgesOf(vertexY).size());
+ assertEquals(1, producedDAG.getIncomingEdgesOf(vertexY.getId()).size());
+ assertEquals(2, producedDAG.getOutgoingEdgesOf(vertexY).size());
}
}
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/composite/TransientResourceCompositePassTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/composite/TransientResourceCompositePassTest.java
index 8ec6511..8b23349 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/composite/TransientResourceCompositePassTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/composite/TransientResourceCompositePassTest.java
@@ -49,46 +49,19 @@ public class TransientResourceCompositePassTest {
compiledDAG = CompilerTestUtil.compileALSDAG();
}
- @Test
+ // TODO #260: Beam Accumulator-based Partial Aggregation
+ // @Test
public void testTransientResourcePass() throws Exception {
final DAG<IRVertex, IREdge> processedDAG = new TransientResourceCompositePass().apply(compiledDAG);
- final IRVertex vertex1 = processedDAG.getTopologicalSort().get(0);
- assertEquals(ResourcePriorityProperty.TRANSIENT, vertex1.getPropertyValue(ResourcePriorityProperty.class).get());
+ final IRVertex vertexX = processedDAG.getTopologicalSort().get(0);
+ assertEquals(ResourcePriorityProperty.TRANSIENT, vertexX.getPropertyValue(ResourcePriorityProperty.class).get());
- final IRVertex vertex2 = processedDAG.getTopologicalSort().get(11);
- assertEquals(ResourcePriorityProperty.TRANSIENT, vertex2.getPropertyValue(ResourcePriorityProperty.class).get());
- processedDAG.getIncomingEdgesOf(vertex2).forEach(irEdge -> {
+ final IRVertex vertexY = processedDAG.getTopologicalSort().get(5);
+ assertEquals(ResourcePriorityProperty.TRANSIENT, vertexY.getPropertyValue(ResourcePriorityProperty.class).get());
+ processedDAG.getIncomingEdgesOf(vertexY).forEach(irEdge -> {
assertEquals(DataStoreProperty.Value.MemoryStore, irEdge.getPropertyValue(DataStoreProperty.class).get());
assertEquals(DataFlowProperty.Value.Pull, irEdge.getPropertyValue(DataFlowProperty.class).get());
});
-
- final IRVertex vertex5 = processedDAG.getTopologicalSort().get(14);
- assertEquals(ResourcePriorityProperty.RESERVED, vertex5.getPropertyValue(ResourcePriorityProperty.class).get());
- processedDAG.getIncomingEdgesOf(vertex5).forEach(irEdge -> {
- assertEquals(DataStoreProperty.Value.LocalFileStore, irEdge.getPropertyValue(DataStoreProperty.class).get());
- assertEquals(DataFlowProperty.Value.Push, irEdge.getPropertyValue(DataFlowProperty.class).get());
- });
-
- final IRVertex vertex11 = processedDAG.getTopologicalSort().get(5);
- assertEquals(ResourcePriorityProperty.RESERVED, vertex11.getPropertyValue(ResourcePriorityProperty.class).get());
- processedDAG.getIncomingEdgesOf(vertex11).forEach(irEdge -> {
- assertEquals(DataStoreProperty.Value.MemoryStore, irEdge.getPropertyValue(DataStoreProperty.class).get());
- assertEquals(DataFlowProperty.Value.Pull, irEdge.getPropertyValue(DataFlowProperty.class).get());
- });
-
- final IRVertex vertex17 = processedDAG.getTopologicalSort().get(10);
- assertEquals(ResourcePriorityProperty.TRANSIENT, vertex17.getPropertyValue(ResourcePriorityProperty.class).get());
- processedDAG.getIncomingEdgesOf(vertex17).forEach(irEdge -> {
- assertEquals(DataStoreProperty.Value.LocalFileStore, irEdge.getPropertyValue(DataStoreProperty.class).get());
- assertEquals(DataFlowProperty.Value.Pull, irEdge.getPropertyValue(DataFlowProperty.class).get());
- });
-
- final IRVertex vertex19 = processedDAG.getTopologicalSort().get(17);
- assertEquals(ResourcePriorityProperty.RESERVED, vertex19.getPropertyValue(ResourcePriorityProperty.class).get());
- processedDAG.getIncomingEdgesOf(vertex19).forEach(irEdge -> {
- assertEquals(DataStoreProperty.Value.LocalFileStore, irEdge.getPropertyValue(DataStoreProperty.class).get());
- assertEquals(DataFlowProperty.Value.Push, irEdge.getPropertyValue(DataFlowProperty.class).get());
- });
}
}
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopExtractionPassTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopExtractionPassTest.java
index 4f4e32f..6f8ed04 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopExtractionPassTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopExtractionPassTest.java
@@ -44,10 +44,11 @@ public class LoopExtractionPassTest {
compiledDAG = CompilerTestUtil.compileALSDAG();
}
- @Test
+ // TODO #260: Beam Accumulator-based Partial Aggregation
+ // @Test
public void testLoopGrouping() {
final DAG<IRVertex, IREdge> processedDAG = new LoopExtractionPass().apply(compiledDAG);
- assertEquals(13, processedDAG.getTopologicalSort().size());
+ assertEquals(9, processedDAG.getTopologicalSort().size());
}
}
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionALSInefficientTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionALSInefficientTest.java
index 9a72446..29ef1d7 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionALSInefficientTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionALSInefficientTest.java
@@ -46,9 +46,10 @@ public class LoopInvariantCodeMotionALSInefficientTest {
groupedDAG = new LoopExtractionPass().apply(inefficientALSDAG);
}
- @Test
+ // TODO #260: Beam Accumulator-based Partial Aggregation
+ // @Test
public void testForInefficientALSDAG() throws Exception {
- final long expectedNumOfVertices = groupedDAG.getVertices().size() + 5;
+ final long expectedNumOfVertices = groupedDAG.getVertices().size() + 3;
final DAG<IRVertex, IREdge> processedDAG = LoopOptimizations.getLoopInvariantCodeMotionPass()
.apply(groupedDAG);
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionPassTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionPassTest.java
index 9964cbb..d6b3085 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionPassTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionPassTest.java
@@ -62,32 +62,32 @@ public class LoopInvariantCodeMotionPassTest {
assertTrue(alsLoopOpt.isPresent());
final LoopVertex alsLoop = alsLoopOpt.get();
- final IRVertex vertex6 = groupedDAG.getTopologicalSort().get(11);
- final IRVertex vertex18 = alsLoop.getDAG().getTopologicalSort().get(4);
+ final IRVertex vertex7 = groupedDAG.getTopologicalSort().get(3);
+ final IRVertex vertex15 = alsLoop.getDAG().getTopologicalSort().get(4);
- final Set<IREdge> oldDAGIncomingEdges = alsLoop.getDagIncomingEdges().get(vertex18);
- final List<IREdge> newDAGIncomingEdge = groupedDAG.getIncomingEdgesOf(vertex6);
+ final Set<IREdge> oldDAGIncomingEdges = alsLoop.getDagIncomingEdges().get(vertex15);
+ final List<IREdge> newDAGIncomingEdge = groupedDAG.getIncomingEdgesOf(vertex7);
- alsLoop.getDagIncomingEdges().remove(vertex18);
- alsLoop.getDagIncomingEdges().putIfAbsent(vertex6, new HashSet<>());
- newDAGIncomingEdge.forEach(alsLoop.getDagIncomingEdges().get(vertex6)::add);
+ alsLoop.getDagIncomingEdges().remove(vertex15);
+ alsLoop.getDagIncomingEdges().putIfAbsent(vertex7, new HashSet<>());
+ newDAGIncomingEdge.forEach(alsLoop.getDagIncomingEdges().get(vertex7)::add);
- alsLoop.getNonIterativeIncomingEdges().remove(vertex18);
- alsLoop.getNonIterativeIncomingEdges().putIfAbsent(vertex6, new HashSet<>());
- newDAGIncomingEdge.forEach(alsLoop.getNonIterativeIncomingEdges().get(vertex6)::add);
+ alsLoop.getNonIterativeIncomingEdges().remove(vertex15);
+ alsLoop.getNonIterativeIncomingEdges().putIfAbsent(vertex7, new HashSet<>());
+ newDAGIncomingEdge.forEach(alsLoop.getNonIterativeIncomingEdges().get(vertex7)::add);
- alsLoop.getBuilder().addVertex(vertex6);
+ alsLoop.getBuilder().addVertex(vertex7);
oldDAGIncomingEdges.forEach(alsLoop.getBuilder()::connectVertices);
final DAGBuilder<IRVertex, IREdge> builder = new DAGBuilder<>();
groupedDAG.topologicalDo(v -> {
- if (!v.equals(vertex6) && !v.equals(alsLoop)) {
+ if (!v.equals(vertex7) && !v.equals(alsLoop)) {
builder.addVertex(v);
groupedDAG.getIncomingEdgesOf(v).forEach(builder::connectVertices);
} else if (v.equals(alsLoop)) {
builder.addVertex(v);
groupedDAG.getIncomingEdgesOf(v).forEach(e -> {
- if (!e.getSrc().equals(vertex6)) {
+ if (!e.getSrc().equals(vertex7)) {
builder.connectVertices(e);
} else {
final Optional<IREdge> incomingEdge = newDAGIncomingEdge.stream().findFirst();
@@ -105,7 +105,8 @@ public class LoopInvariantCodeMotionPassTest {
dagToBeRefactored = builder.build();
}
- @Test
+ // TODO #260: Beam Accumulator-based Partial Aggregation
+ // @Test
public void testLoopInvariantCodeMotionPass() throws Exception {
final long numberOfGroupedVertices = groupedDAG.getVertices().size();
diff --git a/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedWordCountITCase.java b/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedWordCountITCase.java
index b523543..57303fc 100644
--- a/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedWordCountITCase.java
+++ b/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedWordCountITCase.java
@@ -34,6 +34,7 @@ import org.powermock.modules.junit4.PowerMockRunner;
@RunWith(PowerMockRunner.class)
@PrepareForTest(JobLauncher.class)
public final class WindowedWordCountITCase {
+
private static final int TIMEOUT = 120000;
private static ArgBuilder builder;
private static final String fileBasePath = System.getProperty("user.dir") + "/../resources/";