You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/18 16:38:19 UTC
[5/7] flink git commit: [tests] Rename 'compiler' tests to 'optimizer' tests for consistent naming
[tests] Rename 'compiler' tests to 'optimizer' tests for consistent naming
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/09fdfda7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/09fdfda7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/09fdfda7
Branch: refs/heads/master
Commit: 09fdfda7f25cf95426bc43ca33ed7bb39c7d353a
Parents: 6eae11f
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Aug 18 11:11:13 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Aug 18 16:37:41 2015 +0200
----------------------------------------------------------------------
.../compiler/examples/KMeansSingleStepTest.java | 130 -------
.../examples/RelationalQueryCompilerTest.java | 351 -------------------
.../examples/WordCountCompilerTest.java | 184 ----------
.../ConnectedComponentsCoGroupTest.java | 137 --------
.../iterations/IterativeKMeansTest.java | 159 ---------
...ultipleJoinsWithSolutionSetCompilerTest.java | 142 --------
.../iterations/PageRankCompilerTest.java | 110 ------
.../compiler/plandump/DumpCompiledPlanTest.java | 109 ------
.../compiler/plandump/PreviewPlanDumpTest.java | 105 ------
.../MultipleSolutionSetJoinsITCase.java | 2 +-
.../examples/KMeansSingleStepTest.java | 130 +++++++
.../examples/RelationalQueryCompilerTest.java | 351 +++++++++++++++++++
.../examples/WordCountCompilerTest.java | 184 ++++++++++
.../ConnectedComponentsCoGroupTest.java | 137 ++++++++
.../iterations/IterativeKMeansTest.java | 159 +++++++++
...ultipleJoinsWithSolutionSetCompilerTest.java | 142 ++++++++
.../iterations/PageRankCompilerTest.java | 110 ++++++
.../jsonplan/DumpCompiledPlanTest.java | 109 ++++++
.../optimizer/jsonplan/PreviewPlanDumpTest.java | 105 ++++++
19 files changed, 1428 insertions(+), 1428 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/09fdfda7/flink-tests/src/test/java/org/apache/flink/test/compiler/examples/KMeansSingleStepTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/compiler/examples/KMeansSingleStepTest.java b/flink-tests/src/test/java/org/apache/flink/test/compiler/examples/KMeansSingleStepTest.java
deleted file mode 100644
index ec532be..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/compiler/examples/KMeansSingleStepTest.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.compiler.examples;
-
-import static org.junit.Assert.*;
-
-import java.util.Arrays;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.apache.flink.optimizer.util.OperatorResolver;
-import org.apache.flink.test.recordJobs.kmeans.KMeansSingleStep;
-import org.junit.Assert;
-import org.junit.Test;
-
-@SuppressWarnings("deprecation")
-public class KMeansSingleStepTest extends CompilerTestBase {
-
- private static final String DATAPOINTS = "Data Points";
- private static final String CENTERS = "Centers";
-
- private static final String MAPPER_NAME = "Find Nearest Centers";
- private static final String REDUCER_NAME = "Recompute Center Positions";
-
- private static final String SINK = "New Center Positions";
-
- private final FieldList set0 = new FieldList(0);
-
-
- @Test
- public void testCompileKMeansSingleStepWithStats() {
-
- KMeansSingleStep kmi = new KMeansSingleStep();
- Plan p = kmi.getPlan(String.valueOf(DEFAULT_PARALLELISM), IN_FILE, IN_FILE, OUT_FILE, String.valueOf(20));
- p.setExecutionConfig(new ExecutionConfig());
- // set the statistics
- OperatorResolver cr = getContractResolver(p);
- FileDataSource pointsSource = cr.getNode(DATAPOINTS);
- FileDataSource centersSource = cr.getNode(CENTERS);
- setSourceStatistics(pointsSource, 100l*1024*1024*1024, 32f);
- setSourceStatistics(centersSource, 1024*1024, 32f);
-
- OptimizedPlan plan = compileWithStats(p);
- checkPlan(plan);
- }
-
- @Test
- public void testCompileKMeansSingleStepWithOutStats() {
-
- KMeansSingleStep kmi = new KMeansSingleStep();
- Plan p = kmi.getPlan(String.valueOf(DEFAULT_PARALLELISM), IN_FILE, IN_FILE, OUT_FILE, String.valueOf(20));
- p.setExecutionConfig(new ExecutionConfig());
- OptimizedPlan plan = compileNoStats(p);
- checkPlan(plan);
- }
-
-
- private void checkPlan(OptimizedPlan plan) {
-
- OptimizerPlanNodeResolver or = getOptimizerPlanNodeResolver(plan);
-
- final SinkPlanNode sink = or.getNode(SINK);
- final SingleInputPlanNode reducer = or.getNode(REDUCER_NAME);
- final SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getPredecessor();
- final SingleInputPlanNode mapper = or.getNode(MAPPER_NAME);
-
- // check the mapper
- assertEquals(1, mapper.getBroadcastInputs().size());
- assertEquals(ShipStrategyType.FORWARD, mapper.getInput().getShipStrategy());
- assertEquals(ShipStrategyType.BROADCAST, mapper.getBroadcastInputs().get(0).getShipStrategy());
-
- assertEquals(LocalStrategy.NONE, mapper.getInput().getLocalStrategy());
- assertEquals(LocalStrategy.NONE, mapper.getBroadcastInputs().get(0).getLocalStrategy());
-
- assertEquals(DriverStrategy.COLLECTOR_MAP, mapper.getDriverStrategy());
-
- assertNull(mapper.getInput().getLocalStrategyKeys());
- assertNull(mapper.getInput().getLocalStrategySortOrder());
- assertNull(mapper.getBroadcastInputs().get(0).getLocalStrategyKeys());
- assertNull(mapper.getBroadcastInputs().get(0).getLocalStrategySortOrder());
-
-
- // check the combiner
- Assert.assertNotNull(combiner);
- assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
- assertEquals(LocalStrategy.NONE, combiner.getInput().getLocalStrategy());
- assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combiner.getDriverStrategy());
- assertNull(combiner.getInput().getLocalStrategyKeys());
- assertNull(combiner.getInput().getLocalStrategySortOrder());
- assertEquals(set0, combiner.getKeys(0));
- assertEquals(set0, combiner.getKeys(1));
-
- // check the reducer
- assertEquals(ShipStrategyType.PARTITION_HASH, reducer.getInput().getShipStrategy());
- assertEquals(LocalStrategy.COMBININGSORT, reducer.getInput().getLocalStrategy());
- assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reducer.getDriverStrategy());
- assertEquals(set0, reducer.getKeys(0));
- assertEquals(set0, reducer.getInput().getLocalStrategyKeys());
- assertTrue(Arrays.equals(reducer.getInput().getLocalStrategySortOrder(), reducer.getSortOrders(0)));
-
- // check the sink
- assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
- assertEquals(LocalStrategy.NONE, sink.getInput().getLocalStrategy());
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/09fdfda7/flink-tests/src/test/java/org/apache/flink/test/compiler/examples/RelationalQueryCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/compiler/examples/RelationalQueryCompilerTest.java b/flink-tests/src/test/java/org/apache/flink/test/compiler/examples/RelationalQueryCompilerTest.java
deleted file mode 100644
index bc53810..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/compiler/examples/RelationalQueryCompilerTest.java
+++ /dev/null
@@ -1,351 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.compiler.examples;
-
-import java.util.Arrays;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.JoinOperator;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.optimizer.plan.DualInputPlanNode;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.apache.flink.optimizer.util.OperatorResolver;
-import org.apache.flink.test.recordJobs.relational.TPCHQuery3;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Tests TPCH Q3 (simplified) under various input conditions.
- */
-@SuppressWarnings("deprecation")
-public class RelationalQueryCompilerTest extends CompilerTestBase {
-
- private static final String ORDERS = "Orders";
- private static final String LINEITEM = "LineItems";
- private static final String MAPPER_NAME = "FilterO";
- private static final String JOIN_NAME = "JoinLiO";
-
- private final FieldList set0 = new FieldList(0);
- private final FieldList set01 = new FieldList(new int[] {0,1});
- private final ExecutionConfig defaultExecutionConfig = new ExecutionConfig();
-
- // ------------------------------------------------------------------------
-
-
- /**
- * Verifies that a robust repartitioning plan with a hash join is created in the absence of statistics.
- */
- @Test
- public void testQueryNoStatistics() {
- try {
- TPCHQuery3 query = new TPCHQuery3();
- Plan p = query.getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, OUT_FILE);
- p.setExecutionConfig(defaultExecutionConfig);
- // compile
- final OptimizedPlan plan = compileNoStats(p);
-
- final OptimizerPlanNodeResolver or = getOptimizerPlanNodeResolver(plan);
-
- // get the nodes from the final plan
- final SinkPlanNode sink = or.getNode("Output");
- final SingleInputPlanNode reducer = or.getNode("AggLio");
- final SingleInputPlanNode combiner = reducer.getPredecessor() instanceof SingleInputPlanNode ?
- (SingleInputPlanNode) reducer.getPredecessor() : null;
- final DualInputPlanNode join = or.getNode("JoinLiO");
- final SingleInputPlanNode filteringMapper = or.getNode("FilterO");
-
- // verify the optimizer choices
- checkStandardStrategies(filteringMapper, join, combiner, reducer, sink);
- Assert.assertTrue(checkRepartitionShipStrategies(join, reducer, combiner));
- Assert.assertTrue(checkHashJoinStrategies(join, reducer, true) || checkHashJoinStrategies(join, reducer, false));
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail(e.getMessage());
- }
- }
-
- /**
- * Checks if any valid plan is produced. Hash joins are expected to build the orders side, as the statistics
- * indicate this to be the smaller one.
- */
- @Test
- public void testQueryAnyValidPlan() {
- testQueryGeneric(1024*1024*1024L, 8*1024*1024*1024L, 0.05f, true, true, true, false, true);
- }
-
- /**
- * Verifies that the plan compiles in the presence of empty size=0 estimates.
- */
- @Test
- public void testQueryWithSizeZeroInputs() {
- testQueryGeneric(0, 0, 0.5f, true, true, true, false, true);
- }
-
- /**
- * Statistics that push towards a broadcast join.
- */
- @Test
- public void testQueryWithStatsForBroadcastHash() {
- testQueryGeneric(1024l*1024*1024*1024, 1024l*1024*1024*1024, 0.05f, true, false, true, false, false);
- }
-
- /**
- * Statistics that push towards a broadcast join.
- */
- @Test
- public void testQueryWithStatsForRepartitionAny() {
- testQueryGeneric(100l*1024*1024*1024*1024, 100l*1024*1024*1024*1024, 0.5f, false, true, true, true, true);
- }
-
- /**
- * Statistics that push towards a repartition merge join. If the join blows the data volume up significantly,
- * re-exploiting the sorted order is cheaper.
- */
- @Test
- public void testQueryWithStatsForRepartitionMerge() {
- TPCHQuery3 query = new TPCHQuery3();
- Plan p = query.getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, OUT_FILE);
- p.setExecutionConfig(defaultExecutionConfig);
- // set compiler hints
- OperatorResolver cr = getContractResolver(p);
- JoinOperator match = cr.getNode("JoinLiO");
- match.getCompilerHints().setFilterFactor(100f);
-
- testQueryGeneric(100l*1024*1024*1024*1024, 100l*1024*1024*1024*1024, 0.05f, 100f, false, true, false, false, true);
- }
-
- // ------------------------------------------------------------------------
-
- private void testQueryGeneric(long orderSize, long lineItemSize,
- float ordersFilterFactor,
- boolean broadcastOkay, boolean partitionedOkay,
- boolean hashJoinFirstOkay, boolean hashJoinSecondOkay, boolean mergeJoinOkay)
- {
- testQueryGeneric(orderSize, lineItemSize, ordersFilterFactor, ordersFilterFactor, broadcastOkay, partitionedOkay, hashJoinFirstOkay, hashJoinSecondOkay, mergeJoinOkay);
- }
-
- private void testQueryGeneric(long orderSize, long lineItemSize,
- float ordersFilterFactor, float joinFilterFactor,
- boolean broadcastOkay, boolean partitionedOkay,
- boolean hashJoinFirstOkay, boolean hashJoinSecondOkay, boolean mergeJoinOkay)
- {
- TPCHQuery3 query = new TPCHQuery3();
- Plan p = query.getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, OUT_FILE);
- p.setExecutionConfig(defaultExecutionConfig);
- testQueryGeneric(p, orderSize, lineItemSize, ordersFilterFactor, joinFilterFactor, broadcastOkay, partitionedOkay, hashJoinFirstOkay, hashJoinSecondOkay, mergeJoinOkay);
- }
-
- private void testQueryGeneric(Plan p, long orderSize, long lineitemSize,
- float orderSelectivity, float joinSelectivity,
- boolean broadcastOkay, boolean partitionedOkay,
- boolean hashJoinFirstOkay, boolean hashJoinSecondOkay, boolean mergeJoinOkay)
- {
- try {
- // set statistics
- OperatorResolver cr = getContractResolver(p);
- FileDataSource ordersSource = cr.getNode(ORDERS);
- FileDataSource lineItemSource = cr.getNode(LINEITEM);
- MapOperator mapper = cr.getNode(MAPPER_NAME);
- JoinOperator joiner = cr.getNode(JOIN_NAME);
- setSourceStatistics(ordersSource, orderSize, 100f);
- setSourceStatistics(lineItemSource, lineitemSize, 140f);
- mapper.getCompilerHints().setAvgOutputRecordSize(16f);
- mapper.getCompilerHints().setFilterFactor(orderSelectivity);
- joiner.getCompilerHints().setFilterFactor(joinSelectivity);
-
- // compile
- final OptimizedPlan plan = compileWithStats(p);
- final OptimizerPlanNodeResolver or = getOptimizerPlanNodeResolver(plan);
-
- // get the nodes from the final plan
- final SinkPlanNode sink = or.getNode("Output");
- final SingleInputPlanNode reducer = or.getNode("AggLio");
- final SingleInputPlanNode combiner = reducer.getPredecessor() instanceof SingleInputPlanNode ?
- (SingleInputPlanNode) reducer.getPredecessor() : null;
- final DualInputPlanNode join = or.getNode("JoinLiO");
- final SingleInputPlanNode filteringMapper = or.getNode("FilterO");
-
- checkStandardStrategies(filteringMapper, join, combiner, reducer, sink);
-
- // check the possible variants and that the variant ia allowed in this specific setting
- if (checkBroadcastShipStrategies(join, reducer, combiner)) {
- Assert.assertTrue("Broadcast join incorrectly chosen.", broadcastOkay);
-
- if (checkHashJoinStrategies(join, reducer, true)) {
- Assert.assertTrue("Hash join (build orders) incorrectly chosen", hashJoinFirstOkay);
- } else if (checkHashJoinStrategies(join, reducer, false)) {
- Assert.assertTrue("Hash join (build lineitem) incorrectly chosen", hashJoinSecondOkay);
- } else if (checkBroadcastMergeJoin(join, reducer)) {
- Assert.assertTrue("Merge join incorrectly chosen", mergeJoinOkay);
- } else {
- Assert.fail("Plan has no correct hash join or merge join strategies.");
- }
- }
- else if (checkRepartitionShipStrategies(join, reducer, combiner)) {
- Assert.assertTrue("Partitioned join incorrectly chosen.", partitionedOkay);
-
- if (checkHashJoinStrategies(join, reducer, true)) {
- Assert.assertTrue("Hash join (build orders) incorrectly chosen", hashJoinFirstOkay);
- } else if (checkHashJoinStrategies(join, reducer, false)) {
- Assert.assertTrue("Hash join (build lineitem) incorrectly chosen", hashJoinSecondOkay);
- } else if (checkRepartitionMergeJoin(join, reducer)) {
- Assert.assertTrue("Merge join incorrectly chosen", mergeJoinOkay);
- } else {
- Assert.fail("Plan has no correct hash join or merge join strategies.");
- }
- } else {
- Assert.fail("Plan has neither correct BC join or partitioned join configuration.");
- }
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail(e.getMessage());
- }
- }
-
- // ------------------------------------------------------------------------
- // Checks for special conditions
- // ------------------------------------------------------------------------
-
- private void checkStandardStrategies(SingleInputPlanNode map, DualInputPlanNode join, SingleInputPlanNode combiner,
- SingleInputPlanNode reducer, SinkPlanNode sink)
- {
- // check ship strategies that are always fix
- Assert.assertEquals(ShipStrategyType.FORWARD, map.getInput().getShipStrategy());
- Assert.assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
-
- // check the driver strategies that are always fix
- Assert.assertEquals(DriverStrategy.COLLECTOR_MAP, map.getDriverStrategy());
- Assert.assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reducer.getDriverStrategy());
- Assert.assertEquals(DriverStrategy.NONE, sink.getDriverStrategy());
- if (combiner != null) {
- Assert.assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combiner.getDriverStrategy());
- Assert.assertEquals(LocalStrategy.NONE, combiner.getInput().getLocalStrategy());
- }
- }
-
- private boolean checkBroadcastShipStrategies(DualInputPlanNode join, SingleInputPlanNode reducer,
- SingleInputPlanNode combiner)
- {
- if (ShipStrategyType.BROADCAST == join.getInput1().getShipStrategy() &&
- ShipStrategyType.FORWARD == join.getInput2().getShipStrategy() &&
- ShipStrategyType.PARTITION_HASH == reducer.getInput().getShipStrategy())
- {
- // check combiner
- Assert.assertNotNull("Plan should have a combiner", combiner);
- Assert.assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
- return true;
- } else {
- return false;
- }
- }
-
- private boolean checkRepartitionShipStrategies(DualInputPlanNode join, SingleInputPlanNode reducer,
- SingleInputPlanNode combiner)
- {
- if (ShipStrategyType.PARTITION_HASH == join.getInput1().getShipStrategy() &&
- ShipStrategyType.PARTITION_HASH == join.getInput2().getShipStrategy() &&
- ShipStrategyType.FORWARD == reducer.getInput().getShipStrategy())
- {
- // check combiner
- Assert.assertNull("Plan should not have a combiner", combiner);
- return true;
- } else {
- return false;
- }
- }
-
- private boolean checkHashJoinStrategies(DualInputPlanNode join, SingleInputPlanNode reducer, boolean buildFirst) {
- if ( (buildFirst && DriverStrategy.HYBRIDHASH_BUILD_FIRST == join.getDriverStrategy()) ||
- (!buildFirst && DriverStrategy.HYBRIDHASH_BUILD_SECOND == join.getDriverStrategy()) )
- {
- // driver keys
- Assert.assertEquals(set0, join.getKeysForInput1());
- Assert.assertEquals(set0, join.getKeysForInput2());
-
- // local strategies
- Assert.assertEquals(LocalStrategy.NONE, join.getInput1().getLocalStrategy());
- Assert.assertEquals(LocalStrategy.NONE, join.getInput2().getLocalStrategy());
- Assert.assertEquals(LocalStrategy.COMBININGSORT, reducer.getInput().getLocalStrategy());
-
- // local strategy keys
- Assert.assertEquals(set01, reducer.getInput().getLocalStrategyKeys());
- Assert.assertEquals(set01, reducer.getKeys(0));
- Assert.assertTrue(Arrays.equals(reducer.getInput().getLocalStrategySortOrder(), reducer.getSortOrders(0)));
- return true;
- } else {
- return false;
- }
- }
-
- private boolean checkBroadcastMergeJoin(DualInputPlanNode join, SingleInputPlanNode reducer) {
- if (DriverStrategy.MERGE == join.getDriverStrategy()) {
- // driver keys
- Assert.assertEquals(set0, join.getKeysForInput1());
- Assert.assertEquals(set0, join.getKeysForInput2());
-
- // local strategies
- Assert.assertEquals(LocalStrategy.SORT, join.getInput1().getLocalStrategy());
- Assert.assertEquals(LocalStrategy.SORT, join.getInput2().getLocalStrategy());
- Assert.assertEquals(LocalStrategy.COMBININGSORT, reducer.getInput().getLocalStrategy());
-
- // local strategy keys
- Assert.assertEquals(set0, join.getInput1().getLocalStrategyKeys());
- Assert.assertEquals(set0, join.getInput2().getLocalStrategyKeys());
- Assert.assertTrue(Arrays.equals(join.getInput1().getLocalStrategySortOrder(), join.getInput2().getLocalStrategySortOrder()));
- Assert.assertEquals(set01, reducer.getInput().getLocalStrategyKeys());
- Assert.assertEquals(set01, reducer.getKeys(0));
- Assert.assertTrue(Arrays.equals(reducer.getInput().getLocalStrategySortOrder(), reducer.getSortOrders(0)));
- return true;
- } else {
- return false;
- }
- }
-
- private boolean checkRepartitionMergeJoin(DualInputPlanNode join, SingleInputPlanNode reducer) {
- if (DriverStrategy.MERGE == join.getDriverStrategy()) {
- // driver keys
- Assert.assertEquals(set0, join.getKeysForInput1());
- Assert.assertEquals(set0, join.getKeysForInput2());
-
- // local strategies
- Assert.assertEquals(LocalStrategy.SORT, join.getInput1().getLocalStrategy());
- Assert.assertEquals(LocalStrategy.SORT, join.getInput2().getLocalStrategy());
- Assert.assertEquals(LocalStrategy.NONE, reducer.getInput().getLocalStrategy());
-
- // local strategy keys
- Assert.assertEquals(set01, join.getInput1().getLocalStrategyKeys());
- Assert.assertEquals(set0, join.getInput2().getLocalStrategyKeys());
- Assert.assertTrue(join.getInput1().getLocalStrategySortOrder()[0] == join.getInput2().getLocalStrategySortOrder()[0]);
- Assert.assertEquals(set01, reducer.getKeys(0));
- Assert.assertTrue(Arrays.equals(join.getInput1().getLocalStrategySortOrder(), reducer.getSortOrders(0)));
- return true;
- } else {
- return false;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/09fdfda7/flink-tests/src/test/java/org/apache/flink/test/compiler/examples/WordCountCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/compiler/examples/WordCountCompilerTest.java b/flink-tests/src/test/java/org/apache/flink/test/compiler/examples/WordCountCompilerTest.java
deleted file mode 100644
index 6cfef9c..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/compiler/examples/WordCountCompilerTest.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.compiler.examples;
-
-import java.util.Arrays;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.distributions.SimpleDistribution;
-import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.common.operators.Ordering;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.java.record.io.CsvOutputFormat;
-import org.apache.flink.api.java.record.io.TextInputFormat;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.apache.flink.test.recordJobs.wordcount.WordCount;
-import org.apache.flink.test.recordJobs.wordcount.WordCount.CountWords;
-import org.apache.flink.test.recordJobs.wordcount.WordCount.TokenizeLine;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.StringValue;
-import org.junit.Assert;
-import org.junit.Test;
-
-@SuppressWarnings("deprecation")
-public class WordCountCompilerTest extends CompilerTestBase {
-
- /**
- * This method tests the simple word count.
- */
- @Test
- public void testWordCount() {
- checkWordCount(true);
- checkWordCount(false);
- }
-
- private void checkWordCount(boolean estimates) {
- try {
- WordCount wc = new WordCount();
- ExecutionConfig ec = new ExecutionConfig();
- Plan p = wc.getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, OUT_FILE);
- p.setExecutionConfig(ec);
-
- OptimizedPlan plan;
- if (estimates) {
- FileDataSource source = getContractResolver(p).getNode("Input Lines");
- setSourceStatistics(source, 1024*1024*1024*1024L, 24f);
- plan = compileWithStats(p);
- } else {
- plan = compileNoStats(p);
- }
-
- // get the optimizer plan nodes
- OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(plan);
- SinkPlanNode sink = resolver.getNode("Word Counts");
- SingleInputPlanNode reducer = resolver.getNode("Count Words");
- SingleInputPlanNode mapper = resolver.getNode("Tokenize Lines");
-
- // verify the strategies
- Assert.assertEquals(ShipStrategyType.FORWARD, mapper.getInput().getShipStrategy());
- Assert.assertEquals(ShipStrategyType.PARTITION_HASH, reducer.getInput().getShipStrategy());
- Assert.assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
-
- Channel c = reducer.getInput();
- Assert.assertEquals(LocalStrategy.COMBININGSORT, c.getLocalStrategy());
- FieldList l = new FieldList(0);
- Assert.assertEquals(l, c.getShipStrategyKeys());
- Assert.assertEquals(l, c.getLocalStrategyKeys());
- Assert.assertTrue(Arrays.equals(c.getLocalStrategySortOrder(), reducer.getSortOrders(0)));
-
- // check the combiner
- SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getPredecessor();
- Assert.assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combiner.getDriverStrategy());
- Assert.assertEquals(l, combiner.getKeys(0));
- Assert.assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
-
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail(e.getMessage());
- }
- }
-
- /**
- * This method tests that with word count and a range partitioned sink, the range partitioner is pushed down.
- */
- @Test
- public void testWordCountWithSortedSink() {
- checkWordCountWithSortedSink(true);
- checkWordCountWithSortedSink(false);
- }
-
- private void checkWordCountWithSortedSink(boolean estimates) {
- try {
- FileDataSource sourceNode = new FileDataSource(new TextInputFormat(), IN_FILE, "Input Lines");
- MapOperator mapNode = MapOperator.builder(new TokenizeLine())
- .input(sourceNode)
- .name("Tokenize Lines")
- .build();
- ReduceOperator reduceNode = ReduceOperator.builder(new CountWords(), StringValue.class, 0)
- .input(mapNode)
- .name("Count Words")
- .build();
- FileDataSink out = new FileDataSink(new CsvOutputFormat(), OUT_FILE, reduceNode, "Word Counts");
- CsvOutputFormat.configureRecordFormat(out)
- .recordDelimiter('\n')
- .fieldDelimiter(' ')
- .lenient(true)
- .field(StringValue.class, 0)
- .field(IntValue.class, 1);
-
- Ordering ordering = new Ordering(0, StringValue.class, Order.DESCENDING);
- out.setGlobalOrder(ordering, new SimpleDistribution(new StringValue[] {new StringValue("N")}));
-
- ExecutionConfig ec = new ExecutionConfig();
- Plan p = new Plan(out, "WordCount Example");
- p.setDefaultParallelism(DEFAULT_PARALLELISM);
- p.setExecutionConfig(ec);
-
- OptimizedPlan plan;
- if (estimates) {
- setSourceStatistics(sourceNode, 1024*1024*1024*1024L, 24f);
- plan = compileWithStats(p);
- } else {
- plan = compileNoStats(p);
- }
-
- OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(plan);
- SinkPlanNode sink = resolver.getNode("Word Counts");
- SingleInputPlanNode reducer = resolver.getNode("Count Words");
- SingleInputPlanNode mapper = resolver.getNode("Tokenize Lines");
-
- Assert.assertEquals(ShipStrategyType.FORWARD, mapper.getInput().getShipStrategy());
- Assert.assertEquals(ShipStrategyType.PARTITION_RANGE, reducer.getInput().getShipStrategy());
- Assert.assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
-
- Channel c = reducer.getInput();
- Assert.assertEquals(LocalStrategy.COMBININGSORT, c.getLocalStrategy());
- FieldList l = new FieldList(0);
- Assert.assertEquals(l, c.getShipStrategyKeys());
- Assert.assertEquals(l, c.getLocalStrategyKeys());
-
- // check that the sort orders are descending
- Assert.assertFalse(c.getShipStrategySortOrder()[0]);
- Assert.assertFalse(c.getLocalStrategySortOrder()[0]);
-
- // check the combiner
- SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getPredecessor();
- Assert.assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combiner.getDriverStrategy());
- Assert.assertEquals(l, combiner.getKeys(0));
- Assert.assertEquals(l, combiner.getKeys(1));
- Assert.assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail(e.getMessage());
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/09fdfda7/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/ConnectedComponentsCoGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/ConnectedComponentsCoGroupTest.java b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/ConnectedComponentsCoGroupTest.java
deleted file mode 100644
index 10f2b5c..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/ConnectedComponentsCoGroupTest.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.compiler.iterations;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.optimizer.dag.TempMode;
-import org.apache.flink.optimizer.plan.DualInputPlanNode;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.plan.SourcePlanNode;
-import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
-import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
-import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.apache.flink.test.recordJobs.graph.ConnectedComponentsWithCoGroup;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- *
- */
-public class ConnectedComponentsCoGroupTest extends CompilerTestBase {
-
- private static final String VERTEX_SOURCE = "Vertices";
-
- private static final String ITERATION_NAME = "Connected Components Iteration";
-
- private static final String EDGES_SOURCE = "Edges";
- private static final String JOIN_NEIGHBORS_MATCH = "Join Candidate Id With Neighbor";
- private static final String MIN_ID_AND_UPDATE = "Min Id and Update";
-
- private static final String SINK = "Result";
-
- private static final boolean PRINT_PLAN = false;
-
- private final FieldList set0 = new FieldList(0);
-
-
- @Test
- public void testWorksetConnectedComponents() {
- ConnectedComponentsWithCoGroup cc = new ConnectedComponentsWithCoGroup();
-
- Plan plan = cc.getPlan(String.valueOf(DEFAULT_PARALLELISM),
- IN_FILE, IN_FILE, OUT_FILE, String.valueOf(100));
- plan.setExecutionConfig(new ExecutionConfig());
- OptimizedPlan optPlan = compileNoStats(plan);
- OptimizerPlanNodeResolver or = getOptimizerPlanNodeResolver(optPlan);
-
- if (PRINT_PLAN) {
- PlanJSONDumpGenerator dumper = new PlanJSONDumpGenerator();
- String json = dumper.getOptimizerPlanAsJSON(optPlan);
- System.out.println(json);
- }
-
- SourcePlanNode vertexSource = or.getNode(VERTEX_SOURCE);
- SourcePlanNode edgesSource = or.getNode(EDGES_SOURCE);
- SinkPlanNode sink = or.getNode(SINK);
- WorksetIterationPlanNode iter = or.getNode(ITERATION_NAME);
-
- DualInputPlanNode neighborsJoin = or.getNode(JOIN_NEIGHBORS_MATCH);
- DualInputPlanNode cogroup = or.getNode(MIN_ID_AND_UPDATE);
-
- // --------------------------------------------------------------------
- // Plan validation:
- //
- // We expect the plan to go with a sort-merge join, because the CoGroup
- // sorts and the join in the successive iteration can re-exploit the sorting.
- // --------------------------------------------------------------------
-
- // test all drivers
- Assert.assertEquals(DriverStrategy.NONE, sink.getDriverStrategy());
- Assert.assertEquals(DriverStrategy.NONE, vertexSource.getDriverStrategy());
- Assert.assertEquals(DriverStrategy.NONE, edgesSource.getDriverStrategy());
-
- Assert.assertEquals(DriverStrategy.MERGE, neighborsJoin.getDriverStrategy());
- Assert.assertEquals(set0, neighborsJoin.getKeysForInput1());
- Assert.assertEquals(set0, neighborsJoin.getKeysForInput2());
-
- Assert.assertEquals(DriverStrategy.CO_GROUP, cogroup.getDriverStrategy());
- Assert.assertEquals(set0, cogroup.getKeysForInput1());
- Assert.assertEquals(set0, cogroup.getKeysForInput2());
-
- // test all the shipping strategies
- Assert.assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
- Assert.assertEquals(ShipStrategyType.PARTITION_HASH, iter.getInitialSolutionSetInput().getShipStrategy());
- Assert.assertEquals(set0, iter.getInitialSolutionSetInput().getShipStrategyKeys());
- Assert.assertEquals(ShipStrategyType.PARTITION_HASH, iter.getInitialWorksetInput().getShipStrategy());
- Assert.assertEquals(set0, iter.getInitialWorksetInput().getShipStrategyKeys());
-
- Assert.assertEquals(ShipStrategyType.FORWARD, neighborsJoin.getInput1().getShipStrategy()); // workset
- Assert.assertEquals(ShipStrategyType.PARTITION_HASH, neighborsJoin.getInput2().getShipStrategy()); // edges
- Assert.assertEquals(set0, neighborsJoin.getInput2().getShipStrategyKeys());
- Assert.assertTrue(neighborsJoin.getInput2().getTempMode().isCached());
-
- Assert.assertEquals(ShipStrategyType.PARTITION_HASH, cogroup.getInput1().getShipStrategy()); // min id
- Assert.assertEquals(ShipStrategyType.FORWARD, cogroup.getInput2().getShipStrategy()); // solution set
-
- // test all the local strategies
- Assert.assertEquals(LocalStrategy.NONE, sink.getInput().getLocalStrategy());
- Assert.assertEquals(LocalStrategy.NONE, iter.getInitialSolutionSetInput().getLocalStrategy());
-
- // the sort for the neighbor join in the first iteration is pushed out of the loop
- Assert.assertEquals(LocalStrategy.SORT, iter.getInitialWorksetInput().getLocalStrategy());
- Assert.assertEquals(LocalStrategy.NONE, neighborsJoin.getInput1().getLocalStrategy()); // workset
- Assert.assertEquals(LocalStrategy.SORT, neighborsJoin.getInput2().getLocalStrategy()); // edges
-
- Assert.assertEquals(LocalStrategy.SORT, cogroup.getInput1().getLocalStrategy());
- Assert.assertEquals(LocalStrategy.NONE, cogroup.getInput2().getLocalStrategy()); // solution set
-
- // check the caches
- Assert.assertTrue(TempMode.CACHED == neighborsJoin.getInput2().getTempMode());
-
- JobGraphGenerator jgg = new JobGraphGenerator();
- jgg.compileJobGraph(optPlan);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/09fdfda7/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/IterativeKMeansTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/IterativeKMeansTest.java b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/IterativeKMeansTest.java
deleted file mode 100644
index bd4b6be..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/IterativeKMeansTest.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.compiler.iterations;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.util.Arrays;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.apache.flink.optimizer.util.OperatorResolver;
-import org.apache.flink.test.recordJobs.kmeans.KMeansBroadcast;
-import org.junit.Assert;
-import org.junit.Test;
-
-@SuppressWarnings("deprecation")
-public class IterativeKMeansTest extends CompilerTestBase {
-
- private static final String DATAPOINTS = "Data Points";
- private static final String CENTERS = "Centers";
-
- private static final String MAPPER_NAME = "Find Nearest Centers";
- private static final String REDUCER_NAME = "Recompute Center Positions";
-
- private static final String ITERATION_NAME = "k-means loop";
-
- private static final String SINK = "New Center Positions";
-
- private final FieldList set0 = new FieldList(0);
-
- // --------------------------------------------------------------------------------------------
- // K-Means (Bulk Iteration)
- // --------------------------------------------------------------------------------------------
-
- @Test
- public void testCompileKMeansSingleStepWithStats() {
-
- KMeansBroadcast kmi = new KMeansBroadcast();
- Plan p = kmi.getPlan(String.valueOf(DEFAULT_PARALLELISM), IN_FILE, IN_FILE, OUT_FILE, String.valueOf(20));
- p.setExecutionConfig(new ExecutionConfig());
- // set the statistics
- OperatorResolver cr = getContractResolver(p);
- FileDataSource pointsSource = cr.getNode(DATAPOINTS);
- FileDataSource centersSource = cr.getNode(CENTERS);
- setSourceStatistics(pointsSource, 100l*1024*1024*1024, 32f);
- setSourceStatistics(centersSource, 1024*1024, 32f);
-
- OptimizedPlan plan = compileWithStats(p);
- checkPlan(plan);
-
- new JobGraphGenerator().compileJobGraph(plan);
- }
-
- @Test
- public void testCompileKMeansSingleStepWithOutStats() {
-
- KMeansBroadcast kmi = new KMeansBroadcast();
- Plan p = kmi.getPlan(String.valueOf(DEFAULT_PARALLELISM), IN_FILE, IN_FILE, OUT_FILE, String.valueOf(20));
- p.setExecutionConfig(new ExecutionConfig());
- OptimizedPlan plan = compileNoStats(p);
- checkPlan(plan);
-
- new JobGraphGenerator().compileJobGraph(plan);
- }
-
- private void checkPlan(OptimizedPlan plan) {
-
- OptimizerPlanNodeResolver or = getOptimizerPlanNodeResolver(plan);
-
- final SinkPlanNode sink = or.getNode(SINK);
- final SingleInputPlanNode reducer = or.getNode(REDUCER_NAME);
- final SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getPredecessor();
- final SingleInputPlanNode mapper = or.getNode(MAPPER_NAME);
-
- final BulkIterationPlanNode iter = or.getNode(ITERATION_NAME);
-
- // -------------------- outside the loop -----------------------
-
- // check the sink
- assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
- assertEquals(LocalStrategy.NONE, sink.getInput().getLocalStrategy());
-
- // check the iteration
- assertEquals(ShipStrategyType.FORWARD, iter.getInput().getShipStrategy());
- assertEquals(LocalStrategy.NONE, iter.getInput().getLocalStrategy());
-
-
- // -------------------- inside the loop -----------------------
-
- // check the mapper
- assertEquals(1, mapper.getBroadcastInputs().size());
- assertEquals(ShipStrategyType.FORWARD, mapper.getInput().getShipStrategy());
- assertEquals(ShipStrategyType.BROADCAST, mapper.getBroadcastInputs().get(0).getShipStrategy());
- assertFalse(mapper.getInput().isOnDynamicPath());
- assertTrue(mapper.getBroadcastInputs().get(0).isOnDynamicPath());
- assertTrue(mapper.getInput().getTempMode().isCached());
-
- assertEquals(LocalStrategy.NONE, mapper.getInput().getLocalStrategy());
- assertEquals(LocalStrategy.NONE, mapper.getBroadcastInputs().get(0).getLocalStrategy());
-
- assertEquals(DriverStrategy.COLLECTOR_MAP, mapper.getDriverStrategy());
-
- assertNull(mapper.getInput().getLocalStrategyKeys());
- assertNull(mapper.getInput().getLocalStrategySortOrder());
- assertNull(mapper.getBroadcastInputs().get(0).getLocalStrategyKeys());
- assertNull(mapper.getBroadcastInputs().get(0).getLocalStrategySortOrder());
-
- // check the combiner
- Assert.assertNotNull(combiner);
- assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
- assertTrue(combiner.getInput().isOnDynamicPath());
-
- assertEquals(LocalStrategy.NONE, combiner.getInput().getLocalStrategy());
- assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combiner.getDriverStrategy());
- assertNull(combiner.getInput().getLocalStrategyKeys());
- assertNull(combiner.getInput().getLocalStrategySortOrder());
- assertEquals(set0, combiner.getKeys(0));
- assertEquals(set0, combiner.getKeys(1));
-
- // check the reducer
- assertEquals(ShipStrategyType.PARTITION_HASH, reducer.getInput().getShipStrategy());
- assertTrue(reducer.getInput().isOnDynamicPath());
- assertEquals(LocalStrategy.COMBININGSORT, reducer.getInput().getLocalStrategy());
- assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reducer.getDriverStrategy());
- assertEquals(set0, reducer.getKeys(0));
- assertEquals(set0, reducer.getInput().getLocalStrategyKeys());
- assertTrue(Arrays.equals(reducer.getInput().getLocalStrategySortOrder(), reducer.getSortOrders(0)));
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/09fdfda7/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/MultipleJoinsWithSolutionSetCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/MultipleJoinsWithSolutionSetCompilerTest.java b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/MultipleJoinsWithSolutionSetCompilerTest.java
deleted file mode 100644
index aea448f..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/MultipleJoinsWithSolutionSetCompilerTest.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.compiler.iterations;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.common.functions.RichJoinFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.operators.DeltaIteration;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.optimizer.plan.DualInputPlanNode;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.SolutionSetPlanNode;
-import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-
-@SuppressWarnings("serial")
-public class MultipleJoinsWithSolutionSetCompilerTest extends CompilerTestBase {
-
- private static final String JOIN_1 = "join1";
- private static final String JOIN_2 = "join2";
-
- @Test
- public void testMultiSolutionSetJoinPlan() {
- try {
-
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- @SuppressWarnings("unchecked")
- DataSet<Tuple2<Long, Double>> inputData = env.fromElements(new Tuple2<Long, Double>(1L, 1.0));
- DataSet<Tuple2<Long, Double>> result = constructPlan(inputData, 10);
-
- // add two sinks, to test the case of branching after an iteration
- result.output(new DiscardingOutputFormat<Tuple2<Long, Double>>());
- result.output(new DiscardingOutputFormat<Tuple2<Long, Double>>());
-
- Plan p = env.createProgramPlan();
-
- OptimizedPlan optPlan = compileNoStats(p);
-
- OptimizerPlanNodeResolver or = getOptimizerPlanNodeResolver(optPlan);
-
- DualInputPlanNode join1 = or.getNode(JOIN_1);
- DualInputPlanNode join2 = or.getNode(JOIN_2);
-
- assertEquals(DriverStrategy.HYBRIDHASH_BUILD_FIRST, join1.getDriverStrategy());
- assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND, join2.getDriverStrategy());
-
- assertEquals(ShipStrategyType.PARTITION_HASH, join1.getInput2().getShipStrategy());
- assertEquals(ShipStrategyType.PARTITION_HASH, join2.getInput1().getShipStrategy());
-
- assertEquals(SolutionSetPlanNode.class, join1.getInput1().getSource().getClass());
- assertEquals(SolutionSetPlanNode.class, join2.getInput2().getSource().getClass());
-
- new JobGraphGenerator().compileJobGraph(optPlan);
- }
- catch (Exception e) {
- System.err.println(e.getMessage());
- e.printStackTrace();
- fail("Test erroneous: " + e.getMessage());
- }
- }
-
-
-
- public static DataSet<Tuple2<Long, Double>> constructPlan(DataSet<Tuple2<Long, Double>> initialData, int numIterations) {
-
- DeltaIteration<Tuple2<Long, Double>, Tuple2<Long, Double>> iteration = initialData.iterateDelta(initialData, numIterations, 0);
-
- DataSet<Tuple2<Long, Double>> delta = iteration.getSolutionSet()
- .join(iteration.getWorkset().flatMap(new Duplicator())).where(0).equalTo(0).with(new SummingJoin()).name(JOIN_1)
- .groupBy(0).aggregate(Aggregations.MIN, 1).map(new Expander())
- .join(iteration.getSolutionSet()).where(0).equalTo(0).with(new SummingJoinProject()).name(JOIN_2);
-
- DataSet<Tuple2<Long, Double>> changes = delta.groupBy(0).aggregate(Aggregations.SUM, 1);
-
- DataSet<Tuple2<Long, Double>> result = iteration.closeWith(delta, changes);
-
- return result;
- }
-
- public static final class SummingJoin extends RichJoinFunction<Tuple2<Long, Double>, Tuple2<Long, Double>, Tuple2<Long, Double>> {
-
- @Override
- public Tuple2<Long, Double> join(Tuple2<Long, Double> first, Tuple2<Long, Double> second) {
- return new Tuple2<Long, Double>(first.f0, first.f1 + second.f1);
- }
- }
-
- public static final class SummingJoinProject extends RichJoinFunction<Tuple3<Long, Double, Double>, Tuple2<Long, Double>, Tuple2<Long, Double>> {
-
- @Override
- public Tuple2<Long, Double> join(Tuple3<Long, Double, Double> first, Tuple2<Long, Double> second) {
- return new Tuple2<Long, Double>(first.f0, first.f1 + first.f2 + second.f1);
- }
- }
-
- public static final class Duplicator extends RichFlatMapFunction<Tuple2<Long, Double>, Tuple2<Long, Double>> {
-
- @Override
- public void flatMap(Tuple2<Long, Double> value, Collector<Tuple2<Long, Double>> out) {
- out.collect(value);
- out.collect(value);
- }
- }
-
- public static final class Expander extends RichMapFunction<Tuple2<Long, Double>, Tuple3<Long, Double, Double>> {
-
- @Override
- public Tuple3<Long, Double, Double> map(Tuple2<Long, Double> value) {
- return new Tuple3<Long, Double, Double>(value.f0, value.f1, value.f1 * 2);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/09fdfda7/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/PageRankCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/PageRankCompilerTest.java b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/PageRankCompilerTest.java
deleted file mode 100644
index a3b7572..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/PageRankCompilerTest.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.compiler.iterations;
-
-import static org.apache.flink.api.java.aggregation.Aggregations.SUM;
-import static org.junit.Assert.fail;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.optimizer.Optimizer;
-import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
-import org.apache.flink.optimizer.plan.BulkPartialSolutionPlanNode;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.examples.java.graph.PageRankBasic.BuildOutgoingEdgeList;
-import org.apache.flink.examples.java.graph.PageRankBasic.Dampener;
-import org.apache.flink.examples.java.graph.PageRankBasic.EpsilonFilter;
-import org.apache.flink.examples.java.graph.PageRankBasic.JoinVertexWithEdgesMatch;
-import org.apache.flink.examples.java.graph.PageRankBasic.RankAssigner;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.junit.Assert;
-import org.junit.Test;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.operators.IterativeDataSet;
-
-public class PageRankCompilerTest extends CompilerTestBase{
-
- @Test
- public void testPageRank() {
- try {
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- // get input data
- DataSet<Long> pagesInput = env.fromElements(1l);
- @SuppressWarnings("unchecked")
- DataSet<Tuple2<Long, Long>> linksInput =env.fromElements(new Tuple2<Long, Long>(1l, 2l));
-
- // assign initial rank to pages
- DataSet<Tuple2<Long, Double>> pagesWithRanks = pagesInput.
- map(new RankAssigner((1.0d / 10)));
-
- // build adjacency list from link input
- DataSet<Tuple2<Long, Long[]>> adjacencyListInput =
- linksInput.groupBy(0).reduceGroup(new BuildOutgoingEdgeList());
-
- // set iterative data set
- IterativeDataSet<Tuple2<Long, Double>> iteration = pagesWithRanks.iterate(10);
-
- Configuration cfg = new Configuration();
- cfg.setString(Optimizer.HINT_LOCAL_STRATEGY, Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND);
-
- DataSet<Tuple2<Long, Double>> newRanks = iteration
- // join pages with outgoing edges and distribute rank
- .join(adjacencyListInput).where(0).equalTo(0).withParameters(cfg)
- .flatMap(new JoinVertexWithEdgesMatch())
- // collect and sum ranks
- .groupBy(0).aggregate(SUM, 1)
- // apply dampening factor
- .map(new Dampener(0.85, 10));
-
- DataSet<Tuple2<Long, Double>> finalPageRanks = iteration.closeWith(
- newRanks,
- newRanks.join(iteration).where(0).equalTo(0)
- // termination condition
- .filter(new EpsilonFilter()));
-
- finalPageRanks.output(new DiscardingOutputFormat<Tuple2<Long, Double>>());
-
- // get the plan and compile it
- Plan p = env.createProgramPlan();
- OptimizedPlan op = compileNoStats(p);
-
- SinkPlanNode sinkPlanNode = (SinkPlanNode) op.getDataSinks().iterator().next();
- BulkIterationPlanNode iterPlanNode = (BulkIterationPlanNode) sinkPlanNode.getInput().getSource();
-
- // check that the partitioning is pushed out of the first loop
- Assert.assertEquals(ShipStrategyType.PARTITION_HASH, iterPlanNode.getInput().getShipStrategy());
- Assert.assertEquals(LocalStrategy.NONE, iterPlanNode.getInput().getLocalStrategy());
-
- BulkPartialSolutionPlanNode partSolPlanNode = iterPlanNode.getPartialSolutionPlanNode();
- Assert.assertEquals(ShipStrategyType.FORWARD, partSolPlanNode.getOutgoingChannels().get(0).getShipStrategy());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/09fdfda7/flink-tests/src/test/java/org/apache/flink/test/compiler/plandump/DumpCompiledPlanTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/compiler/plandump/DumpCompiledPlanTest.java b/flink-tests/src/test/java/org/apache/flink/test/compiler/plandump/DumpCompiledPlanTest.java
deleted file mode 100644
index a981124..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/compiler/plandump/DumpCompiledPlanTest.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.compiler.plandump;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.client.program.Client.ProgramAbortException;
-import org.apache.flink.client.program.PackagedProgram.PreviewPlanEnvironment;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
-import org.apache.flink.examples.java.clustering.KMeans;
-import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.apache.flink.test.recordJobs.graph.DeltaPageRankWithInitialDeltas;
-import org.apache.flink.test.recordJobs.kmeans.KMeansBroadcast;
-import org.apache.flink.test.recordJobs.kmeans.KMeansSingleStep;
-import org.apache.flink.test.recordJobs.relational.TPCHQuery3;
-import org.apache.flink.test.recordJobs.relational.WebLogAnalysis;
-import org.apache.flink.test.recordJobs.wordcount.WordCount;
-import org.codehaus.jackson.JsonFactory;
-import org.codehaus.jackson.JsonParseException;
-import org.codehaus.jackson.JsonParser;
-import org.junit.Assert;
-import org.junit.Test;
-
-/*
- * The tests in this class simply invokes the JSON dump code for the optimized plan.
- */
-public class DumpCompiledPlanTest extends CompilerTestBase {
-
- @Test
- public void dumpWordCount() {
- dump(new WordCount().getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, OUT_FILE));
- }
-
- @Test
- public void dumpTPCH3() {
- dump(new TPCHQuery3().getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, OUT_FILE));
- }
-
- @Test
- public void dumpKMeans() {
- dump(new KMeansSingleStep().getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, OUT_FILE));
- }
-
- @Test
- public void dumpIterativeKMeans() {
- // prepare the test environment
- PreviewPlanEnvironment env = new PreviewPlanEnvironment();
- env.setAsContext();
- try {
- // <points path> <centers path> <result path> <num iterations
- KMeans.main(new String[] {IN_FILE, IN_FILE, OUT_FILE, "123"});
- } catch(ProgramAbortException pae) {
- // all good.
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail("KMeans failed with an exception");
- }
- dump(env.getPlan());
- }
-
- @Test
- public void dumpWebLogAnalysis() {
- dump(new WebLogAnalysis().getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, IN_FILE, OUT_FILE));
- }
-
- @Test
- public void dumpBulkIterationKMeans() {
- dump(new KMeansBroadcast().getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, OUT_FILE));
- }
-
- @Test
- public void dumpDeltaPageRank() {
- dump(new DeltaPageRankWithInitialDeltas().getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, IN_FILE, OUT_FILE, "10"));
- }
-
- private void dump(Plan p) {
- p.setExecutionConfig(new ExecutionConfig());
- try {
- OptimizedPlan op = compileNoStats(p);
- PlanJSONDumpGenerator dumper = new PlanJSONDumpGenerator();
- String json = dumper.getOptimizerPlanAsJSON(op);
- JsonParser parser = new JsonFactory().createJsonParser(json);
- while (parser.nextToken() != null);
- } catch (JsonParseException e) {
- e.printStackTrace();
- Assert.fail("JSON Generator produced malformatted output: " + e.getMessage());
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail("An error occurred in the test: " + e.getMessage());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/09fdfda7/flink-tests/src/test/java/org/apache/flink/test/compiler/plandump/PreviewPlanDumpTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/compiler/plandump/PreviewPlanDumpTest.java b/flink-tests/src/test/java/org/apache/flink/test/compiler/plandump/PreviewPlanDumpTest.java
deleted file mode 100644
index b348333..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/compiler/plandump/PreviewPlanDumpTest.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.compiler.plandump;
-
-import java.util.List;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.optimizer.Optimizer;
-import org.apache.flink.optimizer.dag.DataSinkNode;
-import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
-import org.apache.flink.test.recordJobs.graph.DeltaPageRankWithInitialDeltas;
-import org.apache.flink.test.recordJobs.kmeans.KMeansBroadcast;
-import org.apache.flink.test.recordJobs.kmeans.KMeansSingleStep;
-import org.apache.flink.test.recordJobs.relational.TPCHQuery3;
-import org.apache.flink.test.recordJobs.relational.WebLogAnalysis;
-import org.apache.flink.test.recordJobs.wordcount.WordCount;
-import org.apache.flink.util.OperatingSystem;
-import org.codehaus.jackson.JsonFactory;
-import org.codehaus.jackson.JsonParseException;
-import org.codehaus.jackson.JsonParser;
-import org.junit.Assert;
-import org.junit.Test;
-
-/*
- * The tests in this class simply invokes the JSON dump code for the original plan.
- */
-public class PreviewPlanDumpTest {
-
- protected static final String IN_FILE = OperatingSystem.isWindows() ? "file:/c:/test/file" : "file:///test/file";
-
- protected static final String OUT_FILE = OperatingSystem.isWindows() ? "file:/c:/test/output" : "file:///test/output";
-
- protected static final String[] NO_ARGS = new String[0];
-
- @Test
- public void dumpWordCount() {
- dump(new WordCount().getPlan("4", IN_FILE, OUT_FILE));
-
- // The web interface passes empty string-args to compute the preview of the
- // job, so we should test this situation too
- dump(new WordCount().getPlan(NO_ARGS));
- }
-
- @Test
- public void dumpTPCH3() {
- dump(new TPCHQuery3().getPlan("4", IN_FILE, IN_FILE, OUT_FILE));
- dump(new TPCHQuery3().getPlan(NO_ARGS));
- }
-
- @Test
- public void dumpKMeans() {
- dump(new KMeansSingleStep().getPlan("4", IN_FILE, IN_FILE, OUT_FILE));
- dump(new KMeansSingleStep().getPlan(NO_ARGS));
- }
-
- @Test
- public void dumpWebLogAnalysis() {
- dump(new WebLogAnalysis().getPlan("4", IN_FILE, IN_FILE, IN_FILE, OUT_FILE));
- dump(new WebLogAnalysis().getPlan(NO_ARGS));
- }
-
- @Test
- public void dumpBulkIterationKMeans() {
- dump(new KMeansBroadcast().getPlan("4", IN_FILE, OUT_FILE));
- dump(new KMeansBroadcast().getPlan(NO_ARGS));
- }
-
- @Test
- public void dumpDeltaPageRank() {
- dump(new DeltaPageRankWithInitialDeltas().getPlan("4", IN_FILE, IN_FILE, IN_FILE, OUT_FILE, "10"));
- dump(new DeltaPageRankWithInitialDeltas().getPlan(NO_ARGS));
- }
-
- private void dump(Plan p) {
- try {
- List<DataSinkNode> sinks = Optimizer.createPreOptimizedPlan(p);
- PlanJSONDumpGenerator dumper = new PlanJSONDumpGenerator();
- String json = dumper.getPactPlanAsJSON(sinks);
- JsonParser parser = new JsonFactory().createJsonParser(json);
- while (parser.nextToken() != null);
- } catch (JsonParseException e) {
- e.printStackTrace();
- Assert.fail("JSON Generator produced malformatted output: " + e.getMessage());
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail("An error occurred in the test: " + e.getMessage());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/09fdfda7/flink-tests/src/test/java/org/apache/flink/test/iterative/MultipleSolutionSetJoinsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/MultipleSolutionSetJoinsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/MultipleSolutionSetJoinsITCase.java
index 8eea9f3..e6e91f6 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/MultipleSolutionSetJoinsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/MultipleSolutionSetJoinsITCase.java
@@ -26,7 +26,7 @@ import org.junit.Assert;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
-import org.apache.flink.test.compiler.iterations.MultipleJoinsWithSolutionSetCompilerTest;
+import org.apache.flink.test.optimizer.iterations.MultipleJoinsWithSolutionSetCompilerTest;
import org.apache.flink.test.util.JavaProgramTestBase;
http://git-wip-us.apache.org/repos/asf/flink/blob/09fdfda7/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/KMeansSingleStepTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/KMeansSingleStepTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/KMeansSingleStepTest.java
new file mode 100644
index 0000000..ab8ff45
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/KMeansSingleStepTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.optimizer.examples;
+
+import static org.junit.Assert.*;
+
+import java.util.Arrays;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.java.record.operators.FileDataSource;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.runtime.operators.util.LocalStrategy;
+import org.apache.flink.optimizer.util.CompilerTestBase;
+import org.apache.flink.optimizer.util.OperatorResolver;
+import org.apache.flink.test.recordJobs.kmeans.KMeansSingleStep;
+import org.junit.Assert;
+import org.junit.Test;
+
+@SuppressWarnings("deprecation")
+public class KMeansSingleStepTest extends CompilerTestBase {
+
+ private static final String DATAPOINTS = "Data Points";
+ private static final String CENTERS = "Centers";
+
+ private static final String MAPPER_NAME = "Find Nearest Centers";
+ private static final String REDUCER_NAME = "Recompute Center Positions";
+
+ private static final String SINK = "New Center Positions";
+
+ private final FieldList set0 = new FieldList(0);
+
+
+ @Test
+ public void testCompileKMeansSingleStepWithStats() {
+
+ KMeansSingleStep kmi = new KMeansSingleStep();
+ Plan p = kmi.getPlan(String.valueOf(DEFAULT_PARALLELISM), IN_FILE, IN_FILE, OUT_FILE, String.valueOf(20));
+ p.setExecutionConfig(new ExecutionConfig());
+ // set the statistics
+ OperatorResolver cr = getContractResolver(p);
+ FileDataSource pointsSource = cr.getNode(DATAPOINTS);
+ FileDataSource centersSource = cr.getNode(CENTERS);
+ setSourceStatistics(pointsSource, 100l*1024*1024*1024, 32f);
+ setSourceStatistics(centersSource, 1024*1024, 32f);
+
+ OptimizedPlan plan = compileWithStats(p);
+ checkPlan(plan);
+ }
+
+ @Test
+ public void testCompileKMeansSingleStepWithOutStats() {
+
+ KMeansSingleStep kmi = new KMeansSingleStep();
+ Plan p = kmi.getPlan(String.valueOf(DEFAULT_PARALLELISM), IN_FILE, IN_FILE, OUT_FILE, String.valueOf(20));
+ p.setExecutionConfig(new ExecutionConfig());
+ OptimizedPlan plan = compileNoStats(p);
+ checkPlan(plan);
+ }
+
+
+ private void checkPlan(OptimizedPlan plan) {
+
+ OptimizerPlanNodeResolver or = getOptimizerPlanNodeResolver(plan);
+
+ final SinkPlanNode sink = or.getNode(SINK);
+ final SingleInputPlanNode reducer = or.getNode(REDUCER_NAME);
+ final SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getPredecessor();
+ final SingleInputPlanNode mapper = or.getNode(MAPPER_NAME);
+
+ // check the mapper
+ assertEquals(1, mapper.getBroadcastInputs().size());
+ assertEquals(ShipStrategyType.FORWARD, mapper.getInput().getShipStrategy());
+ assertEquals(ShipStrategyType.BROADCAST, mapper.getBroadcastInputs().get(0).getShipStrategy());
+
+ assertEquals(LocalStrategy.NONE, mapper.getInput().getLocalStrategy());
+ assertEquals(LocalStrategy.NONE, mapper.getBroadcastInputs().get(0).getLocalStrategy());
+
+ assertEquals(DriverStrategy.COLLECTOR_MAP, mapper.getDriverStrategy());
+
+ assertNull(mapper.getInput().getLocalStrategyKeys());
+ assertNull(mapper.getInput().getLocalStrategySortOrder());
+ assertNull(mapper.getBroadcastInputs().get(0).getLocalStrategyKeys());
+ assertNull(mapper.getBroadcastInputs().get(0).getLocalStrategySortOrder());
+
+
+ // check the combiner
+ Assert.assertNotNull(combiner);
+ assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
+ assertEquals(LocalStrategy.NONE, combiner.getInput().getLocalStrategy());
+ assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combiner.getDriverStrategy());
+ assertNull(combiner.getInput().getLocalStrategyKeys());
+ assertNull(combiner.getInput().getLocalStrategySortOrder());
+ assertEquals(set0, combiner.getKeys(0));
+ assertEquals(set0, combiner.getKeys(1));
+
+ // check the reducer
+ assertEquals(ShipStrategyType.PARTITION_HASH, reducer.getInput().getShipStrategy());
+ assertEquals(LocalStrategy.COMBININGSORT, reducer.getInput().getLocalStrategy());
+ assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reducer.getDriverStrategy());
+ assertEquals(set0, reducer.getKeys(0));
+ assertEquals(set0, reducer.getInput().getLocalStrategyKeys());
+ assertTrue(Arrays.equals(reducer.getInput().getLocalStrategySortOrder(), reducer.getSortOrders(0)));
+
+ // check the sink
+ assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
+ assertEquals(LocalStrategy.NONE, sink.getInput().getLocalStrategy());
+ }
+}