You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by ta...@apache.org on 2018/11/03 07:24:18 UTC
[incubator-nemo] branch master updated: [Nemo-220] Move PIPELINE
variable to TranslationContext in PipelineTranslator (#145)
This is an automated email from the ASF dual-hosted git repository.
taegeonum pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new 2b3a315 [Nemo-220] Move PIPELINE variable to TranslationContext in PipelineTranslator (#145)
2b3a315 is described below
commit 2b3a315a48b539df03fcafa14daa7eea592c568e
Author: WooYeon Lee <wy...@gmail.com>
AuthorDate: Sat Nov 3 16:24:14 2018 +0900
[Nemo-220] Move PIPELINE variable to TranslationContext in PipelineTranslator (#145)
JIRA: [NEMO-220: Move PIPELINE variable to TranslationContext in PipelineTranslator](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-220)
**Major changes:**
- Move PIPELINE variable to TranslationContext in PipelineTranslator
**Minor changes to note:**
- PipelineTranslator no longer implements BiFunction
**Tests for the changes:**
- N/A
---
.../compiler/frontend/beam/PipelineTranslator.java | 30 +++++++++++-----------
1 file changed, 15 insertions(+), 15 deletions(-)
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
index ee10b21..3fd9d2b 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
@@ -55,7 +55,6 @@ import java.lang.annotation.*;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.*;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -66,8 +65,7 @@ import java.util.stream.Stream;
* For a {@link CompositeTransformVertex}, it defines how to setup and clear {@link TranslationContext}
* before start translating inner Beam transform hierarchy.
*/
-public final class PipelineTranslator
- implements BiFunction<CompositeTransformVertex, PipelineOptions, DAG<IRVertex, IREdge>> {
+public final class PipelineTranslator {
private static final Logger LOG = LoggerFactory.getLogger(PipelineTranslator.class.getName());
@@ -76,9 +74,6 @@ public final class PipelineTranslator
private final Map<Class<? extends PTransform>, Method> primitiveTransformToTranslator = new HashMap<>();
private final Map<Class<? extends PTransform>, Method> compositeTransformToTranslator = new HashMap<>();
- // TODO #220: Move this variable to TranslationContext
- private static final AtomicReference<Pipeline> PIPELINE = new AtomicReference<>();
-
/**
* Static translator method.
* @param pipeline the original root
@@ -89,8 +84,7 @@ public final class PipelineTranslator
public static DAG<IRVertex, IREdge> translate(final Pipeline pipeline,
final CompositeTransformVertex root,
final PipelineOptions pipelineOptions) {
- PIPELINE.set(pipeline);
- return INSTANCE.apply(root, pipelineOptions);
+ return INSTANCE.translateToIRDAG(root, pipeline, pipelineOptions);
}
/**
@@ -144,7 +138,7 @@ public final class PipelineTranslator
private static DoFnTransform createDoFnTransform(final TranslationContext ctx,
final PrimitiveTransformVertex transformVertex) {
try {
- final AppliedPTransform pTransform = transformVertex.getNode().toAppliedPTransform(PIPELINE.get());
+ final AppliedPTransform pTransform = transformVertex.getNode().toAppliedPTransform(ctx.pipeline);
final DoFn doFn = ParDoTranslation.getDoFn(pTransform);
final TupleTag mainOutputTag = ParDoTranslation.getMainOutputTag(pTransform);
final List<PCollectionView<?>> sideInputs = ParDoTranslation.getSideInputs(pTransform);
@@ -221,7 +215,7 @@ public final class PipelineTranslator
private static Transform createGBKTransform(
final TranslationContext ctx,
final TransformVertex transformVertex) {
- final AppliedPTransform pTransform = transformVertex.getNode().toAppliedPTransform(PIPELINE.get());
+ final AppliedPTransform pTransform = transformVertex.getNode().toAppliedPTransform(ctx.pipeline);
final PCollection<?> mainInput = (PCollection<?>)
Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(pTransform));
final TupleTag mainOutputTag = new TupleTag<>();
@@ -372,12 +366,12 @@ public final class PipelineTranslator
ctx.loopVertexStack.pop();
}
- @Override
- public DAG<IRVertex, IREdge> apply(final CompositeTransformVertex pipeline,
- final PipelineOptions pipelineOptions) {
- final TranslationContext ctx = new TranslationContext(pipeline, primitiveTransformToTranslator,
+ private DAG<IRVertex, IREdge> translateToIRDAG(final CompositeTransformVertex vertex,
+ final Pipeline pipeline,
+ final PipelineOptions pipelineOptions) {
+ final TranslationContext ctx = new TranslationContext(vertex, pipeline, primitiveTransformToTranslator,
compositeTransformToTranslator, DefaultCommunicationPatternSelector.INSTANCE, pipelineOptions);
- ctx.translate(pipeline);
+ ctx.translate(vertex);
return ctx.builder.build();
}
@@ -453,23 +447,28 @@ public final class PipelineTranslator
private final Map<PValue, TupleTag<?>> pValueToTag;
private final Stack<LoopVertex> loopVertexStack;
private final BiFunction<IRVertex, IRVertex, CommunicationPatternProperty.Value> communicationPatternSelector;
+ private final Pipeline pipeline;
+
private final Map<Class<? extends PTransform>, Method> primitiveTransformToTranslator;
private final Map<Class<? extends PTransform>, Method> compositeTransformToTranslator;
/**
* @param root the root to translate
+ * @param pipeline the pipeline to translate
* @param primitiveTransformToTranslator provides translators for PrimitiveTransform
* @param compositeTransformToTranslator provides translators for CompositeTransform
* @param selector provides {@link CommunicationPatternProperty.Value} for IR edges
* @param pipelineOptions {@link PipelineOptions}
*/
private TranslationContext(final CompositeTransformVertex root,
+ final Pipeline pipeline,
final Map<Class<? extends PTransform>, Method> primitiveTransformToTranslator,
final Map<Class<? extends PTransform>, Method> compositeTransformToTranslator,
final BiFunction<IRVertex, IRVertex, CommunicationPatternProperty.Value> selector,
final PipelineOptions pipelineOptions) {
this.root = root;
+ this.pipeline = pipeline;
this.builder = new DAGBuilder<>();
this.pValueToProducer = new HashMap<>();
this.pValueToTag = new HashMap<>();
@@ -489,6 +488,7 @@ public final class PipelineTranslator
private TranslationContext(final TranslationContext ctx,
final BiFunction<IRVertex, IRVertex, CommunicationPatternProperty.Value> selector) {
this.root = ctx.root;
+ this.pipeline = ctx.pipeline;
this.pipelineOptions = ctx.pipelineOptions;
this.builder = ctx.builder;
this.pValueToProducer = ctx.pValueToProducer;