You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/03/20 11:07:00 UTC
[21/53] [abbrv] flink git commit: [optimizer] Rename optimizer
project to "flink-optimizer" (previously flink-compiler)
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java
deleted file mode 100644
index 8720aa7..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java
+++ /dev/null
@@ -1,302 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.java;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.operators.DeltaIteration;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.common.functions.RichGroupReduceFunction;
-import org.apache.flink.api.common.functions.RichJoinFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.optimizer.CompilerTestBase;
-import org.apache.flink.optimizer.plan.DualInputPlanNode;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-
-/**
-* Tests that validate optimizer choices when using operators that are requesting certain specific execution
-* strategies.
-*/
-@SuppressWarnings("serial")
-public class WorksetIterationsJavaApiCompilerTest extends CompilerTestBase {
-
- private static final String JOIN_WITH_INVARIANT_NAME = "Test Join Invariant";
- private static final String JOIN_WITH_SOLUTION_SET = "Test Join SolutionSet";
- private static final String NEXT_WORKSET_REDUCER_NAME = "Test Reduce Workset";
- private static final String SOLUTION_DELTA_MAPPER_NAME = "Test Map Delta";
-
- @Test
- public void testJavaApiWithDeferredSoltionSetUpdateWithMapper() {
- try {
- Plan plan = getJavaTestPlan(false, true);
-
- OptimizedPlan oPlan = compileNoStats(plan);
-
- OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(oPlan);
- DualInputPlanNode joinWithInvariantNode = resolver.getNode(JOIN_WITH_INVARIANT_NAME);
- DualInputPlanNode joinWithSolutionSetNode = resolver.getNode(JOIN_WITH_SOLUTION_SET);
- SingleInputPlanNode worksetReducer = resolver.getNode(NEXT_WORKSET_REDUCER_NAME);
- SingleInputPlanNode deltaMapper = resolver.getNode(SOLUTION_DELTA_MAPPER_NAME);
-
- // iteration preserves partitioning in reducer, so the first partitioning is out of the loop,
- // the in-loop partitioning is before the final reducer
-
- // verify joinWithInvariant
- assertEquals(ShipStrategyType.FORWARD, joinWithInvariantNode.getInput1().getShipStrategy());
- assertEquals(ShipStrategyType.PARTITION_HASH, joinWithInvariantNode.getInput2().getShipStrategy());
- assertEquals(new FieldList(1, 2), joinWithInvariantNode.getKeysForInput1());
- assertEquals(new FieldList(1, 2), joinWithInvariantNode.getKeysForInput2());
-
- // verify joinWithSolutionSet
- assertEquals(ShipStrategyType.PARTITION_HASH, joinWithSolutionSetNode.getInput1().getShipStrategy());
- assertEquals(ShipStrategyType.FORWARD, joinWithSolutionSetNode.getInput2().getShipStrategy());
- assertEquals(new FieldList(1, 0), joinWithSolutionSetNode.getKeysForInput1());
-
-
- // verify reducer
- assertEquals(ShipStrategyType.PARTITION_HASH, worksetReducer.getInput().getShipStrategy());
- assertEquals(new FieldList(1, 2), worksetReducer.getKeys(0));
-
- // currently, the system may partition before or after the mapper
- ShipStrategyType ss1 = deltaMapper.getInput().getShipStrategy();
- ShipStrategyType ss2 = deltaMapper.getOutgoingChannels().get(0).getShipStrategy();
-
- assertTrue( (ss1 == ShipStrategyType.FORWARD && ss2 == ShipStrategyType.PARTITION_HASH) ||
- (ss2 == ShipStrategyType.FORWARD && ss1 == ShipStrategyType.PARTITION_HASH) );
-
- new JobGraphGenerator().compileJobGraph(oPlan);
- }
- catch (Exception e) {
- System.err.println(e.getMessage());
- e.printStackTrace();
- fail("Test errored: " + e.getMessage());
- }
- }
-
- @Test
- public void testJavaApiWithDeferredSoltionSetUpdateWithNonPreservingJoin() {
- try {
- Plan plan = getJavaTestPlan(false, false);
-
- OptimizedPlan oPlan = compileNoStats(plan);
-
- OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(oPlan);
- DualInputPlanNode joinWithInvariantNode = resolver.getNode(JOIN_WITH_INVARIANT_NAME);
- DualInputPlanNode joinWithSolutionSetNode = resolver.getNode(JOIN_WITH_SOLUTION_SET);
- SingleInputPlanNode worksetReducer = resolver.getNode(NEXT_WORKSET_REDUCER_NAME);
-
- // iteration preserves partitioning in reducer, so the first partitioning is out of the loop,
- // the in-loop partitioning is before the final reducer
-
- // verify joinWithInvariant
- assertEquals(ShipStrategyType.FORWARD, joinWithInvariantNode.getInput1().getShipStrategy());
- assertEquals(ShipStrategyType.PARTITION_HASH, joinWithInvariantNode.getInput2().getShipStrategy());
- assertEquals(new FieldList(1, 2), joinWithInvariantNode.getKeysForInput1());
- assertEquals(new FieldList(1, 2), joinWithInvariantNode.getKeysForInput2());
-
- // verify joinWithSolutionSet
- assertEquals(ShipStrategyType.PARTITION_HASH, joinWithSolutionSetNode.getInput1().getShipStrategy());
- assertEquals(ShipStrategyType.FORWARD, joinWithSolutionSetNode.getInput2().getShipStrategy());
- assertEquals(new FieldList(1, 0), joinWithSolutionSetNode.getKeysForInput1());
-
- // verify reducer
- assertEquals(ShipStrategyType.PARTITION_HASH, worksetReducer.getInput().getShipStrategy());
- assertEquals(new FieldList(1, 2), worksetReducer.getKeys(0));
-
- // verify solution delta
- assertEquals(2, joinWithSolutionSetNode.getOutgoingChannels().size());
- assertEquals(ShipStrategyType.PARTITION_HASH, joinWithSolutionSetNode.getOutgoingChannels().get(0).getShipStrategy());
- assertEquals(ShipStrategyType.PARTITION_HASH, joinWithSolutionSetNode.getOutgoingChannels().get(1).getShipStrategy());
-
- new JobGraphGenerator().compileJobGraph(oPlan);
- }
- catch (Exception e) {
- System.err.println(e.getMessage());
- e.printStackTrace();
- fail("Test errored: " + e.getMessage());
- }
- }
-
- @Test
- public void testJavaApiWithDirectSoltionSetUpdate() {
- try {
- Plan plan = getJavaTestPlan(true, false);
-
- OptimizedPlan oPlan = compileNoStats(plan);
-
-
- OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(oPlan);
- DualInputPlanNode joinWithInvariantNode = resolver.getNode(JOIN_WITH_INVARIANT_NAME);
- DualInputPlanNode joinWithSolutionSetNode = resolver.getNode(JOIN_WITH_SOLUTION_SET);
- SingleInputPlanNode worksetReducer = resolver.getNode(NEXT_WORKSET_REDUCER_NAME);
-
- // iteration preserves partitioning in reducer, so the first partitioning is out of the loop,
- // the in-loop partitioning is before the final reducer
-
- // verify joinWithInvariant
- assertEquals(ShipStrategyType.FORWARD, joinWithInvariantNode.getInput1().getShipStrategy());
- assertEquals(ShipStrategyType.PARTITION_HASH, joinWithInvariantNode.getInput2().getShipStrategy());
- assertEquals(new FieldList(1, 2), joinWithInvariantNode.getKeysForInput1());
- assertEquals(new FieldList(1, 2), joinWithInvariantNode.getKeysForInput2());
-
- // verify joinWithSolutionSet
- assertEquals(ShipStrategyType.PARTITION_HASH, joinWithSolutionSetNode.getInput1().getShipStrategy());
- assertEquals(ShipStrategyType.FORWARD, joinWithSolutionSetNode.getInput2().getShipStrategy());
- assertEquals(new FieldList(1, 0), joinWithSolutionSetNode.getKeysForInput1());
-
- // verify reducer
- assertEquals(ShipStrategyType.FORWARD, worksetReducer.getInput().getShipStrategy());
- assertEquals(new FieldList(1, 2), worksetReducer.getKeys(0));
-
-
- // verify solution delta
- assertEquals(1, joinWithSolutionSetNode.getOutgoingChannels().size());
- assertEquals(ShipStrategyType.FORWARD, joinWithSolutionSetNode.getOutgoingChannels().get(0).getShipStrategy());
-
- new JobGraphGenerator().compileJobGraph(oPlan);
- }
- catch (Exception e) {
- System.err.println(e.getMessage());
- e.printStackTrace();
- fail("Test errored: " + e.getMessage());
- }
- }
-
-
- @Test
- public void testRejectPlanIfSolutionSetKeysAndJoinKeysDontMatch() {
- try {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
-
- @SuppressWarnings("unchecked")
- DataSet<Tuple3<Long, Long, Long>> solutionSetInput = env.fromElements(new Tuple3<Long, Long, Long>(1L, 2L, 3L)).name("Solution Set");
- @SuppressWarnings("unchecked")
- DataSet<Tuple3<Long, Long, Long>> worksetInput = env.fromElements(new Tuple3<Long, Long, Long>(1L, 2L, 3L)).name("Workset");
- @SuppressWarnings("unchecked")
- DataSet<Tuple3<Long, Long, Long>> invariantInput = env.fromElements(new Tuple3<Long, Long, Long>(1L, 2L, 3L)).name("Invariant Input");
-
- DeltaIteration<Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>> iter = solutionSetInput.iterateDelta(worksetInput, 100, 1, 2);
-
-
- DataSet<Tuple3<Long, Long, Long>> result =
-
- iter.getWorkset().join(invariantInput)
- .where(1, 2)
- .equalTo(1, 2)
- .with(new JoinFunction<Tuple3<Long,Long,Long>, Tuple3<Long, Long, Long>, Tuple3<Long,Long,Long>>() {
- public Tuple3<Long, Long, Long> join(Tuple3<Long, Long, Long> first, Tuple3<Long, Long, Long> second) {
- return first;
- }
- });
-
- try {
- result.join(iter.getSolutionSet())
- .where(1, 0)
- .equalTo(0, 2)
- .with(new JoinFunction<Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>>() {
- public Tuple3<Long, Long, Long> join(Tuple3<Long, Long, Long> first, Tuple3<Long, Long, Long> second) {
- return second;
- }
- });
- fail("The join should be rejected with key type mismatches.");
- }
- catch (InvalidProgramException e) {
- // expected!
- }
-
- }
- catch (Exception e) {
- System.err.println(e.getMessage());
- e.printStackTrace();
- fail("Test errored: " + e.getMessage());
- }
- }
-
- private Plan getJavaTestPlan(boolean joinPreservesSolutionSet, boolean mapBeforeSolutionDelta) {
-
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
-
- @SuppressWarnings("unchecked")
- DataSet<Tuple3<Long, Long, Long>> solutionSetInput = env.fromElements(new Tuple3<Long, Long, Long>(1L, 2L, 3L)).name("Solution Set");
- @SuppressWarnings("unchecked")
- DataSet<Tuple3<Long, Long, Long>> worksetInput = env.fromElements(new Tuple3<Long, Long, Long>(1L, 2L, 3L)).name("Workset");
- @SuppressWarnings("unchecked")
- DataSet<Tuple3<Long, Long, Long>> invariantInput = env.fromElements(new Tuple3<Long, Long, Long>(1L, 2L, 3L)).name("Invariant Input");
-
- DeltaIteration<Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>> iter = solutionSetInput.iterateDelta(worksetInput, 100, 1, 2);
-
-
- DataSet<Tuple3<Long, Long, Long>> joinedWithSolutionSet =
-
- iter.getWorkset().join(invariantInput)
- .where(1, 2)
- .equalTo(1, 2)
- .with(new RichJoinFunction<Tuple3<Long,Long,Long>, Tuple3<Long, Long, Long>, Tuple3<Long,Long,Long>>() {
- public Tuple3<Long, Long, Long> join(Tuple3<Long, Long, Long> first, Tuple3<Long, Long, Long> second) {
- return first;
- }
- })
- .name(JOIN_WITH_INVARIANT_NAME)
-
- .join(iter.getSolutionSet())
- .where(1, 0)
- .equalTo(1, 2)
- .with(new RichJoinFunction<Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>>() {
- public Tuple3<Long, Long, Long> join(Tuple3<Long, Long, Long> first, Tuple3<Long, Long, Long> second) {
- return second;
- }
- })
- .name(JOIN_WITH_SOLUTION_SET)
- .withForwardedFieldsSecond(joinPreservesSolutionSet ? new String[] {"0->0", "1->1", "2->2" } : null);
-
- DataSet<Tuple3<Long, Long, Long>> nextWorkset = joinedWithSolutionSet.groupBy(1, 2)
- .reduceGroup(new RichGroupReduceFunction<Tuple3<Long,Long,Long>, Tuple3<Long,Long,Long>>() {
- public void reduce(Iterable<Tuple3<Long, Long, Long>> values, Collector<Tuple3<Long, Long, Long>> out) {}
- })
- .name(NEXT_WORKSET_REDUCER_NAME)
- .withForwardedFields("1->1","2->2","0->0");
-
-
- DataSet<Tuple3<Long, Long, Long>> nextSolutionSet = mapBeforeSolutionDelta ?
- joinedWithSolutionSet.map(new RichMapFunction<Tuple3<Long, Long, Long>,Tuple3<Long, Long, Long>>() { public Tuple3<Long, Long, Long> map(Tuple3<Long, Long, Long> value) { return value; } })
- .name(SOLUTION_DELTA_MAPPER_NAME).withForwardedFields("0->0","1->1","2->2") :
- joinedWithSolutionSet;
-
- iter.closeWith(nextSolutionSet, nextWorkset)
- .print();
-
- return env.createProgramPlan();
- }
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/CoGroupGlobalPropertiesCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/CoGroupGlobalPropertiesCompatibilityTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/CoGroupGlobalPropertiesCompatibilityTest.java
deleted file mode 100644
index 23f8897..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/CoGroupGlobalPropertiesCompatibilityTest.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.operators;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
-import org.junit.Test;
-
-@SuppressWarnings("serial")
-public class CoGroupGlobalPropertiesCompatibilityTest {
-
- @Test
- public void checkCompatiblePartitionings() {
- try {
- final FieldList keysLeft = new FieldList(1, 4);
- final FieldList keysRight = new FieldList(3, 1);
-
- CoGroupDescriptor descr = new CoGroupDescriptor(keysLeft, keysRight);
-
- // test compatible hash partitioning
- {
- RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
- reqLeft.setHashPartitioned(keysLeft);
- RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
- reqRight.setHashPartitioned(keysRight);
-
- GlobalProperties propsLeft = new GlobalProperties();
- propsLeft.setHashPartitioned(keysLeft);
- GlobalProperties propsRight = new GlobalProperties();
- propsRight.setHashPartitioned(keysRight);
-
- assertTrue(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
- }
-
- // test compatible custom partitioning
- {
- Partitioner<Object> part = new Partitioner<Object>() {
- @Override
- public int partition(Object key, int numPartitions) {
- return 0;
- }
- };
-
- RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
- reqLeft.setCustomPartitioned(keysLeft, part);
- RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
- reqRight.setCustomPartitioned(keysRight, part);
-
- GlobalProperties propsLeft = new GlobalProperties();
- propsLeft.setCustomPartitioned(keysLeft, part);
- GlobalProperties propsRight = new GlobalProperties();
- propsRight.setCustomPartitioned(keysRight, part);
-
- assertTrue(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
- }
-
- // test custom partitioning matching any partitioning
- {
- Partitioner<Object> part = new Partitioner<Object>() {
- @Override
- public int partition(Object key, int numPartitions) {
- return 0;
- }
- };
-
- RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
- reqLeft.setAnyPartitioning(keysLeft);
- RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
- reqRight.setAnyPartitioning(keysRight);
-
- GlobalProperties propsLeft = new GlobalProperties();
- propsLeft.setCustomPartitioned(keysLeft, part);
- GlobalProperties propsRight = new GlobalProperties();
- propsRight.setCustomPartitioned(keysRight, part);
-
- assertTrue(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void checkInompatiblePartitionings() {
- try {
- final FieldList keysLeft = new FieldList(1);
- final FieldList keysRight = new FieldList(3);
-
- final Partitioner<Object> part = new Partitioner<Object>() {
- @Override
- public int partition(Object key, int numPartitions) {
- return 0;
- }
- };
- final Partitioner<Object> part2 = new Partitioner<Object>() {
- @Override
- public int partition(Object key, int numPartitions) {
- return 0;
- }
- };
-
- CoGroupDescriptor descr = new CoGroupDescriptor(keysLeft, keysRight);
-
- // test incompatible hash with custom partitioning
- {
- RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
- reqLeft.setAnyPartitioning(keysLeft);
- RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
- reqRight.setAnyPartitioning(keysRight);
-
- GlobalProperties propsLeft = new GlobalProperties();
- propsLeft.setHashPartitioned(keysLeft);
- GlobalProperties propsRight = new GlobalProperties();
- propsRight.setCustomPartitioned(keysRight, part);
-
- assertFalse(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
- }
-
- // test incompatible custom partitionings
- {
- RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
- reqLeft.setAnyPartitioning(keysLeft);
- RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
- reqRight.setAnyPartitioning(keysRight);
-
- GlobalProperties propsLeft = new GlobalProperties();
- propsLeft.setCustomPartitioned(keysLeft, part);
- GlobalProperties propsRight = new GlobalProperties();
- propsRight.setCustomPartitioned(keysRight, part2);
-
- assertFalse(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java
deleted file mode 100644
index e7807c9..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.operators;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.CompilerTestBase;
-import org.apache.flink.optimizer.Optimizer;
-import org.apache.flink.optimizer.testfunctions.DummyCoGroupFunction;
-import org.apache.flink.configuration.Configuration;
-import org.junit.Test;
-
-@SuppressWarnings({"serial", "unchecked"})
-public class CoGroupOnConflictingPartitioningsTest extends CompilerTestBase {
-
- @Test
- public void testRejectCoGroupOnHashAndRangePartitioning() {
- try {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple2<Long, Long>> input = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
-
- Configuration cfg = new Configuration();
- cfg.setString(Optimizer.HINT_SHIP_STRATEGY_FIRST_INPUT, Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH);
- cfg.setString(Optimizer.HINT_SHIP_STRATEGY_SECOND_INPUT, Optimizer.HINT_SHIP_STRATEGY_REPARTITION_RANGE);
-
- input.coGroup(input).where(0).equalTo(0)
- .with(new DummyCoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>())
- .withParameters(cfg)
- .print();
-
- Plan p = env.createProgramPlan();
- try {
- compileNoStats(p);
- fail("This should fail with an exception");
- }
- catch (CompilerException e) {
- // expected
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/JoinGlobalPropertiesCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/JoinGlobalPropertiesCompatibilityTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/JoinGlobalPropertiesCompatibilityTest.java
deleted file mode 100644
index 839f0a1..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/JoinGlobalPropertiesCompatibilityTest.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.operators;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
-import org.junit.Test;
-
-@SuppressWarnings("serial")
-public class JoinGlobalPropertiesCompatibilityTest {
-
- @Test
- public void checkCompatiblePartitionings() {
- try {
- final FieldList keysLeft = new FieldList(1, 4);
- final FieldList keysRight = new FieldList(3, 1);
-
- SortMergeJoinDescriptor descr = new SortMergeJoinDescriptor(keysLeft, keysRight);
-
- // test compatible hash partitioning
- {
- RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
- reqLeft.setHashPartitioned(keysLeft);
- RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
- reqRight.setHashPartitioned(keysRight);
-
- GlobalProperties propsLeft = new GlobalProperties();
- propsLeft.setHashPartitioned(keysLeft);
- GlobalProperties propsRight = new GlobalProperties();
- propsRight.setHashPartitioned(keysRight);
-
- assertTrue(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
- }
-
- // test compatible custom partitioning
- {
- Partitioner<Object> part = new Partitioner<Object>() {
- @Override
- public int partition(Object key, int numPartitions) {
- return 0;
- }
- };
-
- RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
- reqLeft.setCustomPartitioned(keysLeft, part);
- RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
- reqRight.setCustomPartitioned(keysRight, part);
-
- GlobalProperties propsLeft = new GlobalProperties();
- propsLeft.setCustomPartitioned(keysLeft, part);
- GlobalProperties propsRight = new GlobalProperties();
- propsRight.setCustomPartitioned(keysRight, part);
-
- assertTrue(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
- }
-
- // test custom partitioning matching any partitioning
- {
- Partitioner<Object> part = new Partitioner<Object>() {
- @Override
- public int partition(Object key, int numPartitions) {
- return 0;
- }
- };
-
- RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
- reqLeft.setAnyPartitioning(keysLeft);
- RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
- reqRight.setAnyPartitioning(keysRight);
-
- GlobalProperties propsLeft = new GlobalProperties();
- propsLeft.setCustomPartitioned(keysLeft, part);
- GlobalProperties propsRight = new GlobalProperties();
- propsRight.setCustomPartitioned(keysRight, part);
-
- assertTrue(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void checkInompatiblePartitionings() {
- try {
- final FieldList keysLeft = new FieldList(1);
- final FieldList keysRight = new FieldList(3);
-
- final Partitioner<Object> part = new Partitioner<Object>() {
- @Override
- public int partition(Object key, int numPartitions) {
- return 0;
- }
- };
- final Partitioner<Object> part2 = new Partitioner<Object>() {
- @Override
- public int partition(Object key, int numPartitions) {
- return 0;
- }
- };
-
- SortMergeJoinDescriptor descr = new SortMergeJoinDescriptor(keysLeft, keysRight);
-
- // test incompatible hash with custom partitioning
- {
- RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
- reqLeft.setAnyPartitioning(keysLeft);
- RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
- reqRight.setAnyPartitioning(keysRight);
-
- GlobalProperties propsLeft = new GlobalProperties();
- propsLeft.setHashPartitioned(keysLeft);
- GlobalProperties propsRight = new GlobalProperties();
- propsRight.setCustomPartitioned(keysRight, part);
-
- assertFalse(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
- }
-
- // test incompatible custom partitionings
- {
- RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
- reqLeft.setAnyPartitioning(keysLeft);
- RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
- reqRight.setAnyPartitioning(keysRight);
-
- GlobalProperties propsLeft = new GlobalProperties();
- propsLeft.setCustomPartitioned(keysLeft, part);
- GlobalProperties propsRight = new GlobalProperties();
- propsRight.setCustomPartitioned(keysRight, part2);
-
- assertFalse(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java
deleted file mode 100644
index 9171cc7..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.operators;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.CompilerTestBase;
-import org.apache.flink.optimizer.Optimizer;
-import org.apache.flink.configuration.Configuration;
-import org.junit.Test;
-
-@SuppressWarnings({"serial", "unchecked"})
-public class JoinOnConflictingPartitioningsTest extends CompilerTestBase {
-
- @Test
- public void testRejectJoinOnHashAndRangePartitioning() {
- try {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple2<Long, Long>> input = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
-
- Configuration cfg = new Configuration();
- cfg.setString(Optimizer.HINT_SHIP_STRATEGY_FIRST_INPUT, Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH);
- cfg.setString(Optimizer.HINT_SHIP_STRATEGY_SECOND_INPUT, Optimizer.HINT_SHIP_STRATEGY_REPARTITION_RANGE);
-
- input.join(input).where(0).equalTo(0)
- .withParameters(cfg)
- .print();
-
- Plan p = env.createProgramPlan();
- try {
- compileNoStats(p);
- fail("This should fail with an exception");
- }
- catch (CompilerException e) {
- // expected
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/plan/ChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/plan/ChannelTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/plan/ChannelTest.java
deleted file mode 100644
index 2c1574b..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/plan/ChannelTest.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.plan;
-
-import org.apache.flink.api.common.operators.GenericDataSourceBase;
-import org.apache.flink.api.common.operators.OperatorInformation;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.optimizer.dag.DataSourceNode;
-import org.apache.flink.core.fs.Path;
-import org.junit.Assert;
-import org.junit.Test;
-import org.apache.flink.api.java.io.TextInputFormat;
-
-public class ChannelTest {
-
- @Test
- public void testGetEstimatesNoReplicationFactor() {
- final long NUM_RECORD = 1001;
- final long SIZE = 467131;
-
- DataSourceNode source = getSourceNode();
- SourcePlanNode planNode = new SourcePlanNode(source, "test node");
- Channel channel = new Channel(planNode);
-
- // no estimates here
- Assert.assertEquals(-1, channel.getEstimatedOutputSize());
- Assert.assertEquals(-1, channel.getEstimatedNumRecords());
-
- // set estimates
- source.setEstimatedNumRecords(NUM_RECORD);
- source.setEstimatedOutputSize(SIZE);
- Assert.assertEquals(SIZE, channel.getEstimatedOutputSize());
- Assert.assertEquals(NUM_RECORD, channel.getEstimatedNumRecords());
- }
-
- @Test
- public void testGetEstimatesWithReplicationFactor() {
- final long NUM_RECORD = 1001;
- final long SIZE = 467131;
-
- final int REPLICATION = 23;
-
- DataSourceNode source = getSourceNode();
- SourcePlanNode planNode = new SourcePlanNode(source, "test node");
- Channel channel = new Channel(planNode);
- channel.setReplicationFactor(REPLICATION);
-
- // no estimates here
- Assert.assertEquals(-1, channel.getEstimatedOutputSize());
- Assert.assertEquals(-1, channel.getEstimatedNumRecords());
-
- // set estimates
- source.setEstimatedNumRecords(NUM_RECORD);
- source.setEstimatedOutputSize(SIZE);
- Assert.assertEquals(SIZE * REPLICATION, channel.getEstimatedOutputSize());
- Assert.assertEquals(NUM_RECORD * REPLICATION, channel.getEstimatedNumRecords());
- }
-
-
-// private static final OptimizerNode getSingleInputNode() {
-// return new MapNode(new MapOperatorBase<String, String, GenericMap<String,String>>(
-// new IdentityMapper<String>(),
-// new UnaryOperatorInformation<String, String>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO),
-// "map"));
-// }
-
- private static final DataSourceNode getSourceNode() {
- return new DataSourceNode(new GenericDataSourceBase<String, TextInputFormat>(
- new TextInputFormat(new Path("/ignored")),
- new OperatorInformation<String>(BasicTypeInfo.STRING_TYPE_INFO),
- "source"));
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/plandump/NumberFormattingTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/plandump/NumberFormattingTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/plandump/NumberFormattingTest.java
deleted file mode 100644
index 366d10d..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/plandump/NumberFormattingTest.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.plandump;
-
-import static org.junit.Assert.assertEquals;
-
-import org.junit.Test;
-
-public class NumberFormattingTest {
-
- @Test
- public void testFormatNumberNoDigit() {
- assertEquals("0.0", PlanJSONDumpGenerator.formatNumber(0));
- assertEquals("0.00", PlanJSONDumpGenerator.formatNumber(0.0000000001));
- assertEquals("-1.0", PlanJSONDumpGenerator.formatNumber(-1.0));
- assertEquals("1.00", PlanJSONDumpGenerator.formatNumber(1));
- assertEquals("17.00", PlanJSONDumpGenerator.formatNumber(17));
- assertEquals("17.44", PlanJSONDumpGenerator.formatNumber(17.44));
- assertEquals("143.00", PlanJSONDumpGenerator.formatNumber(143));
- assertEquals("143.40", PlanJSONDumpGenerator.formatNumber(143.4));
- assertEquals("143.50", PlanJSONDumpGenerator.formatNumber(143.5));
- assertEquals("143.60", PlanJSONDumpGenerator.formatNumber(143.6));
- assertEquals("143.45", PlanJSONDumpGenerator.formatNumber(143.45));
- assertEquals("143.55", PlanJSONDumpGenerator.formatNumber(143.55));
- assertEquals("143.65", PlanJSONDumpGenerator.formatNumber(143.65));
- assertEquals("143.66", PlanJSONDumpGenerator.formatNumber(143.655));
-
- assertEquals("1.13 K", PlanJSONDumpGenerator.formatNumber(1126.0));
- assertEquals("11.13 K", PlanJSONDumpGenerator.formatNumber(11126.0));
- assertEquals("118.13 K", PlanJSONDumpGenerator.formatNumber(118126.0));
-
- assertEquals("1.44 M", PlanJSONDumpGenerator.formatNumber(1435126.0));
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/DummyCoGroupFunction.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/DummyCoGroupFunction.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/DummyCoGroupFunction.java
deleted file mode 100644
index 7fea8a6..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/DummyCoGroupFunction.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.testfunctions;
-
-import org.apache.flink.api.common.functions.RichCoGroupFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.util.Collector;
-
-public class DummyCoGroupFunction<L, R> extends RichCoGroupFunction<L, R, Tuple2<L, R>> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public void coGroup(Iterable<L> first, Iterable<R> second, Collector<Tuple2<L, R>> out) {}
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/DummyFlatJoinFunction.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/DummyFlatJoinFunction.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/DummyFlatJoinFunction.java
deleted file mode 100644
index 6be8a24..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/DummyFlatJoinFunction.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.testfunctions;
-
-import org.apache.flink.api.common.functions.RichFlatJoinFunction;
-import org.apache.flink.util.Collector;
-
-public class DummyFlatJoinFunction<T> extends RichFlatJoinFunction<T, T, T> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public void join(T first, T second, Collector<T> out) {
- out.collect(null);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/DummyReducer.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/DummyReducer.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/DummyReducer.java
deleted file mode 100644
index 44d3695..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/DummyReducer.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.testfunctions;
-
-import org.apache.flink.api.common.functions.RichReduceFunction;
-
-public class DummyReducer<T> extends RichReduceFunction<T> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public T reduce(T a, T b) {
- return a;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityFlatMapper.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityFlatMapper.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityFlatMapper.java
deleted file mode 100644
index 0316463..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityFlatMapper.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.testfunctions;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.util.Collector;
-
-public class IdentityFlatMapper<T> implements FlatMapFunction<T, T> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public void flatMap(T value, Collector<T> out) {}
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducer.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducer.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducer.java
deleted file mode 100644
index 11fd044..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducer.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.testfunctions;
-
-
-import org.apache.flink.api.common.functions.RichGroupReduceFunction;
-import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable;
-import org.apache.flink.util.Collector;
-
-
-@Combinable
-public class IdentityGroupReducer<T> extends RichGroupReduceFunction<T, T> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public void reduce(Iterable<T> values, Collector<T> out) {
- for (T next : values) {
- out.collect(next);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityKeyExtractor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityKeyExtractor.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityKeyExtractor.java
deleted file mode 100644
index f335846..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityKeyExtractor.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.testfunctions;
-
-import org.apache.flink.api.java.functions.KeySelector;
-
-public class IdentityKeyExtractor<T> implements KeySelector<T, T> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public T getKey(T value) {
- return value;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityMapper.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityMapper.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityMapper.java
deleted file mode 100644
index 025b4d8..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityMapper.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.testfunctions;
-
-import org.apache.flink.api.common.functions.RichMapFunction;
-
-public class IdentityMapper<T> extends RichMapFunction<T, T> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public T map(T value) {
- return value;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityPartitionerMapper.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityPartitionerMapper.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityPartitionerMapper.java
deleted file mode 100644
index 6efbef1..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityPartitionerMapper.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.testfunctions;
-
-import org.apache.flink.api.common.functions.RichMapPartitionFunction;
-import org.apache.flink.util.Collector;
-
-public class IdentityPartitionerMapper<T> extends RichMapPartitionFunction<T, T> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public void mapPartition(Iterable<T> values, Collector<T> out) {
- for (T in : values) {
- out.collect(in);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/SelectOneReducer.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/SelectOneReducer.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/SelectOneReducer.java
deleted file mode 100644
index 39c0e1b..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/SelectOneReducer.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.testfunctions;
-
-import org.apache.flink.api.common.functions.RichReduceFunction;
-
-public class SelectOneReducer<T> extends RichReduceFunction<T> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public T reduce(T value1, T value2) throws Exception {
- return value1;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/Top1GroupReducer.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/Top1GroupReducer.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/Top1GroupReducer.java
deleted file mode 100644
index 48d13ca..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/Top1GroupReducer.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.testfunctions;
-
-import org.apache.flink.api.common.functions.RichGroupReduceFunction;
-import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable;
-import org.apache.flink.util.Collector;
-
-
-@Combinable
-public class Top1GroupReducer<T> extends RichGroupReduceFunction<T, T> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public void reduce(Iterable<T> values, Collector<T> out) {
- out.collect(values.iterator().next());
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/util/DummyCoGroupStub.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/util/DummyCoGroupStub.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/util/DummyCoGroupStub.java
deleted file mode 100644
index 6a84c44..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/util/DummyCoGroupStub.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.util;
-
-import java.io.Serializable;
-import java.util.Iterator;
-
-import org.apache.flink.api.java.record.functions.CoGroupFunction;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-@SuppressWarnings("deprecation")
-public class DummyCoGroupStub extends CoGroupFunction implements Serializable {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void coGroup(Iterator<Record> records1, Iterator<Record> records2, Collector<Record> out) {
- while (records1.hasNext()) {
- out.collect(records1.next());
- }
-
- while (records2.hasNext()) {
- out.collect(records2.next());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/util/DummyCrossStub.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/util/DummyCrossStub.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/util/DummyCrossStub.java
deleted file mode 100644
index 8ee2285..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/util/DummyCrossStub.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.util;
-
-import org.apache.flink.api.java.record.functions.CrossFunction;
-import org.apache.flink.types.Record;
-
-@SuppressWarnings("deprecation")
-public class DummyCrossStub extends CrossFunction {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Record cross(Record first, Record second) throws Exception {
- return first;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/util/DummyInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/util/DummyInputFormat.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/util/DummyInputFormat.java
deleted file mode 100644
index 0c816e7..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/util/DummyInputFormat.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.util;
-
-import org.apache.flink.api.common.io.statistics.BaseStatistics;
-import org.apache.flink.api.java.record.io.DelimitedInputFormat;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-
-public final class DummyInputFormat extends DelimitedInputFormat {
- private static final long serialVersionUID = 1L;
-
- private final IntValue integer = new IntValue(1);
-
- @Override
- public Record readRecord(Record target, byte[] bytes, int offset, int numBytes) {
- target.setField(0, this.integer);
- target.setField(1, this.integer);
- return target;
- }
-
- @Override
- public FileBaseStatistics getStatistics(BaseStatistics cachedStatistics) {
- return (cachedStatistics instanceof FileBaseStatistics) ? (FileBaseStatistics) cachedStatistics : null;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/util/DummyMatchStub.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/util/DummyMatchStub.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/util/DummyMatchStub.java
deleted file mode 100644
index d00be6e..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/util/DummyMatchStub.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.util;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.java.record.functions.JoinFunction;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsFirstExcept;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-@SuppressWarnings("deprecation")
-@ConstantFieldsFirstExcept({})
-public class DummyMatchStub extends JoinFunction implements Serializable {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void join(Record value1, Record value2, Collector<Record> out) throws Exception {
- out.collect(value1);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/util/DummyNonPreservingMatchStub.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/util/DummyNonPreservingMatchStub.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/util/DummyNonPreservingMatchStub.java
deleted file mode 100644
index 444b48e..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/util/DummyNonPreservingMatchStub.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.util;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.java.record.functions.JoinFunction;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-@SuppressWarnings("deprecation")
-public class DummyNonPreservingMatchStub extends JoinFunction implements Serializable {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void join(Record value1, Record value2, Collector<Record> out) throws Exception {
- out.collect(value1);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/util/DummyOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/util/DummyOutputFormat.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/util/DummyOutputFormat.java
deleted file mode 100644
index 1bbe24c..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/util/DummyOutputFormat.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.util;
-
-
-import org.apache.flink.api.java.record.io.DelimitedOutputFormat;
-import org.apache.flink.types.Record;
-
-
-public final class DummyOutputFormat extends DelimitedOutputFormat {
- private static final long serialVersionUID = 1L;
-
- @Override
- public int serializeRecord(Record rec, byte[] target) throws Exception {
- return 0;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/util/IdentityMap.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/util/IdentityMap.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/util/IdentityMap.java
deleted file mode 100644
index cccc6cb..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/util/IdentityMap.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.util;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsExcept;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-@SuppressWarnings("deprecation")
-@ConstantFieldsExcept({})
-public final class IdentityMap extends MapFunction implements Serializable {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void map(Record record, Collector<Record> out) throws Exception {
- out.collect(record);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/util/IdentityReduce.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/util/IdentityReduce.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/util/IdentityReduce.java
deleted file mode 100644
index f45745d..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/util/IdentityReduce.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.util;
-
-import java.io.Serializable;
-import java.util.Iterator;
-
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsExcept;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-@SuppressWarnings("deprecation")
-@ConstantFieldsExcept({})
-public final class IdentityReduce extends ReduceFunction implements Serializable {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception {
- while (records.hasNext()) {
- out.collect(records.next());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/resources/log4j-test.properties b/flink-compiler/src/test/resources/log4j-test.properties
deleted file mode 100644
index 2fb9345..0000000
--- a/flink-compiler/src/test/resources/log4j-test.properties
+++ /dev/null
@@ -1,19 +0,0 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-log4j.rootLogger=OFF
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/resources/log4j.properties b/flink-compiler/src/test/resources/log4j.properties
deleted file mode 100644
index fa3f937..0000000
--- a/flink-compiler/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,27 +0,0 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-# This file ensures that tests executed from the IDE show log output
-
-log4j.rootLogger=INFO, console
-
-# Log all infos in the given file
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.target = System.err
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/resources/logback-test.xml b/flink-compiler/src/test/resources/logback-test.xml
deleted file mode 100644
index 8b3bb27..0000000
--- a/flink-compiler/src/test/resources/logback-test.xml
+++ /dev/null
@@ -1,29 +0,0 @@
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one
- ~ or more contributor license agreements. See the NOTICE file
- ~ distributed with this work for additional information
- ~ regarding copyright ownership. The ASF licenses this file
- ~ to you under the Apache License, Version 2.0 (the
- ~ "License"); you may not use this file except in compliance
- ~ with the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-
-<configuration>
- <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
- <encoder>
- <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
- </encoder>
- </appender>
-
- <root level="WARN">
- <appender-ref ref="STDOUT"/>
- </root>
-</configuration>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml
index 7ee7f25..b826c45 100644
--- a/flink-dist/pom.xml
+++ b/flink-dist/pom.xml
@@ -61,7 +61,7 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-compiler</artifactId>
+ <artifactId>flink-optimizer</artifactId>
<version>${project.version}</version>
</dependency>
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/pom.xml
----------------------------------------------------------------------
diff --git a/flink-optimizer/pom.xml b/flink-optimizer/pom.xml
new file mode 100644
index 0000000..55764e9
--- /dev/null
+++ b/flink-optimizer/pom.xml
@@ -0,0 +1,63 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-parent</artifactId>
+ <version>0.9-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-optimizer</artifactId>
+ <name>flink-optimizer</name>
+
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-java</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${guava.version}</version>
+ </dependency>
+ </dependencies>
+
+</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/CompilerException.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/CompilerException.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/CompilerException.java
new file mode 100644
index 0000000..2f99ddb
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/CompilerException.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer;
+
+/**
+ * An exception that is thrown by the Optimizer when encountering an illegal condition.
+ */
+public class CompilerException extends RuntimeException {
+
+ private static final long serialVersionUID = 3810067304570563755L;
+
+ /**
+ * Creates a compiler exception with no message and no cause.
+ */
+ public CompilerException() {}
+
+ /**
+ * Creates a compiler exception with the given message and no cause.
+ *
+ * @param message
+ * The message for the exception.
+ */
+ public CompilerException(String message) {
+ super(message);
+ }
+
+ /**
+ * Creates a compiler exception with the given cause and no message.
+ *
+ * @param cause
+ * The <tt>Throwable</tt> that caused this exception.
+ */
+ public CompilerException(Throwable cause) {
+ super(cause);
+ }
+
+ /**
+ * Creates a compiler exception with the given message and cause.
+ *
+ * @param message
+ * The message for the exception.
+ * @param cause
+ * The <tt>Throwable</tt> that caused this exception.
+ */
+ public CompilerException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/CompilerPostPassException.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/CompilerPostPassException.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/CompilerPostPassException.java
new file mode 100644
index 0000000..78e47a0
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/CompilerPostPassException.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer;
+
+/**
+ * An exception that is thrown by the Optimizer when encountering
+ * a problem during the optimizer post pass. This is a dedicated exception
+ * because it is thrown by user-specified optimizer extensions.
+ */
+public class CompilerPostPassException extends CompilerException {
+
+ private static final long serialVersionUID = -322650826288034623L;
+
+ /**
+ * Creates a post pass exception with no message and no cause.
+ */
+ public CompilerPostPassException() {}
+
+ /**
+ * Creates a post pass exception with the given message and no cause.
+ *
+ * @param message The message for the exception.
+ */
+ public CompilerPostPassException(String message) {
+ super(message);
+ }
+
+ /**
+ * Creates a post pass exception with the given cause and no message.
+ *
+ * @param cause The <tt>Throwable</tt> that caused this exception.
+ */
+ public CompilerPostPassException(Throwable cause) {
+ super(cause);
+ }
+
+ /**
+ * Creates a post pass exception with the given message and cause.
+ *
+ * @param message The message for the exception.
+ * @param cause The <tt>Throwable</tt> that caused this exception.
+ */
+ public CompilerPostPassException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}