You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by ta...@apache.org on 2018/11/21 23:52:03 UTC

[incubator-nemo] branch master updated: [NEMO-286] Parallelism=1 for PCollectionView (#160)

This is an automated email from the ASF dual-hosted git repository.

taegeonum pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e249c5  [NEMO-286] Parallelism=1 for PCollectionView (#160)
8e249c5 is described below

commit 8e249c532945649a02b80a035fa23238e07a4fbe
Author: John Yang <jo...@gmail.com>
AuthorDate: Thu Nov 22 08:51:58 2018 +0900

    [NEMO-286] Parallelism=1 for PCollectionView (#160)
    
    JIRA: [NEMO-286: Parallelism=1 for PCollectionView](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-286)
    
    **Major changes:**
    - Set Parallelism=1 for side input stages
    - Set CommPattern=Broadcast from/to side input stages
    - Use `DefaultPolicyParallelismFive` for MLR/ALS ITCases to test side inputs in multi-parallelism pipelines
---
 .../frontend/beam/PipelineTranslationContext.java        | 13 ++++++++++---
 .../compiler/frontend/beam/coder/SideInputCoder.java     | 16 ++++++++++++++--
 .../nemo/examples/beam/AlternatingLeastSquareITCase.java |  3 ++-
 .../beam/MultinomialLogisticRegressionITCase.java        |  3 ++-
 .../datatransfer/OperatorVertexOutputCollector.java      |  2 --
 5 files changed, 28 insertions(+), 9 deletions(-)

diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslationContext.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslationContext.java
index d54a7cd..24c923b 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslationContext.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslationContext.java
@@ -32,6 +32,7 @@ import org.apache.nemo.common.ir.edge.executionproperty.*;
 import org.apache.nemo.common.ir.vertex.IRVertex;
 import org.apache.nemo.common.ir.vertex.LoopVertex;
 import org.apache.nemo.common.ir.vertex.OperatorVertex;
+import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
 import org.apache.nemo.common.ir.vertex.transform.Transform;
 import org.apache.nemo.compiler.frontend.beam.coder.BeamDecoderFactory;
 import org.apache.nemo.compiler.frontend.beam.coder.BeamEncoderFactory;
@@ -121,10 +122,14 @@ final class PipelineTranslationContext {
 
       // Second edge: transform to the dstIRVertex
       final IREdge secondEdge =
-        new IREdge(CommunicationPatternProperty.Value.OneToOne, sideInputTransformVertex, dstVertex);
+        new IREdge(CommunicationPatternProperty.Value.BroadCast, sideInputTransformVertex, dstVertex);
       final WindowedValue.FullWindowedValueCoder sideInputElementCoder =
         WindowedValue.getFullCoder(SideInputCoder.of(viewCoder), windowCoder);
 
+      // The vertices should be Parallelism=1
+      srcVertex.setPropertyPermanently(ParallelismProperty.of(1));
+      sideInputTransformVertex.setPropertyPermanently(ParallelismProperty.of(1));
+
       secondEdge.setProperty(EncoderProperty.of(new BeamEncoderFactory(sideInputElementCoder)));
       secondEdge.setProperty(DecoderProperty.of(new BeamDecoderFactory(sideInputElementCoder)));
       builder.connectVertices(secondEdge);
@@ -267,9 +272,11 @@ final class PipelineTranslationContext {
     } else if (viewFn instanceof PCollectionViews.ListViewFn) {
       return ListCoder.of(inputKVCoder.getValueCoder());
     } else if (viewFn instanceof PCollectionViews.MapViewFn) {
-      return MapCoder.of(inputKVCoder.getKeyCoder(), inputKVCoder.getValueCoder());
+      final KvCoder inputValueKVCoder = (KvCoder) inputKVCoder.getValueCoder();
+      return MapCoder.of(inputValueKVCoder.getKeyCoder(), inputValueKVCoder.getValueCoder());
     } else if (viewFn instanceof PCollectionViews.MultimapViewFn) {
-      return MapCoder.of(inputKVCoder.getKeyCoder(), IterableCoder.of(inputKVCoder.getValueCoder()));
+      final KvCoder inputValueKVCoder = (KvCoder) inputKVCoder.getValueCoder();
+      return MapCoder.of(inputValueKVCoder.getKeyCoder(), IterableCoder.of(inputValueKVCoder.getValueCoder()));
     } else if (viewFn instanceof PCollectionViews.SingletonViewFn) {
       return inputKVCoder;
     } else {
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/SideInputCoder.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/SideInputCoder.java
index 59a1792..54628ee 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/SideInputCoder.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/SideInputCoder.java
@@ -18,17 +18,19 @@
  */
 package org.apache.nemo.compiler.frontend.beam.coder;
 
-import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.nemo.compiler.frontend.beam.SideInputElement;
 
 import java.io.*;
+import java.util.Collections;
+import java.util.List;
 
 /**
  * EncoderFactory for side inputs.
  * @param <T> type of the side input value.
  */
-public final class SideInputCoder<T> extends AtomicCoder<SideInputElement<T>> {
+public final class SideInputCoder<T> extends StructuredCoder<SideInputElement<T>> {
   private final Coder<T> valueCoder;
 
   /**
@@ -59,4 +61,14 @@ public final class SideInputCoder<T> extends AtomicCoder<SideInputElement<T>> {
     final T value = valueCoder.decode(inStream);
     return new SideInputElement<>(index, value);
   }
+
+  @Override
+  public List<? extends Coder<?>> getCoderArguments() {
+    return Collections.singletonList(valueCoder);
+  }
+
+  @Override
+  public void verifyDeterministic() throws NonDeterministicException {
+    verifyDeterministic(this, "Requires deterministic valueCoder", valueCoder);
+  }
 }
diff --git a/examples/beam/src/test/java/org/apache/nemo/examples/beam/AlternatingLeastSquareITCase.java b/examples/beam/src/test/java/org/apache/nemo/examples/beam/AlternatingLeastSquareITCase.java
index bfde758..3def350 100644
--- a/examples/beam/src/test/java/org/apache/nemo/examples/beam/AlternatingLeastSquareITCase.java
+++ b/examples/beam/src/test/java/org/apache/nemo/examples/beam/AlternatingLeastSquareITCase.java
@@ -22,6 +22,7 @@ import org.apache.nemo.client.JobLauncher;
 import org.apache.nemo.common.test.ArgBuilder;
 import org.apache.nemo.common.test.ExampleTestUtil;
 import org.apache.nemo.compiler.optimizer.policy.DefaultPolicy;
+import org.apache.nemo.examples.beam.policy.DefaultPolicyParallelismFive;
 import org.apache.nemo.examples.beam.policy.TransientResourcePolicyParallelismTen;
 import org.junit.After;
 import org.junit.Before;
@@ -71,7 +72,7 @@ public final class AlternatingLeastSquareITCase {
     JobLauncher.main(builder
         .addResourceJson(noPoisonResources)
         .addJobId(AlternatingLeastSquareITCase.class.getSimpleName() + "_default")
-        .addOptimizationPolicy(DefaultPolicy.class.getCanonicalName())
+        .addOptimizationPolicy(DefaultPolicyParallelismFive.class.getCanonicalName())
         .build());
   }
 
diff --git a/examples/beam/src/test/java/org/apache/nemo/examples/beam/MultinomialLogisticRegressionITCase.java b/examples/beam/src/test/java/org/apache/nemo/examples/beam/MultinomialLogisticRegressionITCase.java
index a5b21c8..60860ce 100644
--- a/examples/beam/src/test/java/org/apache/nemo/examples/beam/MultinomialLogisticRegressionITCase.java
+++ b/examples/beam/src/test/java/org/apache/nemo/examples/beam/MultinomialLogisticRegressionITCase.java
@@ -21,6 +21,7 @@ package org.apache.nemo.examples.beam;
 import org.apache.nemo.client.JobLauncher;
 import org.apache.nemo.common.test.ArgBuilder;
 import org.apache.nemo.compiler.optimizer.policy.DefaultPolicy;
+import org.apache.nemo.examples.beam.policy.DefaultPolicyParallelismFive;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -54,7 +55,7 @@ public final class MultinomialLogisticRegressionITCase {
         .addJobId(MultinomialLogisticRegressionITCase.class.getSimpleName())
         .addUserMain(MultinomialLogisticRegression.class.getCanonicalName())
         .addUserArgs(input, numFeatures, numClasses, numIteration)
-        .addOptimizationPolicy(DefaultPolicy.class.getCanonicalName())
+        .addOptimizationPolicy(DefaultPolicyParallelismFive.class.getCanonicalName())
         .addResourceJson(executorResourceFileName)
         .build());
   }
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
index 74c3ffb..b1775f7 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
@@ -77,7 +77,6 @@ public final class OperatorVertexOutputCollector<O> implements OutputCollector<O
 
   @Override
   public void emit(final O output) {
-
     for (final NextIntraTaskOperatorInfo internalVertex : internalMainOutputs) {
       emit(internalVertex.getNextOperator(), output);
     }
@@ -89,7 +88,6 @@ public final class OperatorVertexOutputCollector<O> implements OutputCollector<O
 
   @Override
   public <T> void emit(final String dstVertexId, final T output) {
-
     if (internalAdditionalOutputs.containsKey(dstVertexId)) {
       for (final NextIntraTaskOperatorInfo internalVertex : internalAdditionalOutputs.get(dstVertexId)) {
         emit(internalVertex.getNextOperator(), (O) output);