You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by ta...@apache.org on 2018/11/03 07:24:18 UTC

[incubator-nemo] branch master updated: [Nemo-220] Move PIPELINE variable to TranslationContext in PipelineTranslator (#145)

This is an automated email from the ASF dual-hosted git repository.

taegeonum pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b3a315  [Nemo-220] Move PIPELINE variable to TranslationContext in PipelineTranslator (#145)
2b3a315 is described below

commit 2b3a315a48b539df03fcafa14daa7eea592c568e
Author: WooYeon Lee <wy...@gmail.com>
AuthorDate: Sat Nov 3 16:24:14 2018 +0900

    [Nemo-220] Move PIPELINE variable to TranslationContext in PipelineTranslator (#145)
    
    JIRA: [NEMO-220: Move PIPELINE variable to TranslationContext in PipelineTranslator](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-220)
    
    **Major changes:**
    - Move PIPELINE variable to TranslationContext in PipelineTranslator
    
    **Minor changes to note:**
    - PipelineTranslator no longer implements BiFunction
    
    **Tests for the changes:**
    - N/A
---
 .../compiler/frontend/beam/PipelineTranslator.java | 30 +++++++++++-----------
 1 file changed, 15 insertions(+), 15 deletions(-)

diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
index ee10b21..3fd9d2b 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
@@ -55,7 +55,6 @@ import java.lang.annotation.*;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.*;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiFunction;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -66,8 +65,7 @@ import java.util.stream.Stream;
  * For a {@link CompositeTransformVertex}, it defines how to setup and clear {@link TranslationContext}
  * before start translating inner Beam transform hierarchy.
  */
-public final class PipelineTranslator
-  implements BiFunction<CompositeTransformVertex, PipelineOptions, DAG<IRVertex, IREdge>> {
+public final class PipelineTranslator {
 
   private static final Logger LOG = LoggerFactory.getLogger(PipelineTranslator.class.getName());
 
@@ -76,9 +74,6 @@ public final class PipelineTranslator
   private final Map<Class<? extends PTransform>, Method> primitiveTransformToTranslator = new HashMap<>();
   private final Map<Class<? extends PTransform>, Method> compositeTransformToTranslator = new HashMap<>();
 
-  // TODO #220: Move this variable to TranslationContext
-  private static final AtomicReference<Pipeline> PIPELINE = new AtomicReference<>();
-
   /**
    * Static translator method.
    * @param pipeline the original root
@@ -89,8 +84,7 @@ public final class PipelineTranslator
   public static DAG<IRVertex, IREdge> translate(final Pipeline pipeline,
                                                 final CompositeTransformVertex root,
                                                 final PipelineOptions pipelineOptions) {
-    PIPELINE.set(pipeline);
-    return INSTANCE.apply(root, pipelineOptions);
+    return INSTANCE.translateToIRDAG(root, pipeline, pipelineOptions);
   }
 
   /**
@@ -144,7 +138,7 @@ public final class PipelineTranslator
   private static DoFnTransform createDoFnTransform(final TranslationContext ctx,
                                                    final PrimitiveTransformVertex transformVertex) {
     try {
-      final AppliedPTransform pTransform = transformVertex.getNode().toAppliedPTransform(PIPELINE.get());
+      final AppliedPTransform pTransform = transformVertex.getNode().toAppliedPTransform(ctx.pipeline);
       final DoFn doFn = ParDoTranslation.getDoFn(pTransform);
       final TupleTag mainOutputTag = ParDoTranslation.getMainOutputTag(pTransform);
       final List<PCollectionView<?>> sideInputs = ParDoTranslation.getSideInputs(pTransform);
@@ -221,7 +215,7 @@ public final class PipelineTranslator
   private static Transform createGBKTransform(
     final TranslationContext ctx,
     final TransformVertex transformVertex) {
-    final AppliedPTransform pTransform = transformVertex.getNode().toAppliedPTransform(PIPELINE.get());
+    final AppliedPTransform pTransform = transformVertex.getNode().toAppliedPTransform(ctx.pipeline);
     final PCollection<?> mainInput = (PCollection<?>)
       Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(pTransform));
     final TupleTag mainOutputTag = new TupleTag<>();
@@ -372,12 +366,12 @@ public final class PipelineTranslator
     ctx.loopVertexStack.pop();
   }
 
-  @Override
-  public DAG<IRVertex, IREdge> apply(final CompositeTransformVertex pipeline,
-                                     final PipelineOptions pipelineOptions) {
-    final TranslationContext ctx = new TranslationContext(pipeline, primitiveTransformToTranslator,
+  private DAG<IRVertex, IREdge> translateToIRDAG(final CompositeTransformVertex vertex,
+                                                 final Pipeline pipeline,
+                                                 final PipelineOptions pipelineOptions) {
+    final TranslationContext ctx = new TranslationContext(vertex, pipeline, primitiveTransformToTranslator,
         compositeTransformToTranslator, DefaultCommunicationPatternSelector.INSTANCE, pipelineOptions);
-    ctx.translate(pipeline);
+    ctx.translate(vertex);
     return ctx.builder.build();
   }
 
@@ -453,23 +447,28 @@ public final class PipelineTranslator
     private final Map<PValue, TupleTag<?>> pValueToTag;
     private final Stack<LoopVertex> loopVertexStack;
     private final BiFunction<IRVertex, IRVertex, CommunicationPatternProperty.Value> communicationPatternSelector;
+    private final Pipeline pipeline;
+
 
     private final Map<Class<? extends PTransform>, Method> primitiveTransformToTranslator;
     private final Map<Class<? extends PTransform>, Method> compositeTransformToTranslator;
 
     /**
      * @param root the root to translate
+     * @param pipeline the pipeline to translate
      * @param primitiveTransformToTranslator provides translators for PrimitiveTransform
      * @param compositeTransformToTranslator provides translators for CompositeTransform
      * @param selector provides {@link CommunicationPatternProperty.Value} for IR edges
      * @param pipelineOptions {@link PipelineOptions}
      */
     private TranslationContext(final CompositeTransformVertex root,
+                               final Pipeline pipeline,
                                final Map<Class<? extends PTransform>, Method> primitiveTransformToTranslator,
                                final Map<Class<? extends PTransform>, Method> compositeTransformToTranslator,
                                final BiFunction<IRVertex, IRVertex, CommunicationPatternProperty.Value> selector,
                                final PipelineOptions pipelineOptions) {
       this.root = root;
+      this.pipeline = pipeline;
       this.builder = new DAGBuilder<>();
       this.pValueToProducer = new HashMap<>();
       this.pValueToTag = new HashMap<>();
@@ -489,6 +488,7 @@ public final class PipelineTranslator
     private TranslationContext(final TranslationContext ctx,
                                final BiFunction<IRVertex, IRVertex, CommunicationPatternProperty.Value> selector) {
       this.root = ctx.root;
+      this.pipeline = ctx.pipeline;
       this.pipelineOptions = ctx.pipelineOptions;
       this.builder = ctx.builder;
       this.pValueToProducer = ctx.pValueToProducer;