You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@nemo.apache.org by GitBox <gi...@apache.org> on 2018/11/03 07:24:16 UTC

[GitHub] taegeonum closed pull request #145: [Nemo-220] Move PIPELINE variable to TranslationContext in PipelineTranslator

taegeonum closed pull request #145: [Nemo-220] Move PIPELINE variable to TranslationContext in PipelineTranslator
URL: https://github.com/apache/incubator-nemo/pull/145
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub would not otherwise display it once the request is merged):

diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
index ee10b21f4..3fd9d2be7 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
@@ -55,7 +55,6 @@
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.*;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiFunction;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -66,8 +65,7 @@
  * For a {@link CompositeTransformVertex}, it defines how to setup and clear {@link TranslationContext}
  * before start translating inner Beam transform hierarchy.
  */
-public final class PipelineTranslator
-  implements BiFunction<CompositeTransformVertex, PipelineOptions, DAG<IRVertex, IREdge>> {
+public final class PipelineTranslator {
 
   private static final Logger LOG = LoggerFactory.getLogger(PipelineTranslator.class.getName());
 
@@ -76,9 +74,6 @@
   private final Map<Class<? extends PTransform>, Method> primitiveTransformToTranslator = new HashMap<>();
   private final Map<Class<? extends PTransform>, Method> compositeTransformToTranslator = new HashMap<>();
 
-  // TODO #220: Move this variable to TranslationContext
-  private static final AtomicReference<Pipeline> PIPELINE = new AtomicReference<>();
-
   /**
    * Static translator method.
    * @param pipeline the original root
@@ -89,8 +84,7 @@
   public static DAG<IRVertex, IREdge> translate(final Pipeline pipeline,
                                                 final CompositeTransformVertex root,
                                                 final PipelineOptions pipelineOptions) {
-    PIPELINE.set(pipeline);
-    return INSTANCE.apply(root, pipelineOptions);
+    return INSTANCE.translateToIRDAG(root, pipeline, pipelineOptions);
   }
 
   /**
@@ -144,7 +138,7 @@ private static void boundedReadTranslator(final TranslationContext ctx,
   private static DoFnTransform createDoFnTransform(final TranslationContext ctx,
                                                    final PrimitiveTransformVertex transformVertex) {
     try {
-      final AppliedPTransform pTransform = transformVertex.getNode().toAppliedPTransform(PIPELINE.get());
+      final AppliedPTransform pTransform = transformVertex.getNode().toAppliedPTransform(ctx.pipeline);
       final DoFn doFn = ParDoTranslation.getDoFn(pTransform);
       final TupleTag mainOutputTag = ParDoTranslation.getMainOutputTag(pTransform);
       final List<PCollectionView<?>> sideInputs = ParDoTranslation.getSideInputs(pTransform);
@@ -221,7 +215,7 @@ private static void parDoMultiOutputTranslator(final TranslationContext ctx,
   private static Transform createGBKTransform(
     final TranslationContext ctx,
     final TransformVertex transformVertex) {
-    final AppliedPTransform pTransform = transformVertex.getNode().toAppliedPTransform(PIPELINE.get());
+    final AppliedPTransform pTransform = transformVertex.getNode().toAppliedPTransform(ctx.pipeline);
     final PCollection<?> mainInput = (PCollection<?>)
       Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(pTransform));
     final TupleTag mainOutputTag = new TupleTag<>();
@@ -372,12 +366,12 @@ private static void loopTranslator(final TranslationContext ctx,
     ctx.loopVertexStack.pop();
   }
 
-  @Override
-  public DAG<IRVertex, IREdge> apply(final CompositeTransformVertex pipeline,
-                                     final PipelineOptions pipelineOptions) {
-    final TranslationContext ctx = new TranslationContext(pipeline, primitiveTransformToTranslator,
+  private DAG<IRVertex, IREdge> translateToIRDAG(final CompositeTransformVertex vertex,
+                                                 final Pipeline pipeline,
+                                                 final PipelineOptions pipelineOptions) {
+    final TranslationContext ctx = new TranslationContext(vertex, pipeline, primitiveTransformToTranslator,
         compositeTransformToTranslator, DefaultCommunicationPatternSelector.INSTANCE, pipelineOptions);
-    ctx.translate(pipeline);
+    ctx.translate(vertex);
     return ctx.builder.build();
   }
 
@@ -453,23 +447,28 @@ private static void loopTranslator(final TranslationContext ctx,
     private final Map<PValue, TupleTag<?>> pValueToTag;
     private final Stack<LoopVertex> loopVertexStack;
     private final BiFunction<IRVertex, IRVertex, CommunicationPatternProperty.Value> communicationPatternSelector;
+    private final Pipeline pipeline;
+
 
     private final Map<Class<? extends PTransform>, Method> primitiveTransformToTranslator;
     private final Map<Class<? extends PTransform>, Method> compositeTransformToTranslator;
 
     /**
      * @param root the root to translate
+     * @param pipeline the pipeline to translate
      * @param primitiveTransformToTranslator provides translators for PrimitiveTransform
      * @param compositeTransformToTranslator provides translators for CompositeTransform
      * @param selector provides {@link CommunicationPatternProperty.Value} for IR edges
      * @param pipelineOptions {@link PipelineOptions}
      */
     private TranslationContext(final CompositeTransformVertex root,
+                               final Pipeline pipeline,
                                final Map<Class<? extends PTransform>, Method> primitiveTransformToTranslator,
                                final Map<Class<? extends PTransform>, Method> compositeTransformToTranslator,
                                final BiFunction<IRVertex, IRVertex, CommunicationPatternProperty.Value> selector,
                                final PipelineOptions pipelineOptions) {
       this.root = root;
+      this.pipeline = pipeline;
       this.builder = new DAGBuilder<>();
       this.pValueToProducer = new HashMap<>();
       this.pValueToTag = new HashMap<>();
@@ -489,6 +488,7 @@ private TranslationContext(final CompositeTransformVertex root,
     private TranslationContext(final TranslationContext ctx,
                                final BiFunction<IRVertex, IRVertex, CommunicationPatternProperty.Value> selector) {
       this.root = ctx.root;
+      this.pipeline = ctx.pipeline;
       this.pipelineOptions = ctx.pipelineOptions;
       this.builder = ctx.builder;
       this.pValueToProducer = ctx.pValueToProducer;


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services