You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/03/20 11:07:02 UTC
[23/53] [abbrv] flink git commit: [optimizer] Rename optimizer
project to "flink-optimizer" (previously flink-compiler)
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeForwardTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeForwardTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeForwardTest.java
deleted file mode 100644
index 5175d8c..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeForwardTest.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dataexchange;
-
-import org.apache.flink.api.common.ExecutionMode;
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.optimizer.CompilerTestBase;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.testfunctions.IdentityKeyExtractor;
-import org.apache.flink.optimizer.testfunctions.Top1GroupReducer;
-import org.apache.flink.runtime.io.network.DataExchangeMode;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-/**
- * This test verifies that the optimizer assigns the correct
- * data exchange mode to a simple forward / shuffle plan.
- *
- * <pre>
- * (source) -> (map) -> (filter) -> (groupBy / reduce)
- * </pre>
- */
-@SuppressWarnings("serial")
-public class DataExchangeModeForwardTest extends CompilerTestBase {
-
-
- @Test
- public void testPipelinedForced() {
- // PIPELINED_FORCED should result in pipelining all the way
- verifySimpleForwardPlan(ExecutionMode.PIPELINED_FORCED,
- DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
- DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
- DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED);
- }
-
- @Test
- public void testPipelined() {
- // PIPELINED should result in pipelining all the way
- verifySimpleForwardPlan(ExecutionMode.PIPELINED,
- DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
- DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
- DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED);
- }
-
- @Test
- public void testBatch() {
- // BATCH should result in batching the shuffle all the way
- verifySimpleForwardPlan(ExecutionMode.BATCH,
- DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
- DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
- DataExchangeMode.BATCH, DataExchangeMode.PIPELINED);
- }
-
- @Test
- public void testBatchForced() {
- // BATCH_FORCED should result in batching all the way
- verifySimpleForwardPlan(ExecutionMode.BATCH_FORCED,
- DataExchangeMode.BATCH, DataExchangeMode.BATCH,
- DataExchangeMode.BATCH, DataExchangeMode.PIPELINED,
- DataExchangeMode.BATCH, DataExchangeMode.BATCH);
- }
-
- private void verifySimpleForwardPlan(ExecutionMode execMode,
- DataExchangeMode toMap,
- DataExchangeMode toFilter,
- DataExchangeMode toKeyExtractor,
- DataExchangeMode toCombiner,
- DataExchangeMode toReduce,
- DataExchangeMode toSink)
- {
- try {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.getConfig().setExecutionMode(execMode);
-
- DataSet<String> dataSet = env.readTextFile("/never/accessed");
- dataSet
- .map(new MapFunction<String, Integer>() {
- @Override
- public Integer map(String value) {
- return 0;
- }
- })
- .filter(new FilterFunction<Integer>() {
- @Override
- public boolean filter(Integer value) {
- return false;
- }
- })
- .groupBy(new IdentityKeyExtractor<Integer>())
- .reduceGroup(new Top1GroupReducer<Integer>())
- .output(new DiscardingOutputFormat<Integer>());
-
- OptimizedPlan optPlan = compileNoStats(env.createProgramPlan());
- SinkPlanNode sinkNode = optPlan.getDataSinks().iterator().next();
-
- SingleInputPlanNode reduceNode = (SingleInputPlanNode) sinkNode.getPredecessor();
- SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getPredecessor();
- SingleInputPlanNode keyExtractorNode = (SingleInputPlanNode) combineNode.getPredecessor();
-
- SingleInputPlanNode filterNode = (SingleInputPlanNode) keyExtractorNode.getPredecessor();
- SingleInputPlanNode mapNode = (SingleInputPlanNode) filterNode.getPredecessor();
-
- assertEquals(toMap, mapNode.getInput().getDataExchangeMode());
- assertEquals(toFilter, filterNode.getInput().getDataExchangeMode());
- assertEquals(toKeyExtractor, keyExtractorNode.getInput().getDataExchangeMode());
- assertEquals(toCombiner, combineNode.getInput().getDataExchangeMode());
- assertEquals(toReduce, reduceNode.getInput().getDataExchangeMode());
- assertEquals(toSink, sinkNode.getInput().getDataExchangeMode());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeOpenBranchingTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeOpenBranchingTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeOpenBranchingTest.java
deleted file mode 100644
index 6b2691a..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeOpenBranchingTest.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dataexchange;
-
-import org.apache.flink.api.common.ExecutionMode;
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.optimizer.CompilerTestBase;
-import org.apache.flink.optimizer.plan.DualInputPlanNode;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.runtime.io.network.DataExchangeMode;
-import org.junit.Test;
-
-import java.util.Collection;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-/**
- * This test checks the correct assignment of the DataExchangeMode to
- * connections for programs that branch, but do not re-join the branches.
- *
- * <pre>
- * /---> (filter) -> (sink)
- * /
- * /
- * (source) -> (map) -----------------\
- * \ (join) -> (sink)
- * \ (source) --/
- * \
- * \
- * \-> (sink)
- * </pre>
- */
-@SuppressWarnings({"serial", "unchecked"})
-public class DataExchangeModeOpenBranchingTest extends CompilerTestBase {
-
- @Test
- public void testPipelinedForced() {
- // PIPELINED_FORCED should result in pipelining all the way
- verifyBranchigPlan(ExecutionMode.PIPELINED_FORCED,
- DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
- DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
- DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
- DataExchangeMode.PIPELINED);
- }
-
- @Test
- public void testPipelined() {
- // PIPELINED should result in pipelining all the way
- verifyBranchigPlan(ExecutionMode.PIPELINED,
- DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
- DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
- DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
- DataExchangeMode.PIPELINED);
- }
-
- @Test
- public void testBatch() {
- // BATCH should result in batching the shuffle all the way
- verifyBranchigPlan(ExecutionMode.BATCH,
- DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
- DataExchangeMode.PIPELINED, DataExchangeMode.BATCH,
- DataExchangeMode.BATCH, DataExchangeMode.PIPELINED,
- DataExchangeMode.PIPELINED);
- }
-
- @Test
- public void testBatchForced() {
- // BATCH_FORCED should result in batching all the way
- verifyBranchigPlan(ExecutionMode.BATCH_FORCED,
- DataExchangeMode.BATCH, DataExchangeMode.BATCH,
- DataExchangeMode.BATCH, DataExchangeMode.BATCH,
- DataExchangeMode.BATCH, DataExchangeMode.BATCH,
- DataExchangeMode.BATCH);
- }
-
- private void verifyBranchigPlan(ExecutionMode execMode,
- DataExchangeMode toMap,
- DataExchangeMode toFilter,
- DataExchangeMode toFilterSink,
- DataExchangeMode toJoin1,
- DataExchangeMode toJoin2,
- DataExchangeMode toJoinSink,
- DataExchangeMode toDirectSink)
- {
- try {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.getConfig().setExecutionMode(execMode);
-
- DataSet<Tuple2<Long, Long>> data = env.generateSequence(1, 100000)
- .map(new MapFunction<Long, Tuple2<Long, Long>>() {
- @Override
- public Tuple2<Long, Long> map(Long value) {
- return new Tuple2<Long, Long>(value, value);
- }
- });
-
- // output 1
- data
- .filter(new FilterFunction<Tuple2<Long, Long>>() {
- @Override
- public boolean filter(Tuple2<Long, Long> value) {
- return false;
- }
- })
- .output(new DiscardingOutputFormat<Tuple2<Long, Long>>()).name("sink1");
-
- // output 2 does a join before a join
- data
- .join(env.fromElements(new Tuple2<Long, Long>(1L, 2L)))
- .where(1)
- .equalTo(0)
- .output(new DiscardingOutputFormat<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>>()).name("sink2");
-
- // output 3 is direct
- data
- .output(new DiscardingOutputFormat<Tuple2<Long, Long>>()).name("sink3");
-
- OptimizedPlan optPlan = compileNoStats(env.createProgramPlan());
-
- SinkPlanNode filterSink = findSink(optPlan.getDataSinks(), "sink1");
- SinkPlanNode joinSink = findSink(optPlan.getDataSinks(), "sink2");
- SinkPlanNode directSink = findSink(optPlan.getDataSinks(), "sink3");
-
- SingleInputPlanNode filterNode = (SingleInputPlanNode) filterSink.getPredecessor();
- SingleInputPlanNode mapNode = (SingleInputPlanNode) filterNode.getPredecessor();
-
- DualInputPlanNode joinNode = (DualInputPlanNode) joinSink.getPredecessor();
- assertEquals(mapNode, joinNode.getInput1().getSource());
-
- assertEquals(mapNode, directSink.getPredecessor());
-
- assertEquals(toFilterSink, filterSink.getInput().getDataExchangeMode());
- assertEquals(toJoinSink, joinSink.getInput().getDataExchangeMode());
- assertEquals(toDirectSink, directSink.getInput().getDataExchangeMode());
-
- assertEquals(toMap, mapNode.getInput().getDataExchangeMode());
- assertEquals(toFilter, filterNode.getInput().getDataExchangeMode());
-
- assertEquals(toJoin1, joinNode.getInput1().getDataExchangeMode());
- assertEquals(toJoin2, joinNode.getInput2().getDataExchangeMode());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- private SinkPlanNode findSink(Collection<SinkPlanNode> collection, String name) {
- for (SinkPlanNode node : collection) {
- String nodeName = node.getOptimizerNode().getOperator().getName();
- if (nodeName != null && nodeName.equals(name)) {
- return node;
- }
- }
-
- throw new IllegalArgumentException("No node with that name was found.");
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/PipelineBreakingTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/PipelineBreakingTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/PipelineBreakingTest.java
deleted file mode 100644
index 1a14be5..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/PipelineBreakingTest.java
+++ /dev/null
@@ -1,322 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dataexchange;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.optimizer.dag.DataSinkNode;
-import org.apache.flink.optimizer.dag.OptimizerNode;
-import org.apache.flink.optimizer.dag.SingleInputNode;
-import org.apache.flink.optimizer.dag.SinkJoiner;
-import org.apache.flink.optimizer.dag.TwoInputNode;
-import org.apache.flink.optimizer.testfunctions.DummyCoGroupFunction;
-import org.apache.flink.optimizer.testfunctions.DummyFlatJoinFunction;
-import org.apache.flink.optimizer.testfunctions.IdentityFlatMapper;
-import org.apache.flink.optimizer.testfunctions.IdentityKeyExtractor;
-import org.apache.flink.optimizer.testfunctions.SelectOneReducer;
-import org.apache.flink.optimizer.testfunctions.Top1GroupReducer;
-import org.apache.flink.optimizer.traversals.BranchesVisitor;
-import org.apache.flink.optimizer.traversals.GraphCreatingVisitor;
-import org.apache.flink.optimizer.traversals.IdAndEstimatesVisitor;
-import org.junit.Test;
-
-import java.util.Iterator;
-import java.util.List;
-
-import static org.junit.Assert.*;
-
-/**
- * This test checks whether connections are correctly marked as pipelined breaking.
- */
-@SuppressWarnings("serial")
-public class PipelineBreakingTest {
-
- /**
- * Tests that no pipeline breakers are inserted into a simple forward
- * pipeline.
- *
- * <pre>
- * (source) -> (map) -> (filter) -> (groupBy / reduce)
- * </pre>
- */
- @Test
- public void testSimpleForwardPlan() {
- try {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<String> dataSet = env.readTextFile("/never/accessed");
- dataSet
- .map(new MapFunction<String, Integer>() {
- @Override
- public Integer map(String value) {
- return 0;
- }
- })
- .filter(new FilterFunction<Integer>() {
- @Override
- public boolean filter(Integer value) {
- return false;
- }
- })
- .groupBy(new IdentityKeyExtractor<Integer>())
- .reduceGroup(new Top1GroupReducer<Integer>())
- .output(new DiscardingOutputFormat<Integer>());
-
- DataSinkNode sinkNode = convertPlan(env.createProgramPlan()).get(0);
-
- SingleInputNode reduceNode = (SingleInputNode) sinkNode.getPredecessorNode();
- SingleInputNode keyExtractorNode = (SingleInputNode) reduceNode.getPredecessorNode();
-
- SingleInputNode filterNode = (SingleInputNode) keyExtractorNode.getPredecessorNode();
- SingleInputNode mapNode = (SingleInputNode) filterNode.getPredecessorNode();
-
- assertFalse(sinkNode.getInputConnection().isBreakingPipeline());
- assertFalse(reduceNode.getIncomingConnection().isBreakingPipeline());
- assertFalse(keyExtractorNode.getIncomingConnection().isBreakingPipeline());
- assertFalse(filterNode.getIncomingConnection().isBreakingPipeline());
- assertFalse(mapNode.getIncomingConnection().isBreakingPipeline());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- /**
- * Tests that branching plans, where the branches are not re-joined,
- * do not place pipeline breakers.
- *
- * <pre>
- * /---> (filter) -> (sink)
- * /
- * /
- * (source) -> (map) -----------------\
- * \ (join) -> (sink)
- * \ (source) --/
- * \
- * \
- * \-> (sink)
- * </pre>
- */
- @Test
- public void testBranchingPlanNotReJoined() {
- try {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Integer> data = env.readTextFile("/never/accessed")
- .map(new MapFunction<String, Integer>() {
- @Override
- public Integer map(String value) {
- return 0;
- }
- });
-
- // output 1
- data
- .filter(new FilterFunction<Integer>() {
- @Override
- public boolean filter(Integer value) {
- return false;
- }
- })
- .output(new DiscardingOutputFormat<Integer>());
-
- // output 2 does a join before a join
- data
- .join(env.fromElements(1, 2, 3, 4))
- .where(new IdentityKeyExtractor<Integer>())
- .equalTo(new IdentityKeyExtractor<Integer>())
- .output(new DiscardingOutputFormat<Tuple2<Integer, Integer>>());
-
- // output 3 is direct
- data
- .output(new DiscardingOutputFormat<Integer>());
-
- List<DataSinkNode> sinks = convertPlan(env.createProgramPlan());
-
- // gather the optimizer DAG nodes
-
- DataSinkNode sinkAfterFilter = sinks.get(0);
- DataSinkNode sinkAfterJoin = sinks.get(1);
- DataSinkNode sinkDirect = sinks.get(2);
-
- SingleInputNode filterNode = (SingleInputNode) sinkAfterFilter.getPredecessorNode();
- SingleInputNode mapNode = (SingleInputNode) filterNode.getPredecessorNode();
-
- TwoInputNode joinNode = (TwoInputNode) sinkAfterJoin.getPredecessorNode();
- SingleInputNode joinInput = (SingleInputNode) joinNode.getSecondPredecessorNode();
-
- // verify the non-pipeline breaking status
-
- assertFalse(sinkAfterFilter.getInputConnection().isBreakingPipeline());
- assertFalse(sinkAfterJoin.getInputConnection().isBreakingPipeline());
- assertFalse(sinkDirect.getInputConnection().isBreakingPipeline());
-
- assertFalse(filterNode.getIncomingConnection().isBreakingPipeline());
- assertFalse(mapNode.getIncomingConnection().isBreakingPipeline());
-
- assertFalse(joinNode.getFirstIncomingConnection().isBreakingPipeline());
- assertFalse(joinNode.getSecondIncomingConnection().isBreakingPipeline());
- assertFalse(joinInput.getIncomingConnection().isBreakingPipeline());
-
- // some other sanity checks on the plan construction (cannot hurt)
-
- assertEquals(mapNode, ((SingleInputNode) joinNode.getFirstPredecessorNode()).getPredecessorNode());
- assertEquals(mapNode, sinkDirect.getPredecessorNode());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- /**
- * Tests that branches that are re-joined have place pipeline breakers.
- *
- * <pre>
- * /-> (sink)
- * /
- * /-> (reduce) -+ /-> (flatmap) -> (sink)
- * / \ /
- * (source) -> (map) - (join) -+-----\
- * \ / \
- * \-> (filter) -+ \
- * \ (co group) -> (sink)
- * \ /
- * \-> (reduce) - /
- * </pre>
- */
- @Test
- public void testReJoinedBranches() {
- try {
- // build a test program
-
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple2<Long, Long>> data = env.fromElements(33L, 44L)
- .map(new MapFunction<Long, Tuple2<Long, Long>>() {
- @Override
- public Tuple2<Long, Long> map(Long value) {
- return new Tuple2<Long, Long>(value, value);
- }
- });
-
- DataSet<Tuple2<Long, Long>> reduced = data.groupBy(0).reduce(new SelectOneReducer<Tuple2<Long, Long>>());
- reduced.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
-
- DataSet<Tuple2<Long, Long>> filtered = data.filter(new FilterFunction<Tuple2<Long, Long>>() {
- @Override
- public boolean filter(Tuple2<Long, Long> value) throws Exception {
- return false;
- }
- });
-
- DataSet<Tuple2<Long, Long>> joined = reduced.join(filtered)
- .where(1).equalTo(1)
- .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
-
- joined.flatMap(new IdentityFlatMapper<Tuple2<Long, Long>>())
- .output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
-
- joined.coGroup(filtered.groupBy(1).reduceGroup(new Top1GroupReducer<Tuple2<Long, Long>>()))
- .where(0).equalTo(0)
- .with(new DummyCoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>())
- .output(new DiscardingOutputFormat<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>>());
-
- List<DataSinkNode> sinks = convertPlan(env.createProgramPlan());
-
- // gather the optimizer DAG nodes
-
- DataSinkNode sinkAfterReduce = sinks.get(0);
- DataSinkNode sinkAfterFlatMap = sinks.get(1);
- DataSinkNode sinkAfterCoGroup = sinks.get(2);
-
- SingleInputNode reduceNode = (SingleInputNode) sinkAfterReduce.getPredecessorNode();
- SingleInputNode mapNode = (SingleInputNode) reduceNode.getPredecessorNode();
-
- SingleInputNode flatMapNode = (SingleInputNode) sinkAfterFlatMap.getPredecessorNode();
- TwoInputNode joinNode = (TwoInputNode) flatMapNode.getPredecessorNode();
- SingleInputNode filterNode = (SingleInputNode) joinNode.getSecondPredecessorNode();
-
- TwoInputNode coGroupNode = (TwoInputNode) sinkAfterCoGroup.getPredecessorNode();
- SingleInputNode otherReduceNode = (SingleInputNode) coGroupNode.getSecondPredecessorNode();
-
- // test sanity checks (that we constructed the DAG correctly)
-
- assertEquals(reduceNode, joinNode.getFirstPredecessorNode());
- assertEquals(mapNode, filterNode.getPredecessorNode());
- assertEquals(joinNode, coGroupNode.getFirstPredecessorNode());
- assertEquals(filterNode, otherReduceNode.getPredecessorNode());
-
- // verify the pipeline breaking status
-
- assertFalse(sinkAfterReduce.getInputConnection().isBreakingPipeline());
- assertFalse(sinkAfterFlatMap.getInputConnection().isBreakingPipeline());
- assertFalse(sinkAfterCoGroup.getInputConnection().isBreakingPipeline());
-
- assertFalse(mapNode.getIncomingConnection().isBreakingPipeline());
- assertFalse(flatMapNode.getIncomingConnection().isBreakingPipeline());
- assertFalse(joinNode.getFirstIncomingConnection().isBreakingPipeline());
- assertFalse(coGroupNode.getFirstIncomingConnection().isBreakingPipeline());
- assertFalse(coGroupNode.getSecondIncomingConnection().isBreakingPipeline());
-
- // these should be pipeline breakers
- assertTrue(reduceNode.getIncomingConnection().isBreakingPipeline());
- assertTrue(filterNode.getIncomingConnection().isBreakingPipeline());
- assertTrue(otherReduceNode.getIncomingConnection().isBreakingPipeline());
- assertTrue(joinNode.getSecondIncomingConnection().isBreakingPipeline());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- private static List<DataSinkNode> convertPlan(Plan p) {
- GraphCreatingVisitor dagCreator =
- new GraphCreatingVisitor(17, p.getExecutionConfig().getExecutionMode());
-
- // create the DAG
- p.accept(dagCreator);
- List<DataSinkNode> sinks = dagCreator.getSinks();
-
- // build a single root and run the branch tracking logic
- OptimizerNode rootNode;
- if (sinks.size() == 1) {
- rootNode = sinks.get(0);
- }
- else {
- Iterator<DataSinkNode> iter = sinks.iterator();
- rootNode = iter.next();
-
- while (iter.hasNext()) {
- rootNode = new SinkJoiner(rootNode, iter.next());
- }
- }
- rootNode.accept(new IdAndEstimatesVisitor(null));
- rootNode.accept(new BranchesVisitor());
-
- return sinks;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesFilteringTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesFilteringTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesFilteringTest.java
deleted file mode 100644
index 3e32905..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesFilteringTest.java
+++ /dev/null
@@ -1,428 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dataproperties;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.common.operators.Ordering;
-import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.common.operators.util.FieldSet;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.java.functions.SemanticPropUtil;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple8;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.StringValue;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.Set;
-
-public class GlobalPropertiesFilteringTest {
-
- private TupleTypeInfo<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>> tupleInfo =
- new TupleTypeInfo<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>>(
- BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO,
- BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO
- );
-
- @Test
- public void testAllErased1() {
-
- SingleInputSemanticProperties semProps = new SingleInputSemanticProperties();
-
- GlobalProperties gprops = new GlobalProperties();
- gprops.setHashPartitioned(new FieldList(0, 1));
- gprops.addUniqueFieldCombination(new FieldSet(3, 4));
- gprops.addUniqueFieldCombination(new FieldSet(5, 6));
-
- GlobalProperties result = gprops.filterBySemanticProperties(semProps, 0);
-
- assertEquals(PartitioningProperty.RANDOM_PARTITIONED, result.getPartitioning());
- assertNull(result.getPartitioningFields());
- assertNull(result.getPartitioningOrdering());
- assertNull(result.getUniqueFieldCombination());
- }
-
- @Test
- public void testAllErased2() {
-
- SingleInputSemanticProperties semProps = new SingleInputSemanticProperties();
- SemanticPropUtil.getSemanticPropsSingleFromString(semProps, new String[]{"2"}, null, null, tupleInfo, tupleInfo);
-
- GlobalProperties gprops = new GlobalProperties();
- gprops.setHashPartitioned(new FieldList(0, 1));
- gprops.addUniqueFieldCombination(new FieldSet(3, 4));
- gprops.addUniqueFieldCombination(new FieldSet(5, 6));
-
- GlobalProperties result = gprops.filterBySemanticProperties(semProps, 0);
-
- assertEquals(PartitioningProperty.RANDOM_PARTITIONED, result.getPartitioning());
- assertNull(result.getPartitioningFields());
- assertNull(result.getPartitioningOrdering());
- assertNull(result.getUniqueFieldCombination());
- }
-
- @Test
- public void testHashPartitioningPreserved1() {
-
- SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
- SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0;1;4"}, null, null, tupleInfo, tupleInfo);
-
- GlobalProperties gprops = new GlobalProperties();
- gprops.setHashPartitioned(new FieldList(0, 1, 4));
-
- GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
-
- assertEquals(PartitioningProperty.HASH_PARTITIONED, result.getPartitioning());
- FieldList pFields = result.getPartitioningFields();
- assertEquals(3, pFields.size());
- assertTrue(pFields.contains(0));
- assertTrue(pFields.contains(1));
- assertTrue(pFields.contains(4));
- }
-
- @Test
- public void testHashPartitioningPreserved2() {
-
- SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
- SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0->1; 1->2; 4->3"}, null, null, tupleInfo, tupleInfo);
-
- GlobalProperties gprops = new GlobalProperties();
- gprops.setHashPartitioned(new FieldList(0, 1, 4));
-
- GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
-
- assertEquals(PartitioningProperty.HASH_PARTITIONED, result.getPartitioning());
- FieldList pFields = result.getPartitioningFields();
- assertEquals(3, pFields.size());
- assertTrue(pFields.contains(1));
- assertTrue(pFields.contains(2));
- assertTrue(pFields.contains(3));
- }
-
- @Test
- public void testHashPartitioningErased() {
-
- SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
- SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0;1"}, null, null, tupleInfo, tupleInfo);
-
- GlobalProperties gprops = new GlobalProperties();
- gprops.setHashPartitioned(new FieldList(0, 1, 4));
-
- GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
-
- assertEquals(PartitioningProperty.RANDOM_PARTITIONED, result.getPartitioning());
- assertNull(result.getPartitioningFields());
- }
-
- @Test
- public void testAnyPartitioningPreserved1() {
-
- SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
- SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0;1;4"}, null, null, tupleInfo, tupleInfo);
-
- GlobalProperties gprops = new GlobalProperties();
- gprops.setAnyPartitioning(new FieldList(0, 1, 4));
-
- GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
-
- assertEquals(PartitioningProperty.ANY_PARTITIONING, result.getPartitioning());
- FieldList pFields = result.getPartitioningFields();
- assertEquals(3, pFields.size());
- assertTrue(pFields.contains(0));
- assertTrue(pFields.contains(1));
- assertTrue(pFields.contains(4));
- }
-
- @Test
- public void testAnyPartitioningPreserved2() {
-
- SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
- SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0->1; 1->2; 4->3"}, null, null, tupleInfo, tupleInfo);
-
- GlobalProperties gprops = new GlobalProperties();
- gprops.setAnyPartitioning(new FieldList(0, 1, 4));
-
- GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
-
- assertEquals(PartitioningProperty.ANY_PARTITIONING, result.getPartitioning());
- FieldList pFields = result.getPartitioningFields();
- assertEquals(3, pFields.size());
- assertTrue(pFields.contains(1));
- assertTrue(pFields.contains(2));
- assertTrue(pFields.contains(3));
- }
-
- @Test
- public void testAnyPartitioningErased() {
-
- SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
- SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0;1"}, null, null, tupleInfo, tupleInfo);
-
- GlobalProperties gprops = new GlobalProperties();
- gprops.setAnyPartitioning(new FieldList(0, 1, 4));
-
- GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
-
- assertEquals(PartitioningProperty.RANDOM_PARTITIONED, result.getPartitioning());
- assertNull(result.getPartitioningFields());
- }
-
- @Test
- public void testCustomPartitioningPreserved1() {
-
- SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
- SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0;1;4"}, null, null, tupleInfo, tupleInfo);
-
- GlobalProperties gprops = new GlobalProperties();
- Partitioner<Tuple2<Long, Integer>> myP = new MockPartitioner();
- gprops.setCustomPartitioned(new FieldList(0, 4), myP);
-
- GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
-
- assertEquals(PartitioningProperty.CUSTOM_PARTITIONING, result.getPartitioning());
- FieldList pFields = result.getPartitioningFields();
- assertEquals(2, pFields.size());
- assertTrue(pFields.contains(0));
- assertTrue(pFields.contains(4));
- assertEquals(myP, result.getCustomPartitioner());
- }
-
- @Test
- public void testCustomPartitioningPreserved2() {
-
- SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
- SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0->1; 1->2; 4->3"}, null, null, tupleInfo, tupleInfo);
-
- GlobalProperties gprops = new GlobalProperties();
- Partitioner<Tuple2<Long, Integer>> myP = new MockPartitioner();
- gprops.setCustomPartitioned(new FieldList(0, 4), myP);
-
- GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
-
- assertEquals(PartitioningProperty.CUSTOM_PARTITIONING, result.getPartitioning());
- FieldList pFields = result.getPartitioningFields();
- assertEquals(2, pFields.size());
- assertTrue(pFields.contains(1));
- assertTrue(pFields.contains(3));
- assertEquals(myP, result.getCustomPartitioner());
- }
-
- @Test
- public void testCustomPartitioningErased() {
-
- SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
- SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0;1"}, null, null, tupleInfo, tupleInfo);
-
- GlobalProperties gprops = new GlobalProperties();
- Partitioner<Tuple2<Long, Integer>> myP = new MockPartitioner();
- gprops.setCustomPartitioned(new FieldList(0, 4), myP);
-
- GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
-
- assertEquals(PartitioningProperty.RANDOM_PARTITIONED, result.getPartitioning());
- assertNull(result.getPartitioningFields());
- assertNull(result.getCustomPartitioner());
- }
-
- @Test
- public void testRangePartitioningPreserved1() {
-
- SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
- SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"1;2;5"}, null, null, tupleInfo, tupleInfo);
-
- Ordering o = new Ordering();
- o.appendOrdering(1, IntValue.class, Order.ASCENDING);
- o.appendOrdering(5, LongValue.class, Order.DESCENDING);
- o.appendOrdering(2, StringValue.class, Order.ASCENDING);
- GlobalProperties gprops = new GlobalProperties();
- gprops.setRangePartitioned(o);
-
- GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
-
- assertEquals(PartitioningProperty.RANGE_PARTITIONED, result.getPartitioning());
- FieldList pFields = result.getPartitioningFields();
- assertEquals(3, pFields.size());
- assertEquals(1, pFields.get(0).intValue());
- assertEquals(5, pFields.get(1).intValue());
- assertEquals(2, pFields.get(2).intValue());
- Ordering pOrder = result.getPartitioningOrdering();
- assertEquals(3, pOrder.getNumberOfFields());
- assertEquals(1, pOrder.getFieldNumber(0).intValue());
- assertEquals(5, pOrder.getFieldNumber(1).intValue());
- assertEquals(2, pOrder.getFieldNumber(2).intValue());
- assertEquals(Order.ASCENDING, pOrder.getOrder(0));
- assertEquals(Order.DESCENDING, pOrder.getOrder(1));
- assertEquals(Order.ASCENDING, pOrder.getOrder(2));
- assertEquals(IntValue.class, pOrder.getType(0));
- assertEquals(LongValue.class, pOrder.getType(1));
- assertEquals(StringValue.class, pOrder.getType(2));
- }
-
- @Test
- public void testRangePartitioningPreserved2() {
-
- SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
- SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"1->3; 2->0; 5->1"}, null, null, tupleInfo, tupleInfo);
-
- Ordering o = new Ordering();
- o.appendOrdering(1, IntValue.class, Order.ASCENDING);
- o.appendOrdering(5, LongValue.class, Order.DESCENDING);
- o.appendOrdering(2, StringValue.class, Order.ASCENDING);
- GlobalProperties gprops = new GlobalProperties();
- gprops.setRangePartitioned(o);
-
- GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
-
- assertEquals(PartitioningProperty.RANGE_PARTITIONED, result.getPartitioning());
- FieldList pFields = result.getPartitioningFields();
- assertEquals(3, pFields.size());
- assertEquals(3, pFields.get(0).intValue());
- assertEquals(1, pFields.get(1).intValue());
- assertEquals(0, pFields.get(2).intValue());
- Ordering pOrder = result.getPartitioningOrdering();
- assertEquals(3, pOrder.getNumberOfFields());
- assertEquals(3, pOrder.getFieldNumber(0).intValue());
- assertEquals(1, pOrder.getFieldNumber(1).intValue());
- assertEquals(0, pOrder.getFieldNumber(2).intValue());
- assertEquals(Order.ASCENDING, pOrder.getOrder(0));
- assertEquals(Order.DESCENDING, pOrder.getOrder(1));
- assertEquals(Order.ASCENDING, pOrder.getOrder(2));
- assertEquals(IntValue.class, pOrder.getType(0));
- assertEquals(LongValue.class, pOrder.getType(1));
- assertEquals(StringValue.class, pOrder.getType(2));
- }
-
- @Test
- public void testRangePartitioningErased() {
-
- SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
- SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"1;5"}, null, null, tupleInfo, tupleInfo);
-
- Ordering o = new Ordering();
- o.appendOrdering(1, IntValue.class, Order.ASCENDING);
- o.appendOrdering(5, LongValue.class, Order.DESCENDING);
- o.appendOrdering(2, StringValue.class, Order.ASCENDING);
- GlobalProperties gprops = new GlobalProperties();
- gprops.setRangePartitioned(o);
-
- GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
-
- assertEquals(PartitioningProperty.RANDOM_PARTITIONED, result.getPartitioning());
- assertNull(result.getPartitioningOrdering());
- assertNull(result.getPartitioningFields());
- }
-
- @Test
- public void testRebalancingPreserved() {
-
- SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
- SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0->1; 1->2; 4->3"}, null, null, tupleInfo, tupleInfo);
-
- GlobalProperties gprops = new GlobalProperties();
- gprops.setForcedRebalanced();
-
- GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
-
- assertEquals(PartitioningProperty.FORCED_REBALANCED, result.getPartitioning());
- assertNull(result.getPartitioningFields());
- }
-
- @Test
- public void testUniqueFieldGroupsPreserved1() {
- SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
- SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0;1;2;3;4"}, null, null, tupleInfo, tupleInfo);
-
- FieldSet set1 = new FieldSet(0, 1, 2);
- FieldSet set2 = new FieldSet(3, 4);
- FieldSet set3 = new FieldSet(4, 5, 6, 7);
- GlobalProperties gprops = new GlobalProperties();
- gprops.addUniqueFieldCombination(set1);
- gprops.addUniqueFieldCombination(set2);
- gprops.addUniqueFieldCombination(set3);
-
- GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
- Set<FieldSet> unique = result.getUniqueFieldCombination();
- FieldSet expected1 = new FieldSet(0, 1, 2);
- FieldSet expected2 = new FieldSet(3, 4);
-
- Assert.assertTrue(unique.size() == 2);
- Assert.assertTrue(unique.contains(expected1));
- Assert.assertTrue(unique.contains(expected2));
- }
-
- @Test
- public void testUniqueFieldGroupsPreserved2() {
- SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
- SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0->5;1;2;3->6;4"}, null, null, tupleInfo, tupleInfo);
-
- FieldSet set1 = new FieldSet(0, 1, 2);
- FieldSet set2 = new FieldSet(3, 4);
- FieldSet set3 = new FieldSet(4, 5, 6, 7);
- GlobalProperties gprops = new GlobalProperties();
- gprops.addUniqueFieldCombination(set1);
- gprops.addUniqueFieldCombination(set2);
- gprops.addUniqueFieldCombination(set3);
-
- GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
- Set<FieldSet> unique = result.getUniqueFieldCombination();
- FieldSet expected1 = new FieldSet(1, 2, 5);
- FieldSet expected2 = new FieldSet(4, 6);
-
- Assert.assertTrue(unique.size() == 2);
- Assert.assertTrue(unique.contains(expected1));
- Assert.assertTrue(unique.contains(expected2));
- }
-
- @Test
- public void testUniqueFieldGroupsErased() {
- SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
- SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0; 3; 5; 6; 7"}, null, null, tupleInfo, tupleInfo);
-
- FieldSet set1 = new FieldSet(0, 1, 2);
- FieldSet set2 = new FieldSet(3, 4);
- FieldSet set3 = new FieldSet(4, 5, 6, 7);
- GlobalProperties gprops = new GlobalProperties();
- gprops.addUniqueFieldCombination(set1);
- gprops.addUniqueFieldCombination(set2);
- gprops.addUniqueFieldCombination(set3);
-
- GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
- Assert.assertNull(result.getUniqueFieldCombination());
- }
-
- @Test(expected = IndexOutOfBoundsException.class)
- public void testInvalidInputIndex() {
-
- SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
- SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0;1"}, null, null, tupleInfo, tupleInfo);
-
- GlobalProperties gprops = new GlobalProperties();
- gprops.setHashPartitioned(new FieldList(0, 1));
-
- gprops.filterBySemanticProperties(sprops, 1);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesMatchingTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesMatchingTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesMatchingTest.java
deleted file mode 100644
index 52826d6..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesMatchingTest.java
+++ /dev/null
@@ -1,294 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dataproperties;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.common.operators.Ordering;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.common.operators.util.FieldSet;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.junit.Test;
-
-public class GlobalPropertiesMatchingTest {
-
- @Test
- public void testMatchingAnyPartitioning() {
- try {
-
- RequestedGlobalProperties req = new RequestedGlobalProperties();
- req.setAnyPartitioning(new FieldSet(6, 2));
-
- // match any partitioning
- {
- GlobalProperties gp1 = new GlobalProperties();
- gp1.setAnyPartitioning(new FieldList(2, 6));
- assertTrue(req.isMetBy(gp1));
-
- GlobalProperties gp2 = new GlobalProperties();
- gp2.setAnyPartitioning(new FieldList(6, 2));
- assertTrue(req.isMetBy(gp2));
-
- GlobalProperties gp3 = new GlobalProperties();
- gp3.setAnyPartitioning(new FieldList(6, 2, 4));
- assertFalse(req.isMetBy(gp3));
-
- GlobalProperties gp4 = new GlobalProperties();
- gp4.setAnyPartitioning(new FieldList(6, 1));
- assertFalse(req.isMetBy(gp4));
-
- GlobalProperties gp5 = new GlobalProperties();
- gp5.setAnyPartitioning(new FieldList(2));
- assertTrue(req.isMetBy(gp5));
- }
-
- // match hash partitioning
- {
- GlobalProperties gp1 = new GlobalProperties();
- gp1.setHashPartitioned(new FieldList(2, 6));
- assertTrue(req.isMetBy(gp1));
-
- GlobalProperties gp2 = new GlobalProperties();
- gp2.setHashPartitioned(new FieldList(6, 2));
- assertTrue(req.isMetBy(gp2));
-
- GlobalProperties gp3 = new GlobalProperties();
- gp3.setHashPartitioned(new FieldList(6, 1));
- assertFalse(req.isMetBy(gp3));
- }
-
- // match range partitioning
- {
- GlobalProperties gp1 = new GlobalProperties();
- gp1.setRangePartitioned(new Ordering(2, null, Order.DESCENDING).appendOrdering(6, null, Order.ASCENDING));
- assertTrue(req.isMetBy(gp1));
-
- GlobalProperties gp2 = new GlobalProperties();
- gp2.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(2, null, Order.ASCENDING));
- assertTrue(req.isMetBy(gp2));
-
- GlobalProperties gp3 = new GlobalProperties();
- gp3.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(1, null, Order.ASCENDING));
- assertFalse(req.isMetBy(gp3));
-
- GlobalProperties gp4 = new GlobalProperties();
- gp4.setRangePartitioned(new Ordering(6, null, Order.DESCENDING));
- assertTrue(req.isMetBy(gp4));
- }
-
- // match custom partitioning
- {
- GlobalProperties gp1 = new GlobalProperties();
- gp1.setCustomPartitioned(new FieldList(2, 6), new MockPartitioner());
- assertTrue(req.isMetBy(gp1));
-
- GlobalProperties gp2 = new GlobalProperties();
- gp2.setCustomPartitioned(new FieldList(6, 2), new MockPartitioner());
- assertTrue(req.isMetBy(gp2));
-
- GlobalProperties gp3 = new GlobalProperties();
- gp3.setCustomPartitioned(new FieldList(6, 1), new MockPartitioner());
- assertFalse(req.isMetBy(gp3));
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testMatchingCustomPartitioning() {
- try {
- final Partitioner<Tuple2<Long, Integer>> partitioner = new MockPartitioner();
-
- RequestedGlobalProperties req = new RequestedGlobalProperties();
- req.setCustomPartitioned(new FieldSet(6, 2), partitioner);
-
- // match custom partitionings
- {
- GlobalProperties gp1 = new GlobalProperties();
- gp1.setCustomPartitioned(new FieldList(2, 6), partitioner);
- assertTrue(req.isMetBy(gp1));
-
- GlobalProperties gp2 = new GlobalProperties();
- gp2.setCustomPartitioned(new FieldList(6, 2), partitioner);
- assertTrue(req.isMetBy(gp2));
-
- GlobalProperties gp3 = new GlobalProperties();
- gp3.setCustomPartitioned(new FieldList(6, 2), new MockPartitioner());
- assertFalse(req.isMetBy(gp3));
- }
-
- // cannot match other types of partitionings
- {
- GlobalProperties gp1 = new GlobalProperties();
- gp1.setAnyPartitioning(new FieldList(6, 2));
- assertFalse(req.isMetBy(gp1));
-
- GlobalProperties gp2 = new GlobalProperties();
- gp2.setHashPartitioned(new FieldList(6, 2));
- assertFalse(req.isMetBy(gp2));
-
- GlobalProperties gp3 = new GlobalProperties();
- gp3.setRangePartitioned(new Ordering(2, null, Order.DESCENDING).appendOrdering(6, null, Order.ASCENDING));
- assertFalse(req.isMetBy(gp3));
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testStrictlyMatchingAnyPartitioning() {
-
- RequestedGlobalProperties req = new RequestedGlobalProperties();
- req.setAnyPartitioning(new FieldList(6, 2));
-
- // match any partitioning
- {
- GlobalProperties gp1 = new GlobalProperties();
- gp1.setAnyPartitioning(new FieldList(6, 2));
- assertTrue(req.isMetBy(gp1));
-
- GlobalProperties gp2 = new GlobalProperties();
- gp2.setAnyPartitioning(new FieldList(2, 6));
- assertFalse(req.isMetBy(gp2));
-
- GlobalProperties gp3 = new GlobalProperties();
- gp3.setAnyPartitioning(new FieldList(6, 2, 3));
- assertFalse(req.isMetBy(gp3));
-
- GlobalProperties gp4 = new GlobalProperties();
- gp3.setAnyPartitioning(new FieldList(6, 1));
- assertFalse(req.isMetBy(gp3));
-
- GlobalProperties gp5 = new GlobalProperties();
- gp4.setAnyPartitioning(new FieldList(2));
- assertFalse(req.isMetBy(gp4));
- }
-
- // match hash partitioning
- {
- GlobalProperties gp1 = new GlobalProperties();
- gp1.setHashPartitioned(new FieldList(6, 2));
- assertTrue(req.isMetBy(gp1));
-
- GlobalProperties gp2 = new GlobalProperties();
- gp2.setHashPartitioned(new FieldList(2, 6));
- assertFalse(req.isMetBy(gp2));
-
- GlobalProperties gp3 = new GlobalProperties();
- gp3.setHashPartitioned(new FieldList(6, 1));
- assertFalse(req.isMetBy(gp3));
- }
-
- // match range partitioning
- {
- GlobalProperties gp1 = new GlobalProperties();
- gp1.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(2, null, Order.ASCENDING));
- assertTrue(req.isMetBy(gp1));
-
- GlobalProperties gp2 = new GlobalProperties();
- gp2.setRangePartitioned(new Ordering(2, null, Order.DESCENDING).appendOrdering(6, null, Order.ASCENDING));
- assertFalse(req.isMetBy(gp2));
-
- GlobalProperties gp3 = new GlobalProperties();
- gp3.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(1, null, Order.ASCENDING));
- assertFalse(req.isMetBy(gp3));
-
- GlobalProperties gp4 = new GlobalProperties();
- gp4.setRangePartitioned(new Ordering(6, null, Order.DESCENDING));
- assertFalse(req.isMetBy(gp4));
- }
-
- }
-
- @Test
- public void testStrictlyMatchingHashPartitioning() {
-
- RequestedGlobalProperties req = new RequestedGlobalProperties();
- req.setHashPartitioned(new FieldList(6, 2));
-
- // match any partitioning
- {
- GlobalProperties gp1 = new GlobalProperties();
- gp1.setAnyPartitioning(new FieldList(6, 2));
- assertFalse(req.isMetBy(gp1));
-
- GlobalProperties gp2 = new GlobalProperties();
- gp2.setAnyPartitioning(new FieldList(2, 6));
- assertFalse(req.isMetBy(gp2));
-
- GlobalProperties gp3 = new GlobalProperties();
- gp3.setAnyPartitioning(new FieldList(6, 1));
- assertFalse(req.isMetBy(gp3));
-
- GlobalProperties gp4 = new GlobalProperties();
- gp4.setAnyPartitioning(new FieldList(2));
- assertFalse(req.isMetBy(gp4));
- }
-
- // match hash partitioning
- {
- GlobalProperties gp1 = new GlobalProperties();
- gp1.setHashPartitioned(new FieldList(6, 2));
- assertTrue(req.isMetBy(gp1));
-
- GlobalProperties gp2 = new GlobalProperties();
- gp2.setHashPartitioned(new FieldList(2, 6));
- assertFalse(req.isMetBy(gp2));
-
- GlobalProperties gp3 = new GlobalProperties();
- gp3.setHashPartitioned(new FieldList(6, 1));
- assertFalse(req.isMetBy(gp3));
-
- GlobalProperties gp4 = new GlobalProperties();
- gp4.setHashPartitioned(new FieldList(6, 2, 0));
- assertFalse(req.isMetBy(gp4));
- }
-
- // match range partitioning
- {
- GlobalProperties gp1 = new GlobalProperties();
- gp1.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(2, null, Order.ASCENDING));
- assertFalse(req.isMetBy(gp1));
-
- GlobalProperties gp2 = new GlobalProperties();
- gp2.setRangePartitioned(new Ordering(2, null, Order.DESCENDING).appendOrdering(6, null, Order.ASCENDING));
- assertFalse(req.isMetBy(gp2));
-
- GlobalProperties gp3 = new GlobalProperties();
- gp3.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(1, null, Order.ASCENDING));
- assertFalse(req.isMetBy(gp3));
-
- GlobalProperties gp4 = new GlobalProperties();
- gp4.setRangePartitioned(new Ordering(6, null, Order.DESCENDING));
- assertFalse(req.isMetBy(gp4));
- }
-
- }
-
- // --------------------------------------------------------------------------------------------
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesPushdownTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesPushdownTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesPushdownTest.java
deleted file mode 100644
index 0868720..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesPushdownTest.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dataproperties;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.common.operators.SemanticProperties;
-import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
-import org.apache.flink.api.common.operators.util.FieldSet;
-import org.junit.Test;
-
-public class GlobalPropertiesPushdownTest {
-
- @Test
- public void testAnyPartitioningPushedDown() {
- try {
- RequestedGlobalProperties req = new RequestedGlobalProperties();
- req.setAnyPartitioning(new FieldSet(3, 1));
-
- RequestedGlobalProperties preserved = req.filterBySemanticProperties(getAllPreservingSemProps(), 0);
- assertEquals(PartitioningProperty.ANY_PARTITIONING, preserved.getPartitioning());
- assertTrue(preserved.getPartitionedFields().isValidSubset(new FieldSet(1, 3)));
-
- RequestedGlobalProperties nonPreserved = req.filterBySemanticProperties(getNonePreservingSemProps(), 0);
- assertTrue(nonPreserved == null || nonPreserved.isTrivial());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testHashPartitioningPushedDown() {
- try {
- RequestedGlobalProperties req = new RequestedGlobalProperties();
- req.setHashPartitioned(new FieldSet(3, 1));
-
- RequestedGlobalProperties preserved = req.filterBySemanticProperties(getAllPreservingSemProps(), 0);
- assertEquals(PartitioningProperty.HASH_PARTITIONED, preserved.getPartitioning());
- assertTrue(preserved.getPartitionedFields().isValidSubset(new FieldSet(1, 3)));
-
- RequestedGlobalProperties nonPreserved = req.filterBySemanticProperties(getNonePreservingSemProps(), 0);
- assertTrue(nonPreserved == null || nonPreserved.isTrivial());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testCustomPartitioningNotPushedDown() {
- try {
- RequestedGlobalProperties req = new RequestedGlobalProperties();
- req.setCustomPartitioned(new FieldSet(3, 1), new MockPartitioner());
-
- RequestedGlobalProperties pushedDown = req.filterBySemanticProperties(getAllPreservingSemProps(), 0);
- assertTrue(pushedDown == null || pushedDown.isTrivial());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testForcedReblancingNotPushedDown() {
- try {
- RequestedGlobalProperties req = new RequestedGlobalProperties();
- req.setForceRebalancing();
-
- RequestedGlobalProperties pushedDown = req.filterBySemanticProperties(getAllPreservingSemProps(), 0);
- assertTrue(pushedDown == null || pushedDown.isTrivial());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- // --------------------------------------------------------------------------------------------
-
- private static SemanticProperties getAllPreservingSemProps() {
- return new SingleInputSemanticProperties.AllFieldsForwardedProperties();
- }
-
- private static SemanticProperties getNonePreservingSemProps() {
- return new SingleInputSemanticProperties();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataproperties/LocalPropertiesFilteringTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataproperties/LocalPropertiesFilteringTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/dataproperties/LocalPropertiesFilteringTest.java
deleted file mode 100644
index 1ff62ed..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataproperties/LocalPropertiesFilteringTest.java
+++ /dev/null
@@ -1,373 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dataproperties;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.common.operators.Ordering;
-import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.common.operators.util.FieldSet;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.java.functions.SemanticPropUtil;
-import org.apache.flink.api.java.tuple.Tuple8;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.StringValue;
-import org.junit.Test;
-
-public class LocalPropertiesFilteringTest {
-
- private TupleTypeInfo<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>> tupleInfo =
- new TupleTypeInfo<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>>(
- BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO,
- BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO
- );
-
- @Test
- public void testAllErased1() {
-
- SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
- SemanticPropUtil.getSemanticPropsSingleFromString(sp, null, null, null, tupleInfo, tupleInfo);
-
- LocalProperties lProps = LocalProperties.forGrouping(new FieldList(0, 1, 2));
- lProps = lProps.addUniqueFields(new FieldSet(3,4));
- lProps = lProps.addUniqueFields(new FieldSet(5,6));
-
- LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0);
-
- assertNull(filtered.getGroupedFields());
- assertNull(filtered.getOrdering());
- assertNull(filtered.getUniqueFields());
- }
-
- @Test
- public void testAllErased2() {
-
- SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
- SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"5"}, null, null, tupleInfo, tupleInfo);
-
- LocalProperties lProps = LocalProperties.forGrouping(new FieldList(0, 1, 2));
- lProps = lProps.addUniqueFields(new FieldSet(3,4));
-
- LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0);
-
- assertNull(filtered.getGroupedFields());
- assertNull(filtered.getOrdering());
- assertNull(filtered.getUniqueFields());
- }
-
- @Test
- public void testGroupingPreserved1() {
- SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
- SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"0;2;3"}, null, null, tupleInfo, tupleInfo);
-
- LocalProperties lProps = LocalProperties.forGrouping(new FieldList(0, 2, 3));
-
- LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0);
-
- assertNotNull(filtered.getGroupedFields());
- assertEquals(3, filtered.getGroupedFields().size());
- assertTrue(filtered.getGroupedFields().contains(0));
- assertTrue(filtered.getGroupedFields().contains(2));
- assertTrue(filtered.getGroupedFields().contains(3));
- assertNull(filtered.getOrdering());
- assertNull(filtered.getUniqueFields());
- }
-
- @Test
- public void testGroupingPreserved2() {
- SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
- SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"0->4;2->0;3->7"}, null, null, tupleInfo, tupleInfo);
-
- LocalProperties lProps = LocalProperties.forGrouping(new FieldList(0, 2, 3));
-
- LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0);
-
- assertNotNull(filtered.getGroupedFields());
- assertEquals(3, filtered.getGroupedFields().size());
- assertTrue(filtered.getGroupedFields().contains(4));
- assertTrue(filtered.getGroupedFields().contains(0));
- assertTrue(filtered.getGroupedFields().contains(7));
- assertNull(filtered.getOrdering());
- assertNull(filtered.getUniqueFields());
- }
-
- @Test
- public void testGroupingErased() {
- SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
- SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"0->4;2->0"}, null, null, tupleInfo, tupleInfo);
-
- LocalProperties lProps = LocalProperties.forGrouping(new FieldList(0, 2, 3));
-
- LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0);
-
- assertNull(filtered.getGroupedFields());
- assertNull(filtered.getOrdering());
- assertNull(filtered.getUniqueFields());
- }
-
- @Test
- public void testSortingPreserved1() {
- SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
- SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"0;2;5"}, null, null, tupleInfo, tupleInfo);
-
- Ordering o = new Ordering();
- o.appendOrdering(2, IntValue.class, Order.ASCENDING);
- o.appendOrdering(0, StringValue.class, Order.DESCENDING);
- o.appendOrdering(5, LongValue.class, Order.DESCENDING);
- LocalProperties lProps = LocalProperties.forOrdering(o);
-
- LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0);
- FieldList gFields = filtered.getGroupedFields();
- Ordering order = filtered.getOrdering();
-
- assertNotNull(gFields);
- assertEquals(3, gFields.size());
- assertTrue(gFields.contains(0));
- assertTrue(gFields.contains(2));
- assertTrue(gFields.contains(5));
- assertNotNull(order);
- assertEquals(3, order.getNumberOfFields());
- assertEquals(2, order.getFieldNumber(0).intValue());
- assertEquals(0, order.getFieldNumber(1).intValue());
- assertEquals(5, order.getFieldNumber(2).intValue());
- assertEquals(Order.ASCENDING, order.getOrder(0));
- assertEquals(Order.DESCENDING, order.getOrder(1));
- assertEquals(Order.DESCENDING, order.getOrder(2));
- assertEquals(IntValue.class, order.getType(0));
- assertEquals(StringValue.class, order.getType(1));
- assertEquals(LongValue.class, order.getType(2));
- assertNull(filtered.getUniqueFields());
- }
-
- @Test
- public void testSortingPreserved2() {
- SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
- SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"0->3;2->7;5->1"}, null, null, tupleInfo, tupleInfo);
-
- Ordering o = new Ordering();
- o.appendOrdering(2, IntValue.class, Order.ASCENDING);
- o.appendOrdering(0, StringValue.class, Order.DESCENDING);
- o.appendOrdering(5, LongValue.class, Order.DESCENDING);
- LocalProperties lProps = LocalProperties.forOrdering(o);
-
- LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0);
- FieldList gFields = filtered.getGroupedFields();
- Ordering order = filtered.getOrdering();
-
- assertNotNull(gFields);
- assertEquals(3, gFields.size());
- assertTrue(gFields.contains(3));
- assertTrue(gFields.contains(7));
- assertTrue(gFields.contains(1));
- assertNotNull(order);
- assertEquals(3, order.getNumberOfFields());
- assertEquals(7, order.getFieldNumber(0).intValue());
- assertEquals(3, order.getFieldNumber(1).intValue());
- assertEquals(1, order.getFieldNumber(2).intValue());
- assertEquals(Order.ASCENDING, order.getOrder(0));
- assertEquals(Order.DESCENDING, order.getOrder(1));
- assertEquals(Order.DESCENDING, order.getOrder(2));
- assertEquals(IntValue.class, order.getType(0));
- assertEquals(StringValue.class, order.getType(1));
- assertEquals(LongValue.class, order.getType(2));
- assertNull(filtered.getUniqueFields());
- }
-
- @Test
- public void testSortingPreserved3() {
- SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
- SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"0;2"}, null, null, tupleInfo, tupleInfo);
-
- Ordering o = new Ordering();
- o.appendOrdering(2, IntValue.class, Order.ASCENDING);
- o.appendOrdering(0, StringValue.class, Order.DESCENDING);
- o.appendOrdering(5, LongValue.class, Order.DESCENDING);
- LocalProperties lProps = LocalProperties.forOrdering(o);
-
- LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0);
- FieldList gFields = filtered.getGroupedFields();
- Ordering order = filtered.getOrdering();
-
- assertNotNull(gFields);
- assertEquals(2, gFields.size());
- assertTrue(gFields.contains(0));
- assertTrue(gFields.contains(2));
- assertNotNull(order);
- assertEquals(2, order.getNumberOfFields());
- assertEquals(2, order.getFieldNumber(0).intValue());
- assertEquals(0, order.getFieldNumber(1).intValue());
- assertEquals(Order.ASCENDING, order.getOrder(0));
- assertEquals(Order.DESCENDING, order.getOrder(1));
- assertEquals(IntValue.class, order.getType(0));
- assertEquals(StringValue.class, order.getType(1));
- assertNull(filtered.getUniqueFields());
- }
-
- @Test
- public void testSortingPreserved4() {
- SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
- SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"2->7;5"}, null, null, tupleInfo, tupleInfo);
-
- Ordering o = new Ordering();
- o.appendOrdering(2, IntValue.class, Order.ASCENDING);
- o.appendOrdering(0, StringValue.class, Order.DESCENDING);
- o.appendOrdering(5, LongValue.class, Order.DESCENDING);
- LocalProperties lProps = LocalProperties.forOrdering(o);
-
- LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0);
- FieldList gFields = filtered.getGroupedFields();
- Ordering order = filtered.getOrdering();
-
- assertNotNull(gFields);
- assertEquals(1, gFields.size());
- assertTrue(gFields.contains(7));
- assertNotNull(order);
- assertEquals(1, order.getNumberOfFields());
- assertEquals(7, order.getFieldNumber(0).intValue());
- assertEquals(Order.ASCENDING, order.getOrder(0));
- assertEquals(IntValue.class, order.getType(0));
- assertNull(filtered.getUniqueFields());
- }
-
- @Test
- public void testSortingErased() {
- SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
- SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"0;5"}, null, null, tupleInfo, tupleInfo);
-
- Ordering o = new Ordering();
- o.appendOrdering(2, IntValue.class, Order.ASCENDING);
- o.appendOrdering(0, StringValue.class, Order.DESCENDING);
- o.appendOrdering(5, LongValue.class, Order.DESCENDING);
- LocalProperties lProps = LocalProperties.forOrdering(o);
-
- LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0);
- FieldList gFields = filtered.getGroupedFields();
- Ordering order = filtered.getOrdering();
-
- assertNull(gFields);
- assertNull(order);
- assertNull(filtered.getUniqueFields());
- }
-
- @Test
- public void testUniqueFieldsPreserved1() {
-
- SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
- SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"0;1;2;3;4"}, null, null, tupleInfo, tupleInfo);
-
- LocalProperties lProps = new LocalProperties();
- lProps = lProps.addUniqueFields(new FieldSet(0,1,2));
- lProps = lProps.addUniqueFields(new FieldSet(3,4));
- lProps = lProps.addUniqueFields(new FieldSet(4,5,6));
-
- LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0);
- FieldSet expected1 = new FieldSet(0,1,2);
- FieldSet expected2 = new FieldSet(3,4);
-
- assertNull(filtered.getGroupedFields());
- assertNull(filtered.getOrdering());
- assertNotNull(filtered.getUniqueFields());
- assertEquals(2, filtered.getUniqueFields().size());
- assertTrue(filtered.getUniqueFields().contains(expected1));
- assertTrue(filtered.getUniqueFields().contains(expected2));
- }
-
- @Test
- public void testUniqueFieldsPreserved2() {
-
- SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
- SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"0;1;2;3;4"}, null, null, tupleInfo, tupleInfo);
-
- LocalProperties lProps = LocalProperties.forGrouping(new FieldList(1,2));
- lProps = lProps.addUniqueFields(new FieldSet(0,1,2));
- lProps = lProps.addUniqueFields(new FieldSet(3,4));
- lProps = lProps.addUniqueFields(new FieldSet(4,5,6));
-
- LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0);
- FieldSet expected1 = new FieldSet(0,1,2);
- FieldSet expected2 = new FieldSet(3,4);
-
- assertNull(filtered.getOrdering());
- assertNotNull(filtered.getGroupedFields());
- assertEquals(2, filtered.getGroupedFields().size());
- assertTrue(filtered.getGroupedFields().contains(1));
- assertTrue(filtered.getGroupedFields().contains(2));
- assertNotNull(filtered.getUniqueFields());
- assertEquals(2, filtered.getUniqueFields().size());
- assertTrue(filtered.getUniqueFields().contains(expected1));
- assertTrue(filtered.getUniqueFields().contains(expected2));
- }
-
- @Test
- public void testUniqueFieldsPreserved3() {
-
- SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
- SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"0->7;1->6;2->5;3->4;4->3"}, null, null, tupleInfo, tupleInfo);
-
- LocalProperties lProps = new LocalProperties();
- lProps = lProps.addUniqueFields(new FieldSet(0,1,2));
- lProps = lProps.addUniqueFields(new FieldSet(3,4));
- lProps = lProps.addUniqueFields(new FieldSet(4,5,6));
-
- LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0);
- FieldSet expected1 = new FieldSet(5,6,7);
- FieldSet expected2 = new FieldSet(3,4);
-
- assertNull(filtered.getGroupedFields());
- assertNull(filtered.getOrdering());
- assertNotNull(filtered.getUniqueFields());
- assertEquals(2, filtered.getUniqueFields().size());
- assertTrue(filtered.getUniqueFields().contains(expected1));
- assertTrue(filtered.getUniqueFields().contains(expected2));
- }
-
- @Test
- public void testUniqueFieldsErased() {
-
- SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
- SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"0;1;4"}, null, null, tupleInfo, tupleInfo);
-
- LocalProperties lProps = new LocalProperties();
- lProps = lProps.addUniqueFields(new FieldSet(0,1,2));
- lProps = lProps.addUniqueFields(new FieldSet(3,4));
- lProps = lProps.addUniqueFields(new FieldSet(4,5,6));
-
- LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0);
-
- assertNull(filtered.getGroupedFields());
- assertNull(filtered.getOrdering());
- assertNull(filtered.getUniqueFields());
- }
-
- @Test(expected = IndexOutOfBoundsException.class)
- public void testInvalidInputIndex() {
-
- SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
- SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0;1"}, null, null, tupleInfo, tupleInfo);
-
- LocalProperties lprops = LocalProperties.forGrouping(new FieldList(0,1));
-
- lprops.filterBySemanticProperties(sprops, 1);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataproperties/MockDistribution.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataproperties/MockDistribution.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/dataproperties/MockDistribution.java
deleted file mode 100644
index 74126f8..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataproperties/MockDistribution.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dataproperties;
-
-import org.apache.flink.api.common.distributions.DataDistribution;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.types.Key;
-
-import java.io.IOException;
-
-@SuppressWarnings("serial")
-public class MockDistribution implements DataDistribution {
-
- @Override
- public Key<?>[] getBucketBoundary(int bucketNum, int totalNumBuckets) {
- return new Key<?>[0];
- }
-
- @Override
- public int getNumberOfFields() {
- return 0;
- }
-
- @Override
- public void write(DataOutputView out) throws IOException {
-
- }
-
- @Override
- public void read(DataInputView in) throws IOException {
-
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataproperties/MockPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataproperties/MockPartitioner.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/dataproperties/MockPartitioner.java
deleted file mode 100644
index 2b2ab14..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataproperties/MockPartitioner.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dataproperties;
-
-import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.java.tuple.Tuple2;
-
-class MockPartitioner implements Partitioner<Tuple2<Long, Integer>> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public int partition(Tuple2<Long, Integer> key, int numPartitions) {
- return 0;
- }
-}
\ No newline at end of file