You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by jo...@apache.org on 2018/10/17 01:13:31 UTC

[incubator-nemo] 14/14: merge

This is an automated email from the ASF dual-hosted git repository.

johnyangk pushed a commit to branch tpch-fix
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git

commit 8a7ae953f3a9de59733df65e27d67a41269431c7
Merge: e419597 18b61c1
Author: John Yang <jo...@apache.org>
AuthorDate: Wed Oct 17 10:12:58 2018 +0900

    merge

 .../java/org/apache/nemo/common/HashRange.java     |   5 +-
 .../java/org/apache/nemo/common/StateMachine.java  |  21 +-
 .../main/java/org/apache/nemo/common/dag/DAG.java  |  59 +--
 .../main/java/org/apache/nemo/common/dag/Edge.java |   6 +-
 .../java/org/apache/nemo/common/dag/Vertex.java    |   7 +-
 .../org/apache/nemo/common/ir/edge/IREdge.java     |  14 +-
 .../ir/executionproperty/ExecutionPropertyMap.java |  24 +-
 .../org/apache/nemo/common/ir/vertex/IRVertex.java |  15 +-
 .../apache/nemo/common/ir/vertex/LoopVertex.java   |  43 +-
 .../nemo/common/ir/vertex/OperatorVertex.java      |  13 +-
 compiler/frontend/beam/pom.xml                     |   5 +
 .../compiler/frontend/beam/BeamKeyExtractor.java   |   7 +-
 .../compiler/frontend/beam/NemoPipelineRunner.java |   5 +-
 .../compiler/frontend/beam/PipelineTranslator.java | 141 +++---
 .../beam/source/BeamBoundedSourceVertex.java       |  46 +-
 .../BroadcastVariableSideInputReader.java          |  60 +++
 .../beam/transform/CreateViewTransform.java        |  20 +-
 .../beam/transform/DefaultOutputManager.java       |  51 +++
 .../frontend/beam/transform/DoFnTransform.java     | 157 +++++++
 .../frontend/beam/transform/DoTransform.java       | 490 ---------------------
 .../frontend/beam/transform/FlattenTransform.java  |   8 +-
 .../beam/transform/GroupByKeyTransform.java        |  15 +-
 .../frontend/beam/transform/WindowFnTransform.java |  98 +++++
 .../frontend/beam/transform/WindowTransform.java   |  61 ---
 .../nemo/compiler/optimizer/PairKeyExtractor.java} |  17 +-
 .../compiletime/reshaping/SkewReshapingPass.java   |   3 +-
 .../compiler/backend/nemo/DAGConverterTest.java    |   5 +-
 .../frontend/beam/transform/DoFnTransformTest.java | 287 ++++++++++++
 .../beam/MultinomialLogisticRegression.java        |   6 +-
 .../nemo/examples/beam/BeamSimpleSumSQLITCase.java |  68 +++
 .../nemo/runtime/common/plan/RuntimeEdge.java      |  13 +-
 .../org/apache/nemo/runtime/common/plan/Stage.java |  17 +-
 .../apache/nemo/runtime/common/plan/StageEdge.java |  19 +-
 .../nemo/runtime/common/state/BlockState.java      |   4 +-
 .../nemo/runtime/common/state/PlanState.java       |   4 +-
 .../nemo/runtime/common/state/StageState.java      |   4 +-
 .../nemo/runtime/common/state/TaskState.java       |   4 +-
 .../master/resource/ExecutorRepresenter.java       |  14 +-
 .../runtime/master/scheduler/BatchScheduler.java   |   4 +-
 .../runtime/master/scheduler/TaskRetryTest.java    |   4 +-
 40 files changed, 1033 insertions(+), 811 deletions(-)

diff --cc compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
index 5d77296,2486a00..729aaad
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
@@@ -237,8 -276,7 +276,8 @@@ public final class PipelineTranslato
      final boolean handlesBeamRow = Stream
        .concat(transformVertex.getNode().getInputs().values().stream(),
          transformVertex.getNode().getOutputs().values().stream())
-       .filter(pValue -> getCoder(pValue, ctx.pipeline) instanceof KvCoder)
-       .map(pValue -> (KvCoder) getCoder(pValue, ctx.pipeline)) // Input and output of combine should be KV
++      .filter(pValue -> getCoder(pValue, ctx.root) instanceof KvCoder)
+       .map(pValue -> (KvCoder) getCoder(pValue, ctx.root)) // Input and output of combine should be KV
        .map(kvCoder -> kvCoder.getValueCoder().getEncodedTypeDescriptor()) // We're interested in the 'Value' of KV
        .anyMatch(valueTypeDescriptor -> TypeDescriptor.of(Row.class).equals(valueTypeDescriptor));
      if (handlesBeamRow) {
@@@ -490,17 -529,18 +530,17 @@@
              + "for an edge from %s to %s", communicationPatternSelector, src, dst));
        }
        final IREdge edge = new IREdge(communicationPattern, src, dst);
-       final Coder<?> coder;
+       final Coder coder;
+       final Coder windowCoder;
        if (input instanceof PCollection) {
          coder = ((PCollection) input).getCoder();
+         windowCoder = ((PCollection) input).getWindowingStrategy().getWindowFn().windowCoder();
        } else if (input instanceof PCollectionView) {
-         coder = getCoderForView((PCollectionView) input, pipeline);
+         coder = getCoderForView((PCollectionView) input, root);
 -        windowCoder = ((PCollectionView) input).getPCollection()
 -          .getWindowingStrategy().getWindowFn().windowCoder();
++        windowCoder = ((PCollectionView) input).getPCollection().getWindowingStrategy().getWindowFn().windowCoder();
        } else {
-         coder = null;
-       }
-       if (coder == null) {
          throw new RuntimeException(String.format("While adding an edge from %s, to %s, coder for PValue %s cannot "
-             + "be determined", src, dst, input));
+           + "be determined", src, dst, input));
        }
  
        edge.setProperty(KeyExtractorProperty.of(new BeamKeyExtractor()));