You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by ta...@apache.org on 2018/11/21 23:52:03 UTC
[incubator-nemo] branch master updated: [NEMO-2186] Parallelism=1
for PCollectionView (#160)
This is an automated email from the ASF dual-hosted git repository.
taegeonum pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new 8e249c5 [NEMO-2186] Parallelism=1 for PCollectionView (#160)
8e249c5 is described below
commit 8e249c532945649a02b80a035fa23238e07a4fbe
Author: John Yang <jo...@gmail.com>
AuthorDate: Thu Nov 22 08:51:58 2018 +0900
[NEMO-2186] Parallelism=1 for PCollectionView (#160)
JIRA: [NEMO-286: Parallelism=1 for PCollectionView](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-286)
**Major changes:**
- Set Parallelism=1 for side input stages
- Set CommPattern=Broadcast from/to side input stages
- Use `DefaultPolicyParallelismFive` for MLR/ALS ITCases to test side inputs in multi-parallelism pipelines
---
.../frontend/beam/PipelineTranslationContext.java | 13 ++++++++++---
.../compiler/frontend/beam/coder/SideInputCoder.java | 16 ++++++++++++++--
.../nemo/examples/beam/AlternatingLeastSquareITCase.java | 3 ++-
.../beam/MultinomialLogisticRegressionITCase.java | 3 ++-
.../datatransfer/OperatorVertexOutputCollector.java | 2 --
5 files changed, 28 insertions(+), 9 deletions(-)
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslationContext.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslationContext.java
index d54a7cd..24c923b 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslationContext.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslationContext.java
@@ -32,6 +32,7 @@ import org.apache.nemo.common.ir.edge.executionproperty.*;
import org.apache.nemo.common.ir.vertex.IRVertex;
import org.apache.nemo.common.ir.vertex.LoopVertex;
import org.apache.nemo.common.ir.vertex.OperatorVertex;
+import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
import org.apache.nemo.common.ir.vertex.transform.Transform;
import org.apache.nemo.compiler.frontend.beam.coder.BeamDecoderFactory;
import org.apache.nemo.compiler.frontend.beam.coder.BeamEncoderFactory;
@@ -121,10 +122,14 @@ final class PipelineTranslationContext {
// Second edge: transform to the dstIRVertex
final IREdge secondEdge =
- new IREdge(CommunicationPatternProperty.Value.OneToOne, sideInputTransformVertex, dstVertex);
+ new IREdge(CommunicationPatternProperty.Value.BroadCast, sideInputTransformVertex, dstVertex);
final WindowedValue.FullWindowedValueCoder sideInputElementCoder =
WindowedValue.getFullCoder(SideInputCoder.of(viewCoder), windowCoder);
+ // The vertices should be Parallelism=1
+ srcVertex.setPropertyPermanently(ParallelismProperty.of(1));
+ sideInputTransformVertex.setPropertyPermanently(ParallelismProperty.of(1));
+
secondEdge.setProperty(EncoderProperty.of(new BeamEncoderFactory(sideInputElementCoder)));
secondEdge.setProperty(DecoderProperty.of(new BeamDecoderFactory(sideInputElementCoder)));
builder.connectVertices(secondEdge);
@@ -267,9 +272,11 @@ final class PipelineTranslationContext {
} else if (viewFn instanceof PCollectionViews.ListViewFn) {
return ListCoder.of(inputKVCoder.getValueCoder());
} else if (viewFn instanceof PCollectionViews.MapViewFn) {
- return MapCoder.of(inputKVCoder.getKeyCoder(), inputKVCoder.getValueCoder());
+ final KvCoder inputValueKVCoder = (KvCoder) inputKVCoder.getValueCoder();
+ return MapCoder.of(inputValueKVCoder.getKeyCoder(), inputValueKVCoder.getValueCoder());
} else if (viewFn instanceof PCollectionViews.MultimapViewFn) {
- return MapCoder.of(inputKVCoder.getKeyCoder(), IterableCoder.of(inputKVCoder.getValueCoder()));
+ final KvCoder inputValueKVCoder = (KvCoder) inputKVCoder.getValueCoder();
+ return MapCoder.of(inputValueKVCoder.getKeyCoder(), IterableCoder.of(inputValueKVCoder.getValueCoder()));
} else if (viewFn instanceof PCollectionViews.SingletonViewFn) {
return inputKVCoder;
} else {
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/SideInputCoder.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/SideInputCoder.java
index 59a1792..54628ee 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/SideInputCoder.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/SideInputCoder.java
@@ -18,17 +18,19 @@
*/
package org.apache.nemo.compiler.frontend.beam.coder;
-import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.nemo.compiler.frontend.beam.SideInputElement;
import java.io.*;
+import java.util.Collections;
+import java.util.List;
/**
* EncoderFactory for side inputs.
* @param <T> type of the side input value.
*/
-public final class SideInputCoder<T> extends AtomicCoder<SideInputElement<T>> {
+public final class SideInputCoder<T> extends StructuredCoder<SideInputElement<T>> {
private final Coder<T> valueCoder;
/**
@@ -59,4 +61,14 @@ public final class SideInputCoder<T> extends AtomicCoder<SideInputElement<T>> {
final T value = valueCoder.decode(inStream);
return new SideInputElement<>(index, value);
}
+
+ @Override
+ public List<? extends Coder<?>> getCoderArguments() {
+ return Collections.singletonList(valueCoder);
+ }
+
+ @Override
+ public void verifyDeterministic() throws NonDeterministicException {
+ verifyDeterministic(this, "Requires deterministic valueCoder", valueCoder);
+ }
}
diff --git a/examples/beam/src/test/java/org/apache/nemo/examples/beam/AlternatingLeastSquareITCase.java b/examples/beam/src/test/java/org/apache/nemo/examples/beam/AlternatingLeastSquareITCase.java
index bfde758..3def350 100644
--- a/examples/beam/src/test/java/org/apache/nemo/examples/beam/AlternatingLeastSquareITCase.java
+++ b/examples/beam/src/test/java/org/apache/nemo/examples/beam/AlternatingLeastSquareITCase.java
@@ -22,6 +22,7 @@ import org.apache.nemo.client.JobLauncher;
import org.apache.nemo.common.test.ArgBuilder;
import org.apache.nemo.common.test.ExampleTestUtil;
import org.apache.nemo.compiler.optimizer.policy.DefaultPolicy;
+import org.apache.nemo.examples.beam.policy.DefaultPolicyParallelismFive;
import org.apache.nemo.examples.beam.policy.TransientResourcePolicyParallelismTen;
import org.junit.After;
import org.junit.Before;
@@ -71,7 +72,7 @@ public final class AlternatingLeastSquareITCase {
JobLauncher.main(builder
.addResourceJson(noPoisonResources)
.addJobId(AlternatingLeastSquareITCase.class.getSimpleName() + "_default")
- .addOptimizationPolicy(DefaultPolicy.class.getCanonicalName())
+ .addOptimizationPolicy(DefaultPolicyParallelismFive.class.getCanonicalName())
.build());
}
diff --git a/examples/beam/src/test/java/org/apache/nemo/examples/beam/MultinomialLogisticRegressionITCase.java b/examples/beam/src/test/java/org/apache/nemo/examples/beam/MultinomialLogisticRegressionITCase.java
index a5b21c8..60860ce 100644
--- a/examples/beam/src/test/java/org/apache/nemo/examples/beam/MultinomialLogisticRegressionITCase.java
+++ b/examples/beam/src/test/java/org/apache/nemo/examples/beam/MultinomialLogisticRegressionITCase.java
@@ -21,6 +21,7 @@ package org.apache.nemo.examples.beam;
import org.apache.nemo.client.JobLauncher;
import org.apache.nemo.common.test.ArgBuilder;
import org.apache.nemo.compiler.optimizer.policy.DefaultPolicy;
+import org.apache.nemo.examples.beam.policy.DefaultPolicyParallelismFive;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -54,7 +55,7 @@ public final class MultinomialLogisticRegressionITCase {
.addJobId(MultinomialLogisticRegressionITCase.class.getSimpleName())
.addUserMain(MultinomialLogisticRegression.class.getCanonicalName())
.addUserArgs(input, numFeatures, numClasses, numIteration)
- .addOptimizationPolicy(DefaultPolicy.class.getCanonicalName())
+ .addOptimizationPolicy(DefaultPolicyParallelismFive.class.getCanonicalName())
.addResourceJson(executorResourceFileName)
.build());
}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
index 74c3ffb..b1775f7 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
@@ -77,7 +77,6 @@ public final class OperatorVertexOutputCollector<O> implements OutputCollector<O
@Override
public void emit(final O output) {
-
for (final NextIntraTaskOperatorInfo internalVertex : internalMainOutputs) {
emit(internalVertex.getNextOperator(), output);
}
@@ -89,7 +88,6 @@ public final class OperatorVertexOutputCollector<O> implements OutputCollector<O
@Override
public <T> void emit(final String dstVertexId, final T output) {
-
if (internalAdditionalOutputs.containsKey(dstVertexId)) {
for (final NextIntraTaskOperatorInfo internalVertex : internalAdditionalOutputs.get(dstVertexId)) {
emit(internalVertex.getNextOperator(), (O) output);