You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by Robert Metzger <rm...@apache.org> on 2014/07/13 14:36:33 UTC

Re: [2/3] git commit: [FLINK-1018] Add tests to verify correct placement of pipeline breakers with broadcast variables

The two files you committed here "PipelineBreakerTest.java" and
"SelectOneReducer.java" both contain the old license header. I'll try and
see if I can make the license checker more strict.


On Sat, Jul 12, 2014 at 7:32 PM, <se...@apache.org> wrote:

> [FLINK-1018] Add tests to verify correct placement of pipeline breakers
> with broadcast variables
>
>
> Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
> Commit:
> http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/ec0b00d6
> Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/ec0b00d6
> Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/ec0b00d6
>
> Branch: refs/heads/master
> Commit: ec0b00d613a4400baf53f5de2361a2271a26ae63
> Parents: a822486
> Author: Stephan Ewen <se...@apache.org>
> Authored: Fri Jul 11 18:02:52 2014 +0200
> Committer: Stephan Ewen <se...@apache.org>
> Committed: Sat Jul 12 19:31:26 2014 +0200
>
> ----------------------------------------------------------------------
>  .../compiler/BranchingPlansCompilerTest.java    |  10 +-
>  .../flink/compiler/PipelineBreakerTest.java     | 137 +++++++++++++++++++
>  .../testfunctions/SelectOneReducer.java         |  28 ++++
>  3 files changed, 170 insertions(+), 5 deletions(-)
> ----------------------------------------------------------------------
>
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ec0b00d6/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
> ----------------------------------------------------------------------
> diff --git
> a/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
> b/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
> index 31dadae..571f4e4 100644
> ---
> a/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
> +++
> b/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
> @@ -359,6 +359,7 @@ public class BranchingPlansCompilerTest extends
> CompilerTestBase {
>                 }
>         }
>
> +       @SuppressWarnings({ "unchecked", "deprecation" })
>         @Test
>         public void testBranchEachContractType() {
>                 try {
> @@ -374,7 +375,6 @@ public class BranchingPlansCompilerTest extends
> CompilerTestBase {
>                                 .name("Reduce 1")
>                                 .build();
>
> -                       @SuppressWarnings("unchecked")
>                         JoinOperator match1 = JoinOperator.builder(new
> DummyMatchStub(), IntValue.class, 0, 0)
>                                 .input1(sourceB, sourceB, sourceC)
>                                 .input2(sourceC)
> @@ -434,10 +434,10 @@ public class BranchingPlansCompilerTest extends
> CompilerTestBase {
>                                 .build();
>
>                         FileDataSink sink = new FileDataSink(new
> DummyOutputFormat(), OUT_FILE, cogroup7);
> -       //              sink.addInput(sourceA);
> -       //              sink.addInput(co3);
> -       //              sink.addInput(co4);
> -       //              sink.addInput(co1);
> +                       sink.addInput(sourceA);
> +                       sink.addInput(cogroup3);
> +                       sink.addInput(cogroup4);
> +                       sink.addInput(cogroup1);
>
>                         // return the PACT plan
>                         Plan plan = new Plan(sink, "Branching of each
> contract type");
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ec0b00d6/flink-compiler/src/test/java/org/apache/flink/compiler/PipelineBreakerTest.java
> ----------------------------------------------------------------------
> diff --git
> a/flink-compiler/src/test/java/org/apache/flink/compiler/PipelineBreakerTest.java
> b/flink-compiler/src/test/java/org/apache/flink/compiler/PipelineBreakerTest.java
> new file mode 100644
> index 0000000..4e43a74
> --- /dev/null
> +++
> b/flink-compiler/src/test/java/org/apache/flink/compiler/PipelineBreakerTest.java
> @@ -0,0 +1,137 @@
>
> +/***********************************************************************************************************************
> + *
> + * Copyright (C) 2010-2013 by the Stratosphere project (
> http://stratosphere.eu)
> + *
> + * Licensed under the Apache License, Version 2.0 (the "License"); you
> may not use this file except in compliance with
> + * the License. You may obtain a copy of the License at
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> distributed under the License is distributed on
> + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
> express or implied. See the License for the
> + * specific language governing permissions and limitations under the
> License.
> + *
> +
> **********************************************************************************************************************/
> +
> +package org.apache.flink.compiler;
> +
> +import static org.junit.Assert.*;
> +
> +import org.junit.Test;
> +import org.apache.flink.api.common.Plan;
> +import org.apache.flink.api.java.DataSet;
> +import org.apache.flink.api.java.ExecutionEnvironment;
> +import org.apache.flink.api.java.IterativeDataSet;
> +import org.apache.flink.compiler.plan.BulkIterationPlanNode;
> +import org.apache.flink.compiler.plan.OptimizedPlan;
> +import org.apache.flink.compiler.plan.SingleInputPlanNode;
> +import org.apache.flink.compiler.plan.SinkPlanNode;
> +import org.apache.flink.compiler.plandump.PlanJSONDumpGenerator;
> +import org.apache.flink.compiler.testfunctions.IdentityMapper;
> +import org.apache.flink.compiler.testfunctions.SelectOneReducer;
> +
> +@SuppressWarnings("serial")
> +public class PipelineBreakerTest extends CompilerTestBase {
> +
> +       @Test
> +       public void testPipelineBreakerWithBroadcastVariable() {
> +               try {
> +                       ExecutionEnvironment env =
> ExecutionEnvironment.getExecutionEnvironment();
> +                       env.setDegreeOfParallelism(64);
> +
> +                       DataSet<Long> source = env.generateSequence(1,
> 10).map(new IdentityMapper<Long>());
> +
> +                       DataSet<Long> result = source.map(new
> IdentityMapper<Long>())
> +
>       .map(new IdentityMapper<Long>())
> +
>               .withBroadcastSet(source, "bc");
> +
> +                       result.print();
> +
> +                       Plan p = env.createProgramPlan();
> +                       OptimizedPlan op = compileNoStats(p);
> +
> +                       SinkPlanNode sink =
> op.getDataSinks().iterator().next();
> +                       SingleInputPlanNode mapper = (SingleInputPlanNode)
> sink.getInput().getSource();
> +
> +
> assertTrue(mapper.getInput().getTempMode().breaksPipeline());
> +               }
> +               catch (Exception e) {
> +                       e.printStackTrace();
> +                       fail(e.getMessage());
> +               }
> +       }
> +
> +       @Test
> +       public void testPipelineBreakerBroadcastedAllReduce() {
> +               try {
> +                       ExecutionEnvironment env =
> ExecutionEnvironment.getExecutionEnvironment();
> +                       env.setDegreeOfParallelism(64);
> +
> +                       DataSet<Long> sourceWithMapper =
> env.generateSequence(1, 10).map(new IdentityMapper<Long>());
> +
> +                       DataSet<Long> bcInput1 = sourceWithMapper
> +
>       .map(new IdentityMapper<Long>())
> +
>       .reduce(new SelectOneReducer<Long>());
> +                       DataSet<Long> bcInput2 = env.generateSequence(1,
> 10);
> +
> +                       DataSet<Long> result = sourceWithMapper
> +                                       .map(new IdentityMapper<Long>())
> +
> .withBroadcastSet(bcInput1, "bc1")
> +
> .withBroadcastSet(bcInput2, "bc2");
> +
> +                       result.print();
> +
> +                       Plan p = env.createProgramPlan();
> +                       OptimizedPlan op = compileNoStats(p);
> +
> +                       SinkPlanNode sink =
> op.getDataSinks().iterator().next();
> +                       SingleInputPlanNode mapper = (SingleInputPlanNode)
> sink.getInput().getSource();
> +
> +
> assertTrue(mapper.getInput().getTempMode().breaksPipeline());
> +               }
> +               catch (Exception e) {
> +                       e.printStackTrace();
> +                       fail(e.getMessage());
> +               }
> +       }
> +
> +       @Test
> +       public void testPipelineBreakerBroadcastedPartialSolution() {
> +               try {
> +                       ExecutionEnvironment env =
> ExecutionEnvironment.getExecutionEnvironment();
> +                       env.setDegreeOfParallelism(64);
> +
> +
> +                       DataSet<Long> initialSource =
> env.generateSequence(1, 10);
> +                       IterativeDataSet<Long> iteration =
> initialSource.iterate(100);
> +
> +
> +                       DataSet<Long> sourceWithMapper =
> env.generateSequence(1, 10).map(new IdentityMapper<Long>());
> +
> +                       DataSet<Long> bcInput1 = sourceWithMapper
> +
>       .map(new IdentityMapper<Long>())
> +
>       .reduce(new SelectOneReducer<Long>());
> +
> +                       DataSet<Long> result = sourceWithMapper
> +                                       .map(new IdentityMapper<Long>())
> +
> .withBroadcastSet(iteration, "bc2")
> +
> .withBroadcastSet(bcInput1, "bc1");
> +
> +
> +                       iteration.closeWith(result).print();
> +
> +                       Plan p = env.createProgramPlan();
> +                       OptimizedPlan op = compileNoStats(p);
> +
> +                       SinkPlanNode sink =
> op.getDataSinks().iterator().next();
> +                       BulkIterationPlanNode iterationPlanNode =
> (BulkIterationPlanNode) sink.getInput().getSource();
> +                       SingleInputPlanNode mapper = (SingleInputPlanNode)
> iterationPlanNode.getRootOfStepFunction();
> +
> +
> assertTrue(mapper.getInput().getTempMode().breaksPipeline());
> +               }
> +               catch (Exception e) {
> +                       e.printStackTrace();
> +                       fail(e.getMessage());
> +               }
> +       }
> +}
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ec0b00d6/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/SelectOneReducer.java
> ----------------------------------------------------------------------
> diff --git
> a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/SelectOneReducer.java
> b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/SelectOneReducer.java
> new file mode 100644
> index 0000000..492b9f8
> --- /dev/null
> +++
> b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/SelectOneReducer.java
> @@ -0,0 +1,28 @@
>
> +/***********************************************************************************************************************
> + *
> + * Copyright (C) 2010-2013 by the Stratosphere project (
> http://stratosphere.eu)
> + *
> + * Licensed under the Apache License, Version 2.0 (the "License"); you
> may not use this file except in compliance with
> + * the License. You may obtain a copy of the License at
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> distributed under the License is distributed on
> + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
> express or implied. See the License for the
> + * specific language governing permissions and limitations under the
> License.
> + *
> +
> **********************************************************************************************************************/
> +
> +package org.apache.flink.compiler.testfunctions;
> +
> +import org.apache.flink.api.java.functions.ReduceFunction;
> +
> +public class SelectOneReducer<T> extends ReduceFunction<T> {
> +
> +       private static final long serialVersionUID = 1L;
> +
> +       @Override
> +       public T reduce(T value1, T value2) throws Exception {
> +               return value1;
> +       }
> +}
>
>

Re: [2/3] git commit: [FLINK-1018] Add tests to verify correct placement of pipeline breakers with broadcast variables

Posted by Stephan Ewen <se...@apache.org>.
Okay, I'll wait with merging the streaming code until the rat is updated.

Re: [2/3] git commit: [FLINK-1018] Add tests to verify correct placement of pipeline breakers with broadcast variables

Posted by Robert Metzger <rm...@apache.org>.
I'm currently preparing a commit with some more license fixes and a strict
rat configuration.


On Sun, Jul 13, 2014 at 2:58 PM, Stephan Ewen <se...@apache.org> wrote:

> Ah, good point. They com from old pull requests.
>
> Let me know if you manage to configure rat stricter.
>

Re: [2/3] git commit: [FLINK-1018] Add tests to verify correct placement of pipeline breakers with broadcast variables

Posted by Stephan Ewen <se...@apache.org>.
Ah, good point. They com from old pull requests.

Let me know if you manage to configure rat stricter.