You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/03/20 11:07:07 UTC
[28/53] [abbrv] flink git commit: [optimizer] Rename optimizer
project to "flink-optimizer" (previously flink-compiler)
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java
deleted file mode 100644
index 565d992..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
-import org.apache.flink.api.common.operators.GenericDataSourceBase;
-import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.common.operators.base.BulkIterationBase;
-import org.apache.flink.optimizer.costs.DefaultCostEstimator;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.PlanNode;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.util.OperatingSystem;
-import org.apache.flink.util.Visitor;
-import org.junit.Before;
-
-/**
- * Base class for Optimizer tests. Offers utility methods to trigger optimization
- * of a program and to fetch the nodes in an optimizer plan that correspond
- * to the node in the program plan.
- */
-public abstract class CompilerTestBase implements java.io.Serializable {
-
- private static final long serialVersionUID = 1L;
-
- protected static final String IN_FILE = OperatingSystem.isWindows() ? "file:/c:/" : "file:///dev/random";
-
- protected static final String OUT_FILE = OperatingSystem.isWindows() ? "file:/c:/" : "file:///dev/null";
-
- protected static final int DEFAULT_PARALLELISM = 8;
-
- private static final String CACHE_KEY = "cachekey";
-
- // ------------------------------------------------------------------------
-
- protected transient DataStatistics dataStats;
-
- protected transient Optimizer withStatsCompiler;
-
- protected transient Optimizer noStatsCompiler;
-
- private transient int statCounter;
-
- // ------------------------------------------------------------------------
-
- @Before
- public void setup() {
- this.dataStats = new DataStatistics();
- this.withStatsCompiler = new Optimizer(this.dataStats, new DefaultCostEstimator());
- this.withStatsCompiler.setDefaultDegreeOfParallelism(DEFAULT_PARALLELISM);
-
- this.noStatsCompiler = new Optimizer(null, new DefaultCostEstimator());
- this.noStatsCompiler.setDefaultDegreeOfParallelism(DEFAULT_PARALLELISM);
- }
-
- // ------------------------------------------------------------------------
-
- public OptimizedPlan compileWithStats(Plan p) {
- return this.withStatsCompiler.compile(p);
- }
-
- public OptimizedPlan compileNoStats(Plan p) {
- return this.noStatsCompiler.compile(p);
- }
-
- public void setSourceStatistics(GenericDataSourceBase<?, ?> source, long size, float recordWidth) {
- setSourceStatistics(source, new FileBaseStatistics(Long.MAX_VALUE, size, recordWidth));
- }
-
- public void setSourceStatistics(GenericDataSourceBase<?, ?> source, FileBaseStatistics stats) {
- final String key = CACHE_KEY + this.statCounter++;
- this.dataStats.cacheBaseStatistics(stats, key);
- source.setStatisticsKey(key);
- }
-
- // ------------------------------------------------------------------------
-
- public static OptimizerPlanNodeResolver getOptimizerPlanNodeResolver(OptimizedPlan plan) {
- return new OptimizerPlanNodeResolver(plan);
- }
-
- public static final class OptimizerPlanNodeResolver {
-
- private final Map<String, ArrayList<PlanNode>> map;
-
- OptimizerPlanNodeResolver(OptimizedPlan p) {
- HashMap<String, ArrayList<PlanNode>> map = new HashMap<String, ArrayList<PlanNode>>();
-
- for (PlanNode n : p.getAllNodes()) {
- Operator<?> c = n.getOriginalOptimizerNode().getOperator();
- String name = c.getName();
-
- ArrayList<PlanNode> list = map.get(name);
- if (list == null) {
- list = new ArrayList<PlanNode>(2);
- map.put(name, list);
- }
-
- // check whether this node is a child of a node with the same contract (aka combiner)
- boolean shouldAdd = true;
- for (Iterator<PlanNode> iter = list.iterator(); iter.hasNext();) {
- PlanNode in = iter.next();
- if (in.getOriginalOptimizerNode().getOperator() == c) {
- // is this the child or is our node the child
- if (in instanceof SingleInputPlanNode && n instanceof SingleInputPlanNode) {
- SingleInputPlanNode thisNode = (SingleInputPlanNode) n;
- SingleInputPlanNode otherNode = (SingleInputPlanNode) in;
-
- if (thisNode.getPredecessor() == otherNode) {
- // other node is child, remove it
- iter.remove();
- } else if (otherNode.getPredecessor() == thisNode) {
- shouldAdd = false;
- }
- } else {
- throw new RuntimeException("Unrecodnized case in test.");
- }
- }
- }
-
- if (shouldAdd) {
- list.add(n);
- }
- }
-
- this.map = map;
- }
-
-
- @SuppressWarnings("unchecked")
- public <T extends PlanNode> T getNode(String name) {
- List<PlanNode> nodes = this.map.get(name);
- if (nodes == null || nodes.isEmpty()) {
- throw new RuntimeException("No node found with the given name.");
- } else if (nodes.size() != 1) {
- throw new RuntimeException("Multiple nodes found with the given name.");
- } else {
- return (T) nodes.get(0);
- }
- }
-
- @SuppressWarnings("unchecked")
- public <T extends PlanNode> T getNode(String name, Class<? extends Function> stubClass) {
- List<PlanNode> nodes = this.map.get(name);
- if (nodes == null || nodes.isEmpty()) {
- throw new RuntimeException("No node found with the given name and stub class.");
- } else {
- PlanNode found = null;
- for (PlanNode node : nodes) {
- if (node.getClass() == stubClass) {
- if (found == null) {
- found = node;
- } else {
- throw new RuntimeException("Multiple nodes found with the given name and stub class.");
- }
- }
- }
- if (found == null) {
- throw new RuntimeException("No node found with the given name and stub class.");
- } else {
- return (T) found;
- }
- }
- }
-
- public List<PlanNode> getNodes(String name) {
- List<PlanNode> nodes = this.map.get(name);
- if (nodes == null || nodes.isEmpty()) {
- throw new RuntimeException("No node found with the given name.");
- } else {
- return new ArrayList<PlanNode>(nodes);
- }
- }
- }
-
- /**
- * Collects all DataSources of a plan to add statistics
- *
- */
- public static class SourceCollectorVisitor implements Visitor<Operator<?>> {
-
- protected final List<GenericDataSourceBase<?, ?>> sources = new ArrayList<GenericDataSourceBase<?, ?>>(4);
-
- @Override
- public boolean preVisit(Operator<?> visitable) {
-
- if(visitable instanceof GenericDataSourceBase) {
- sources.add((GenericDataSourceBase<?, ?>) visitable);
- }
- else if(visitable instanceof BulkIterationBase) {
- ((BulkIterationBase<?>) visitable).getNextPartialSolution().accept(this);
- }
-
- return true;
- }
-
- @Override
- public void postVisit(Operator<?> visitable) {}
-
- public List<GenericDataSourceBase<?, ?>> getSources() {
- return this.sources;
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/DOPChangeTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/DOPChangeTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/DOPChangeTest.java
deleted file mode 100644
index b17e777..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/DOPChangeTest.java
+++ /dev/null
@@ -1,347 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.optimizer;
-
-import org.junit.Assert;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.JoinOperator;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.DualInputPlanNode;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.PlanNode;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.optimizer.util.DummyInputFormat;
-import org.apache.flink.optimizer.util.DummyMatchStub;
-import org.apache.flink.optimizer.util.DummyOutputFormat;
-import org.apache.flink.optimizer.util.IdentityMap;
-import org.apache.flink.optimizer.util.IdentityReduce;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.util.Visitor;
-import org.junit.Test;
-
-/**
- * Tests in this class:
- * <ul>
- * <li>Tests that check the correct handling of the properties and strategies in the case where the degree of
- * parallelism between tasks is increased or decreased.
- * </ul>
- */
-@SuppressWarnings({"serial", "deprecation"})
-public class DOPChangeTest extends CompilerTestBase {
-
- /**
- * Simple Job: Map -> Reduce -> Map -> Reduce. All functions preserve all fields (hence all properties).
- *
- * Increases DOP between 1st reduce and 2nd map, so the hash partitioning from 1st reduce is not reusable.
- * Expected to re-establish partitioning between reduce and map, via hash, because random is a full network
- * transit as well.
- */
- @Test
- public void checkPropertyHandlingWithIncreasingGlobalParallelism1() {
- final int degOfPar = DEFAULT_PARALLELISM;
-
- // construct the plan
- FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source");
- source.setDegreeOfParallelism(degOfPar);
-
- MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build();
- map1.setDegreeOfParallelism(degOfPar);
- map1.setInput(source);
-
- ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build();
- reduce1.setDegreeOfParallelism(degOfPar);
- reduce1.setInput(map1);
-
- MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build();
- map2.setDegreeOfParallelism(degOfPar * 2);
- map2.setInput(reduce1);
-
- ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build();
- reduce2.setDegreeOfParallelism(degOfPar * 2);
- reduce2.setInput(map2);
-
- FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink");
- sink.setDegreeOfParallelism(degOfPar * 2);
- sink.setInput(reduce2);
-
- Plan plan = new Plan(sink, "Test Increasing Degree Of Parallelism");
-
- // submit the plan to the compiler
- OptimizedPlan oPlan = compileNoStats(plan);
-
- // check the optimized Plan
- // when reducer 1 distributes its data across the instances of map2, it needs to employ a local hash method,
- // because map2 has twice as many instances and key/value pairs with the same key need to be processed by the same
- // mapper respectively reducer
- SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
- SingleInputPlanNode red2Node = (SingleInputPlanNode) sinkNode.getPredecessor();
- SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor();
-
- ShipStrategyType mapIn = map2Node.getInput().getShipStrategy();
- ShipStrategyType redIn = red2Node.getInput().getShipStrategy();
-
- Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.PARTITION_HASH, mapIn);
- Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, redIn);
- }
-
- /**
- * Simple Job: Map -> Reduce -> Map -> Reduce. All functions preserve all fields (hence all properties).
- *
- * Increases DOP between 2nd map and 2nd reduce, so the hash partitioning from 1st reduce is not reusable.
- * Expected to re-establish partitioning between map and reduce (hash).
- */
- @Test
- public void checkPropertyHandlingWithIncreasingGlobalParallelism2() {
- final int degOfPar = DEFAULT_PARALLELISM;
-
- // construct the plan
- FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source");
- source.setDegreeOfParallelism(degOfPar);
-
- MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build();
- map1.setDegreeOfParallelism(degOfPar);
- map1.setInput(source);
-
- ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build();
- reduce1.setDegreeOfParallelism(degOfPar);
- reduce1.setInput(map1);
-
- MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build();
- map2.setDegreeOfParallelism(degOfPar);
- map2.setInput(reduce1);
-
- ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build();
- reduce2.setDegreeOfParallelism(degOfPar * 2);
- reduce2.setInput(map2);
-
- FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink");
- sink.setDegreeOfParallelism(degOfPar * 2);
- sink.setInput(reduce2);
-
- Plan plan = new Plan(sink, "Test Increasing Degree Of Parallelism");
-
- // submit the plan to the compiler
- OptimizedPlan oPlan = compileNoStats(plan);
-
- // check the optimized Plan
- // when map2 distributes its data across the instances of reduce2, it needs to employ a hash partitioning,
- // because reduce2 has twice as many instances and key/value pairs with the same key need to be processed by the
- // same reducer, whereas map2 can simply forward the data from reduce1 (same degree of parallelism)
- SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
- SingleInputPlanNode red2Node = (SingleInputPlanNode) sinkNode.getPredecessor();
- SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor();
-
- ShipStrategyType mapIn = map2Node.getInput().getShipStrategy();
- ShipStrategyType reduceIn = red2Node.getInput().getShipStrategy();
-
- Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, mapIn);
- Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.PARTITION_HASH, reduceIn);
- }
-
- /**
- * Simple Job: Map -> Reduce -> Map -> Reduce. All functions preserve all fields (hence all properties).
- *
- * Increases DOP between 1st reduce and 2nd map, such that more tasks are on one instance.
- * Expected to re-establish partitioning between map and reduce via a local hash.
- */
- @Test
- public void checkPropertyHandlingWithIncreasingLocalParallelism() {
- final int degOfPar = 2 * DEFAULT_PARALLELISM;
-
- // construct the plan
- FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source");
- source.setDegreeOfParallelism(degOfPar);
-
- MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build();
- map1.setDegreeOfParallelism(degOfPar);
- map1.setInput(source);
-
- ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build();
- reduce1.setDegreeOfParallelism(degOfPar);
- reduce1.setInput(map1);
-
- MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build();
- map2.setDegreeOfParallelism(degOfPar * 2);
- map2.setInput(reduce1);
-
- ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build();
- reduce2.setDegreeOfParallelism(degOfPar * 2);
- reduce2.setInput(map2);
-
- FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink");
- sink.setDegreeOfParallelism(degOfPar * 2);
- sink.setInput(reduce2);
-
- Plan plan = new Plan(sink, "Test Increasing Degree Of Parallelism");
-
- // submit the plan to the compiler
- OptimizedPlan oPlan = compileNoStats(plan);
-
- // check the optimized Plan
- // when reducer 1 distributes its data across the instances of map2, it needs to employ a local hash method,
- // because map2 has twice as many instances and key/value pairs with the same key need to be processed by the same
- // mapper respectively reducer
- SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
- SingleInputPlanNode red2Node = (SingleInputPlanNode) sinkNode.getPredecessor();
- SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor();
-
- ShipStrategyType mapIn = map2Node.getInput().getShipStrategy();
- ShipStrategyType reduceIn = red2Node.getInput().getShipStrategy();
-
- Assert.assertTrue("Invalid ship strategy for an operator.",
- (ShipStrategyType.PARTITION_RANDOM == mapIn && ShipStrategyType.PARTITION_HASH == reduceIn) ||
- (ShipStrategyType.PARTITION_HASH == mapIn && ShipStrategyType.FORWARD == reduceIn));
- }
-
-
-
- @Test
- public void checkPropertyHandlingWithDecreasingDegreeOfParallelism() {
- final int degOfPar = DEFAULT_PARALLELISM;
-
- // construct the plan
- FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source");
- source.setDegreeOfParallelism(degOfPar * 2);
-
- MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build();
- map1.setDegreeOfParallelism(degOfPar * 2);
- map1.setInput(source);
-
- ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build();
- reduce1.setDegreeOfParallelism(degOfPar * 2);
- reduce1.setInput(map1);
-
- MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build();
- map2.setDegreeOfParallelism(degOfPar);
- map2.setInput(reduce1);
-
- ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build();
- reduce2.setDegreeOfParallelism(degOfPar);
- reduce2.setInput(map2);
-
- FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink");
- sink.setDegreeOfParallelism(degOfPar);
- sink.setInput(reduce2);
-
- Plan plan = new Plan(sink, "Test Increasing Degree Of Parallelism");
-
- // submit the plan to the compiler
- OptimizedPlan oPlan = compileNoStats(plan);
-
- // check the optimized Plan
- // reduce1 runs with twice as many instances as map2 and reduce2, so its partitioning cannot be reused as is;
- // the data has to be hash partitioned and sorted again on the input of either map2 or reduce2, because
- // key/value pairs with the same key need to be processed by the same reducer
- SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
- SingleInputPlanNode red2Node = (SingleInputPlanNode) sinkNode.getPredecessor();
- SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor();
-
- Assert.assertTrue("The no sorting local strategy.",
- LocalStrategy.SORT == red2Node.getInput().getLocalStrategy() ||
- LocalStrategy.SORT == map2Node.getInput().getLocalStrategy());
-
- Assert.assertTrue("The no partitioning ship strategy.",
- ShipStrategyType.PARTITION_HASH == red2Node.getInput().getShipStrategy() ||
- ShipStrategyType.PARTITION_HASH == map2Node.getInput().getShipStrategy());
- }
-
- /**
- * Checks that re-partitioning happens when the inputs of a two-input contract have different DOPs.
- *
- * Test Plan:
- * <pre>
- *
- * (source) -> reduce -\
- * Match -> (sink)
- * (source) -> reduce -/
- *
- * </pre>
- *
- */
- @Test
- public void checkPropertyHandlingWithTwoInputs() {
- // construct the plan
-
- FileDataSource sourceA = new FileDataSource(new DummyInputFormat(), IN_FILE);
- FileDataSource sourceB = new FileDataSource(new DummyInputFormat(), IN_FILE);
-
- ReduceOperator redA = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0)
- .input(sourceA)
- .build();
- ReduceOperator redB = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0)
- .input(sourceB)
- .build();
-
- JoinOperator mat = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
- .input1(redA)
- .input2(redB)
- .build();
-
- FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, mat);
-
- sourceA.setDegreeOfParallelism(5);
- sourceB.setDegreeOfParallelism(7);
- redA.setDegreeOfParallelism(5);
- redB.setDegreeOfParallelism(7);
-
- mat.setDegreeOfParallelism(5);
-
- sink.setDegreeOfParallelism(5);
-
-
- // return the PACT plan
- Plan plan = new Plan(sink, "Partition on DoP Change");
-
- OptimizedPlan oPlan = compileNoStats(plan);
-
- JobGraphGenerator jobGen = new JobGraphGenerator();
-
- //Compile plan to verify that no error is thrown
- jobGen.compileJobGraph(oPlan);
-
- oPlan.accept(new Visitor<PlanNode>() {
-
- @Override
- public boolean preVisit(PlanNode visitable) {
- if (visitable instanceof DualInputPlanNode) {
- DualInputPlanNode node = (DualInputPlanNode) visitable;
- Channel c1 = node.getInput1();
- Channel c2 = node.getInput2();
-
- Assert.assertEquals("Incompatible shipping strategy chosen for match", ShipStrategyType.FORWARD, c1.getShipStrategy());
- Assert.assertEquals("Incompatible shipping strategy chosen for match", ShipStrategyType.PARTITION_HASH, c2.getShipStrategy());
- return false;
- }
- return true;
- }
-
- @Override
- public void postVisit(PlanNode visitable) {
- // DO NOTHING
- }
- });
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java
deleted file mode 100644
index aaee975..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.junit.Test;
-
-@SuppressWarnings("serial")
-public class DisjointDataFlowsTest extends CompilerTestBase {
-
- @Test
- public void testDisjointFlows() {
- try {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- // generate two different flows
- env.generateSequence(1, 10).print();
- env.generateSequence(1, 10).print();
-
- Plan p = env.createProgramPlan();
- OptimizedPlan op = compileNoStats(p);
-
- new JobGraphGenerator().compileJobGraph(op);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
deleted file mode 100644
index 34aa9f8..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.optimizer;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.operators.DistinctOperator;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.plan.SourcePlanNode;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-
-@SuppressWarnings("serial")
-public class DistinctCompilationTest extends CompilerTestBase implements java.io.Serializable {
-
- @Test
- public void testDistinctPlain() {
- try {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(8);
-
- DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
- .name("source").setParallelism(6);
-
- data
- .distinct().name("reducer")
- .print().name("sink");
-
- Plan p = env.createProgramPlan();
- OptimizedPlan op = compileNoStats(p);
-
- OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
-
- // get the original nodes
- SourcePlanNode sourceNode = resolver.getNode("source");
- SingleInputPlanNode reduceNode = resolver.getNode("reducer");
- SinkPlanNode sinkNode = resolver.getNode("sink");
-
- // get the combiner
- SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource();
-
- // check wiring
- assertEquals(sourceNode, combineNode.getInput().getSource());
- assertEquals(reduceNode, sinkNode.getInput().getSource());
-
- // check that both reduce and combiner have the same strategy
- assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
- assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combineNode.getDriverStrategy());
-
- // check the keys
- assertEquals(new FieldList(0, 1), reduceNode.getKeys(0));
- assertEquals(new FieldList(0, 1), combineNode.getKeys(0));
- assertEquals(new FieldList(0, 1), reduceNode.getInput().getLocalStrategyKeys());
-
- // check DOP
- assertEquals(6, sourceNode.getParallelism());
- assertEquals(6, combineNode.getParallelism());
- assertEquals(8, reduceNode.getParallelism());
- assertEquals(8, sinkNode.getParallelism());
- }
- catch (Exception e) {
- System.err.println(e.getMessage());
- e.printStackTrace();
- fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
- }
- }
-
- @Test
- public void testDistinctWithSelectorFunctionKey() {
- try {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(8);
-
- DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
- .name("source").setParallelism(6);
-
- data
- .distinct(new KeySelector<Tuple2<String,Double>, String>() {
- public String getKey(Tuple2<String, Double> value) { return value.f0; }
- }).name("reducer")
- .print().name("sink");
-
- Plan p = env.createProgramPlan();
- OptimizedPlan op = compileNoStats(p);
-
- OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
-
- // get the original nodes
- SourcePlanNode sourceNode = resolver.getNode("source");
- SingleInputPlanNode reduceNode = resolver.getNode("reducer");
- SinkPlanNode sinkNode = resolver.getNode("sink");
-
- // get the combiner
- SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource();
-
- // get the key extractors and projectors
- SingleInputPlanNode keyExtractor = (SingleInputPlanNode) combineNode.getInput().getSource();
- SingleInputPlanNode keyProjector = (SingleInputPlanNode) sinkNode.getInput().getSource();
-
- // check wiring
- assertEquals(sourceNode, keyExtractor.getInput().getSource());
- assertEquals(keyProjector, sinkNode.getInput().getSource());
-
- // check that both reduce and combiner have the same strategy
- assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
- assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combineNode.getDriverStrategy());
-
- // check the keys
- assertEquals(new FieldList(0), reduceNode.getKeys(0));
- assertEquals(new FieldList(0), combineNode.getKeys(0));
- assertEquals(new FieldList(0), reduceNode.getInput().getLocalStrategyKeys());
-
- // check DOP
- assertEquals(6, sourceNode.getParallelism());
- assertEquals(6, keyExtractor.getParallelism());
- assertEquals(6, combineNode.getParallelism());
-
- assertEquals(8, reduceNode.getParallelism());
- assertEquals(8, keyProjector.getParallelism());
- assertEquals(8, sinkNode.getParallelism());
- }
- catch (Exception e) {
- System.err.println(e.getMessage());
- e.printStackTrace();
- fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
- }
- }
-
- @Test
- public void testDistinctWithFieldPositionKeyCombinable() {
- try {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(8);
-
- DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
- .name("source").setParallelism(6);
-
- DistinctOperator<Tuple2<String, Double>> reduced = data
- .distinct(1).name("reducer");
-
- reduced.print().name("sink");
-
- Plan p = env.createProgramPlan();
- OptimizedPlan op = compileNoStats(p);
-
- OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
-
- // get the original nodes
- SourcePlanNode sourceNode = resolver.getNode("source");
- SingleInputPlanNode reduceNode = resolver.getNode("reducer");
- SinkPlanNode sinkNode = resolver.getNode("sink");
-
- // get the combiner
- SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource();
-
- // check wiring
- assertEquals(sourceNode, combineNode.getInput().getSource());
- assertEquals(reduceNode, sinkNode.getInput().getSource());
-
- // check that both reduce and combiner have the same strategy
- assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
- assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combineNode.getDriverStrategy());
-
- // check the keys
- assertEquals(new FieldList(1), reduceNode.getKeys(0));
- assertEquals(new FieldList(1), combineNode.getKeys(0));
- assertEquals(new FieldList(1), reduceNode.getInput().getLocalStrategyKeys());
-
- // check DOP
- assertEquals(6, sourceNode.getParallelism());
- assertEquals(6, combineNode.getParallelism());
- assertEquals(8, reduceNode.getParallelism());
- assertEquals(8, sinkNode.getParallelism());
- }
- catch (Exception e) {
- System.err.println(e.getMessage());
- e.printStackTrace();
- fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
- }
- }
-}
\ No newline at end of file