You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by jo...@apache.org on 2018/10/17 01:13:31 UTC
[incubator-nemo] 14/14: merge
This is an automated email from the ASF dual-hosted git repository.
johnyangk pushed a commit to branch tpch-fix
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
commit 8a7ae953f3a9de59733df65e27d67a41269431c7
Merge: e419597 18b61c1
Author: John Yang <jo...@apache.org>
AuthorDate: Wed Oct 17 10:12:58 2018 +0900
merge
.../java/org/apache/nemo/common/HashRange.java | 5 +-
.../java/org/apache/nemo/common/StateMachine.java | 21 +-
.../main/java/org/apache/nemo/common/dag/DAG.java | 59 +--
.../main/java/org/apache/nemo/common/dag/Edge.java | 6 +-
.../java/org/apache/nemo/common/dag/Vertex.java | 7 +-
.../org/apache/nemo/common/ir/edge/IREdge.java | 14 +-
.../ir/executionproperty/ExecutionPropertyMap.java | 24 +-
.../org/apache/nemo/common/ir/vertex/IRVertex.java | 15 +-
.../apache/nemo/common/ir/vertex/LoopVertex.java | 43 +-
.../nemo/common/ir/vertex/OperatorVertex.java | 13 +-
compiler/frontend/beam/pom.xml | 5 +
.../compiler/frontend/beam/BeamKeyExtractor.java | 7 +-
.../compiler/frontend/beam/NemoPipelineRunner.java | 5 +-
.../compiler/frontend/beam/PipelineTranslator.java | 141 +++---
.../beam/source/BeamBoundedSourceVertex.java | 46 +-
.../BroadcastVariableSideInputReader.java | 60 +++
.../beam/transform/CreateViewTransform.java | 20 +-
.../beam/transform/DefaultOutputManager.java | 51 +++
.../frontend/beam/transform/DoFnTransform.java | 157 +++++++
.../frontend/beam/transform/DoTransform.java | 490 ---------------------
.../frontend/beam/transform/FlattenTransform.java | 8 +-
.../beam/transform/GroupByKeyTransform.java | 15 +-
.../frontend/beam/transform/WindowFnTransform.java | 98 +++++
.../frontend/beam/transform/WindowTransform.java | 61 ---
.../nemo/compiler/optimizer/PairKeyExtractor.java} | 17 +-
.../compiletime/reshaping/SkewReshapingPass.java | 3 +-
.../compiler/backend/nemo/DAGConverterTest.java | 5 +-
.../frontend/beam/transform/DoFnTransformTest.java | 287 ++++++++++++
.../beam/MultinomialLogisticRegression.java | 6 +-
.../nemo/examples/beam/BeamSimpleSumSQLITCase.java | 68 +++
.../nemo/runtime/common/plan/RuntimeEdge.java | 13 +-
.../org/apache/nemo/runtime/common/plan/Stage.java | 17 +-
.../apache/nemo/runtime/common/plan/StageEdge.java | 19 +-
.../nemo/runtime/common/state/BlockState.java | 4 +-
.../nemo/runtime/common/state/PlanState.java | 4 +-
.../nemo/runtime/common/state/StageState.java | 4 +-
.../nemo/runtime/common/state/TaskState.java | 4 +-
.../master/resource/ExecutorRepresenter.java | 14 +-
.../runtime/master/scheduler/BatchScheduler.java | 4 +-
.../runtime/master/scheduler/TaskRetryTest.java | 4 +-
40 files changed, 1033 insertions(+), 811 deletions(-)
diff --cc compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
index 5d77296,2486a00..729aaad
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
@@@ -237,8 -276,7 +276,8 @@@ public final class PipelineTranslato
final boolean handlesBeamRow = Stream
.concat(transformVertex.getNode().getInputs().values().stream(),
transformVertex.getNode().getOutputs().values().stream())
- .filter(pValue -> getCoder(pValue, ctx.pipeline) instanceof KvCoder)
- .map(pValue -> (KvCoder) getCoder(pValue, ctx.pipeline)) // Input and output of combine should be KV
++ .filter(pValue -> getCoder(pValue, ctx.root) instanceof KvCoder)
+ .map(pValue -> (KvCoder) getCoder(pValue, ctx.root)) // Input and output of combine should be KV
.map(kvCoder -> kvCoder.getValueCoder().getEncodedTypeDescriptor()) // We're interested in the 'Value' of KV
.anyMatch(valueTypeDescriptor -> TypeDescriptor.of(Row.class).equals(valueTypeDescriptor));
if (handlesBeamRow) {
@@@ -490,17 -529,18 +530,17 @@@
+ "for an edge from %s to %s", communicationPatternSelector, src, dst));
}
final IREdge edge = new IREdge(communicationPattern, src, dst);
- final Coder<?> coder;
+ final Coder coder;
+ final Coder windowCoder;
if (input instanceof PCollection) {
coder = ((PCollection) input).getCoder();
+ windowCoder = ((PCollection) input).getWindowingStrategy().getWindowFn().windowCoder();
} else if (input instanceof PCollectionView) {
- coder = getCoderForView((PCollectionView) input, pipeline);
+ coder = getCoderForView((PCollectionView) input, root);
- windowCoder = ((PCollectionView) input).getPCollection()
- .getWindowingStrategy().getWindowFn().windowCoder();
++ windowCoder = ((PCollectionView) input).getPCollection().getWindowingStrategy().getWindowFn().windowCoder();
} else {
- coder = null;
- }
- if (coder == null) {
throw new RuntimeException(String.format("While adding an edge from %s, to %s, coder for PValue %s cannot "
- + "be determined", src, dst, input));
+ + "be determined", src, dst, input));
}
edge.setProperty(KeyExtractorProperty.of(new BeamKeyExtractor()));