You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@nemo.apache.org by GitBox <gi...@apache.org> on 2018/11/21 23:52:00 UTC

[GitHub] taegeonum closed pull request #160: [NEMO-2186] Parallelism=1 for PCollectionView

taegeonum closed pull request #160: [NEMO-2186] Parallelism=1 for PCollectionView
URL: https://github.com/apache/incubator-nemo/pull/160
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslationContext.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslationContext.java
index d54a7cdf0..24c923bef 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslationContext.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslationContext.java
@@ -32,6 +32,7 @@
 import org.apache.nemo.common.ir.vertex.IRVertex;
 import org.apache.nemo.common.ir.vertex.LoopVertex;
 import org.apache.nemo.common.ir.vertex.OperatorVertex;
+import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
 import org.apache.nemo.common.ir.vertex.transform.Transform;
 import org.apache.nemo.compiler.frontend.beam.coder.BeamDecoderFactory;
 import org.apache.nemo.compiler.frontend.beam.coder.BeamEncoderFactory;
@@ -121,10 +122,14 @@ void addSideInputEdges(final IRVertex dstVertex, final Map<Integer, PCollectionV
 
       // Second edge: transform to the dstIRVertex
       final IREdge secondEdge =
-        new IREdge(CommunicationPatternProperty.Value.OneToOne, sideInputTransformVertex, dstVertex);
+        new IREdge(CommunicationPatternProperty.Value.BroadCast, sideInputTransformVertex, dstVertex);
       final WindowedValue.FullWindowedValueCoder sideInputElementCoder =
         WindowedValue.getFullCoder(SideInputCoder.of(viewCoder), windowCoder);
 
+      // The vertices should be Parallelism=1
+      srcVertex.setPropertyPermanently(ParallelismProperty.of(1));
+      sideInputTransformVertex.setPropertyPermanently(ParallelismProperty.of(1));
+
       secondEdge.setProperty(EncoderProperty.of(new BeamEncoderFactory(sideInputElementCoder)));
       secondEdge.setProperty(DecoderProperty.of(new BeamDecoderFactory(sideInputElementCoder)));
       builder.connectVertices(secondEdge);
@@ -267,9 +272,11 @@ DAGBuilder getBuilder() {
     } else if (viewFn instanceof PCollectionViews.ListViewFn) {
       return ListCoder.of(inputKVCoder.getValueCoder());
     } else if (viewFn instanceof PCollectionViews.MapViewFn) {
-      return MapCoder.of(inputKVCoder.getKeyCoder(), inputKVCoder.getValueCoder());
+      final KvCoder inputValueKVCoder = (KvCoder) inputKVCoder.getValueCoder();
+      return MapCoder.of(inputValueKVCoder.getKeyCoder(), inputValueKVCoder.getValueCoder());
     } else if (viewFn instanceof PCollectionViews.MultimapViewFn) {
-      return MapCoder.of(inputKVCoder.getKeyCoder(), IterableCoder.of(inputKVCoder.getValueCoder()));
+      final KvCoder inputValueKVCoder = (KvCoder) inputKVCoder.getValueCoder();
+      return MapCoder.of(inputValueKVCoder.getKeyCoder(), inputValueKVCoder.getValueCoder());
     } else if (viewFn instanceof PCollectionViews.SingletonViewFn) {
       return inputKVCoder;
     } else {
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/SideInputCoder.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/SideInputCoder.java
index 59a1792cd..54628ee32 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/SideInputCoder.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/SideInputCoder.java
@@ -18,17 +18,19 @@
  */
 package org.apache.nemo.compiler.frontend.beam.coder;
 
-import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.nemo.compiler.frontend.beam.SideInputElement;
 
 import java.io.*;
+import java.util.Collections;
+import java.util.List;
 
 /**
  * EncoderFactory for side inputs.
  * @param <T> type of the side input value.
  */
-public final class SideInputCoder<T> extends AtomicCoder<SideInputElement<T>> {
+public final class SideInputCoder<T> extends StructuredCoder<SideInputElement<T>> {
   private final Coder<T> valueCoder;
 
   /**
@@ -59,4 +61,14 @@ public void encode(final SideInputElement<T> sideInputElement, final OutputStrea
     final T value = valueCoder.decode(inStream);
     return new SideInputElement<>(index, value);
   }
+
+  @Override
+  public List<? extends Coder<?>> getCoderArguments() {
+    return Collections.singletonList(valueCoder);
+  }
+
+  @Override
+  public void verifyDeterministic() throws NonDeterministicException {
+    verifyDeterministic(this, "Requires deterministic valueCoder", valueCoder);
+  }
 }
diff --git a/examples/beam/src/test/java/org/apache/nemo/examples/beam/AlternatingLeastSquareITCase.java b/examples/beam/src/test/java/org/apache/nemo/examples/beam/AlternatingLeastSquareITCase.java
index bfde7588a..3def35091 100644
--- a/examples/beam/src/test/java/org/apache/nemo/examples/beam/AlternatingLeastSquareITCase.java
+++ b/examples/beam/src/test/java/org/apache/nemo/examples/beam/AlternatingLeastSquareITCase.java
@@ -22,6 +22,7 @@
 import org.apache.nemo.common.test.ArgBuilder;
 import org.apache.nemo.common.test.ExampleTestUtil;
 import org.apache.nemo.compiler.optimizer.policy.DefaultPolicy;
+import org.apache.nemo.examples.beam.policy.DefaultPolicyParallelismFive;
 import org.apache.nemo.examples.beam.policy.TransientResourcePolicyParallelismTen;
 import org.junit.After;
 import org.junit.Before;
@@ -71,7 +72,7 @@ public void testDefault() throws Exception {
     JobLauncher.main(builder
         .addResourceJson(noPoisonResources)
         .addJobId(AlternatingLeastSquareITCase.class.getSimpleName() + "_default")
-        .addOptimizationPolicy(DefaultPolicy.class.getCanonicalName())
+        .addOptimizationPolicy(DefaultPolicyParallelismFive.class.getCanonicalName())
         .build());
   }
 
diff --git a/examples/beam/src/test/java/org/apache/nemo/examples/beam/MultinomialLogisticRegressionITCase.java b/examples/beam/src/test/java/org/apache/nemo/examples/beam/MultinomialLogisticRegressionITCase.java
index a5b21c801..60860cec2 100644
--- a/examples/beam/src/test/java/org/apache/nemo/examples/beam/MultinomialLogisticRegressionITCase.java
+++ b/examples/beam/src/test/java/org/apache/nemo/examples/beam/MultinomialLogisticRegressionITCase.java
@@ -21,6 +21,7 @@
 import org.apache.nemo.client.JobLauncher;
 import org.apache.nemo.common.test.ArgBuilder;
 import org.apache.nemo.compiler.optimizer.policy.DefaultPolicy;
+import org.apache.nemo.examples.beam.policy.DefaultPolicyParallelismFive;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -54,7 +55,7 @@ public void test() throws Exception {
         .addJobId(MultinomialLogisticRegressionITCase.class.getSimpleName())
         .addUserMain(MultinomialLogisticRegression.class.getCanonicalName())
         .addUserArgs(input, numFeatures, numClasses, numIteration)
-        .addOptimizationPolicy(DefaultPolicy.class.getCanonicalName())
+        .addOptimizationPolicy(DefaultPolicyParallelismFive.class.getCanonicalName())
         .addResourceJson(executorResourceFileName)
         .build());
   }
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
index 74c3ffbe0..b1775f737 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
@@ -77,7 +77,6 @@ private void emit(final OutputWriter writer, final O output) {
 
   @Override
   public void emit(final O output) {
-
     for (final NextIntraTaskOperatorInfo internalVertex : internalMainOutputs) {
       emit(internalVertex.getNextOperator(), output);
     }
@@ -89,7 +88,6 @@ public void emit(final O output) {
 
   @Override
   public <T> void emit(final String dstVertexId, final T output) {
-
     if (internalAdditionalOutputs.containsKey(dstVertexId)) {
       for (final NextIntraTaskOperatorInfo internalVertex : internalAdditionalOutputs.get(dstVertexId)) {
         emit(internalVertex.getNextOperator(), (O) output);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services