You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/18 16:38:18 UTC
[4/7] flink git commit: [tests] Rename 'compiler' tests to
'optimizer' tests for consistent naming
http://git-wip-us.apache.org/repos/asf/flink/blob/09fdfda7/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java
new file mode 100644
index 0000000..f58486b
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.optimizer.examples;
+
+import java.util.Arrays;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.java.record.operators.FileDataSource;
+import org.apache.flink.api.java.record.operators.JoinOperator;
+import org.apache.flink.api.java.record.operators.MapOperator;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.runtime.operators.util.LocalStrategy;
+import org.apache.flink.optimizer.util.CompilerTestBase;
+import org.apache.flink.optimizer.util.OperatorResolver;
+import org.apache.flink.test.recordJobs.relational.TPCHQuery3;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests the optimizer's plan choices for TPC-H Q3 (simplified) under various input statistics and compiler hints.
+ */
+@SuppressWarnings("deprecation")
+public class RelationalQueryCompilerTest extends CompilerTestBase {
+
+ private static final String ORDERS = "Orders";
+ private static final String LINEITEM = "LineItems";
+ private static final String MAPPER_NAME = "FilterO";
+ private static final String JOIN_NAME = "JoinLiO";
+
+ private final FieldList set0 = new FieldList(0);
+ private final FieldList set01 = new FieldList(new int[] {0,1});
+ private final ExecutionConfig defaultExecutionConfig = new ExecutionConfig();
+
+ // ------------------------------------------------------------------------
+
+
+ /**
+ * Verifies that a robust repartitioning plan with a hash join is created in the absence of statistics.
+ */
+ @Test
+ public void testQueryNoStatistics() {
+ try {
+ TPCHQuery3 query = new TPCHQuery3();
+ Plan p = query.getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, OUT_FILE);
+ p.setExecutionConfig(defaultExecutionConfig);
+ // compile
+ final OptimizedPlan plan = compileNoStats(p);
+
+ final OptimizerPlanNodeResolver or = getOptimizerPlanNodeResolver(plan);
+
+ // get the nodes from the final plan
+ final SinkPlanNode sink = or.getNode("Output");
+ final SingleInputPlanNode reducer = or.getNode("AggLio");
+ final SingleInputPlanNode combiner = reducer.getPredecessor() instanceof SingleInputPlanNode ?
+ (SingleInputPlanNode) reducer.getPredecessor() : null;
+ final DualInputPlanNode join = or.getNode(JOIN_NAME);
+ final SingleInputPlanNode filteringMapper = or.getNode(MAPPER_NAME);
+
+ // verify the optimizer choices: repartitioning with a hash join (either build side)
+ checkStandardStrategies(filteringMapper, join, combiner, reducer, sink);
+ Assert.assertTrue("Expected a repartitioning plan", checkRepartitionShipStrategies(join, reducer, combiner));
+ Assert.assertTrue("Expected a hash join", checkHashJoinStrategies(join, reducer, true) || checkHashJoinStrategies(join, reducer, false));
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ /**
+ * Checks if any valid plan is produced. Hash joins are expected to build the orders side, as the statistics
+ * indicate this to be the smaller one.
+ */
+ @Test
+ public void testQueryAnyValidPlan() {
+ testQueryGeneric(1024*1024*1024L, 8*1024*1024*1024L, 0.05f, true, true, true, false, true);
+ }
+
+ /**
+ * Verifies that the plan compiles in the presence of empty size=0 estimates.
+ */
+ @Test
+ public void testQueryWithSizeZeroInputs() {
+ testQueryGeneric(0, 0, 0.5f, true, true, true, false, true);
+ }
+
+ /**
+ * Statistics that push towards a broadcast join.
+ */
+ @Test
+ public void testQueryWithStatsForBroadcastHash() {
+ testQueryGeneric(1024L*1024*1024*1024, 1024L*1024*1024*1024, 0.05f, true, false, true, false, false);
+ }
+
+ /**
+ * Statistics that push towards a repartition join; any local join strategy (hash or merge) is acceptable.
+ */
+ @Test
+ public void testQueryWithStatsForRepartitionAny() {
+ testQueryGeneric(100L*1024*1024*1024*1024, 100L*1024*1024*1024*1024, 0.5f, false, true, true, true, true);
+ }
+
+ /**
+ * Statistics that push towards a repartition merge join. If the join blows the data volume up significantly,
+ * re-exploiting the sorted order is cheaper.
+ */
+ @Test
+ public void testQueryWithStatsForRepartitionMerge() {
+ TPCHQuery3 query = new TPCHQuery3();
+ Plan p = query.getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, OUT_FILE);
+ p.setExecutionConfig(defaultExecutionConfig);
+ // mark the join as blowing up the data volume (filter factor 100); the generic driver re-applies it as joinSelectivity
+ OperatorResolver cr = getContractResolver(p);
+ JoinOperator match = cr.getNode(JOIN_NAME);
+ match.getCompilerHints().setFilterFactor(100f);
+
+ testQueryGeneric(p, 100L*1024*1024*1024*1024, 100L*1024*1024*1024*1024, 0.05f, 100f, false, true, false, false, true);
+ }
+
+ // ------------------------------------------------------------------------
+ // Generic drivers: set statistics/hints on TPC-H Q3, compile, and check the chosen plan variant against the *Okay flags.
+ private void testQueryGeneric(long orderSize, long lineItemSize,
+ float ordersFilterFactor,
+ boolean broadcastOkay, boolean partitionedOkay,
+ boolean hashJoinFirstOkay, boolean hashJoinSecondOkay, boolean mergeJoinOkay)
+ {
+ testQueryGeneric(orderSize, lineItemSize, ordersFilterFactor, ordersFilterFactor, broadcastOkay, partitionedOkay, hashJoinFirstOkay, hashJoinSecondOkay, mergeJoinOkay);
+ }
+
+ private void testQueryGeneric(long orderSize, long lineItemSize,
+ float ordersFilterFactor, float joinFilterFactor,
+ boolean broadcastOkay, boolean partitionedOkay,
+ boolean hashJoinFirstOkay, boolean hashJoinSecondOkay, boolean mergeJoinOkay)
+ {
+ TPCHQuery3 query = new TPCHQuery3();
+ Plan p = query.getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, OUT_FILE);
+ p.setExecutionConfig(defaultExecutionConfig);
+ testQueryGeneric(p, orderSize, lineItemSize, ordersFilterFactor, joinFilterFactor, broadcastOkay, partitionedOkay, hashJoinFirstOkay, hashJoinSecondOkay, mergeJoinOkay);
+ }
+
+ private void testQueryGeneric(Plan p, long orderSize, long lineitemSize,
+ float orderSelectivity, float joinSelectivity,
+ boolean broadcastOkay, boolean partitionedOkay,
+ boolean hashJoinFirstOkay, boolean hashJoinSecondOkay, boolean mergeJoinOkay)
+ {
+ try {
+ // set source statistics and compiler hints
+ OperatorResolver cr = getContractResolver(p);
+ FileDataSource ordersSource = cr.getNode(ORDERS);
+ FileDataSource lineItemSource = cr.getNode(LINEITEM);
+ MapOperator mapper = cr.getNode(MAPPER_NAME);
+ JoinOperator joiner = cr.getNode(JOIN_NAME);
+ setSourceStatistics(ordersSource, orderSize, 100f);
+ setSourceStatistics(lineItemSource, lineitemSize, 140f);
+ mapper.getCompilerHints().setAvgOutputRecordSize(16f);
+ mapper.getCompilerHints().setFilterFactor(orderSelectivity);
+ joiner.getCompilerHints().setFilterFactor(joinSelectivity);
+
+ // compile
+ final OptimizedPlan plan = compileWithStats(p);
+ final OptimizerPlanNodeResolver or = getOptimizerPlanNodeResolver(plan);
+
+ // get the nodes from the final plan
+ final SinkPlanNode sink = or.getNode("Output");
+ final SingleInputPlanNode reducer = or.getNode("AggLio");
+ final SingleInputPlanNode combiner = reducer.getPredecessor() instanceof SingleInputPlanNode ?
+ (SingleInputPlanNode) reducer.getPredecessor() : null;
+ final DualInputPlanNode join = or.getNode(JOIN_NAME);
+ final SingleInputPlanNode filteringMapper = or.getNode(MAPPER_NAME);
+
+ checkStandardStrategies(filteringMapper, join, combiner, reducer, sink);
+
+ // check the possible variants and that the variant is allowed in this specific setting
+ if (checkBroadcastShipStrategies(join, reducer, combiner)) {
+ Assert.assertTrue("Broadcast join incorrectly chosen.", broadcastOkay);
+
+ if (checkHashJoinStrategies(join, reducer, true)) {
+ Assert.assertTrue("Hash join (build orders) incorrectly chosen", hashJoinFirstOkay);
+ } else if (checkHashJoinStrategies(join, reducer, false)) {
+ Assert.assertTrue("Hash join (build lineitem) incorrectly chosen", hashJoinSecondOkay);
+ } else if (checkBroadcastMergeJoin(join, reducer)) {
+ Assert.assertTrue("Merge join incorrectly chosen", mergeJoinOkay);
+ } else {
+ Assert.fail("Plan has no correct hash join or merge join strategies.");
+ }
+ }
+ else if (checkRepartitionShipStrategies(join, reducer, combiner)) {
+ Assert.assertTrue("Partitioned join incorrectly chosen.", partitionedOkay);
+
+ if (checkHashJoinStrategies(join, reducer, true)) {
+ Assert.assertTrue("Hash join (build orders) incorrectly chosen", hashJoinFirstOkay);
+ } else if (checkHashJoinStrategies(join, reducer, false)) {
+ Assert.assertTrue("Hash join (build lineitem) incorrectly chosen", hashJoinSecondOkay);
+ } else if (checkRepartitionMergeJoin(join, reducer)) {
+ Assert.assertTrue("Merge join incorrectly chosen", mergeJoinOkay);
+ } else {
+ Assert.fail("Plan has no correct hash join or merge join strategies.");
+ }
+ } else {
+ Assert.fail("Plan has neither correct BC join or partitioned join configuration.");
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Plan checks: the boolean check* helpers return false if the variant does not apply, and assert its details otherwise
+ // ------------------------------------------------------------------------
+
+ private void checkStandardStrategies(SingleInputPlanNode map, DualInputPlanNode join, SingleInputPlanNode combiner,
+ SingleInputPlanNode reducer, SinkPlanNode sink)
+ {
+ // check ship strategies that are always fixed
+ Assert.assertEquals(ShipStrategyType.FORWARD, map.getInput().getShipStrategy());
+ Assert.assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
+
+ // check the driver strategies that are always fixed
+ Assert.assertEquals(DriverStrategy.COLLECTOR_MAP, map.getDriverStrategy());
+ Assert.assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reducer.getDriverStrategy());
+ Assert.assertEquals(DriverStrategy.NONE, sink.getDriverStrategy());
+ if (combiner != null) {
+ Assert.assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combiner.getDriverStrategy());
+ Assert.assertEquals(LocalStrategy.NONE, combiner.getInput().getLocalStrategy());
+ }
+ }
+
+ private boolean checkBroadcastShipStrategies(DualInputPlanNode join, SingleInputPlanNode reducer,
+ SingleInputPlanNode combiner)
+ {
+ if (ShipStrategyType.BROADCAST == join.getInput1().getShipStrategy() &&
+ ShipStrategyType.FORWARD == join.getInput2().getShipStrategy() &&
+ ShipStrategyType.PARTITION_HASH == reducer.getInput().getShipStrategy())
+ {
+ // the aggregation input is hash-partitioned, so a forward-connected combiner must precede the reducer
+ Assert.assertNotNull("Plan should have a combiner", combiner);
+ Assert.assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ private boolean checkRepartitionShipStrategies(DualInputPlanNode join, SingleInputPlanNode reducer,
+ SingleInputPlanNode combiner)
+ {
+ if (ShipStrategyType.PARTITION_HASH == join.getInput1().getShipStrategy() &&
+ ShipStrategyType.PARTITION_HASH == join.getInput2().getShipStrategy() &&
+ ShipStrategyType.FORWARD == reducer.getInput().getShipStrategy())
+ {
+ // the aggregation input is forwarded, so no combiner is expected
+ Assert.assertNull("Plan should not have a combiner", combiner);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ private boolean checkHashJoinStrategies(DualInputPlanNode join, SingleInputPlanNode reducer, boolean buildFirst) {
+ if ( (buildFirst && DriverStrategy.HYBRIDHASH_BUILD_FIRST == join.getDriverStrategy()) ||
+ (!buildFirst && DriverStrategy.HYBRIDHASH_BUILD_SECOND == join.getDriverStrategy()) )
+ {
+ // driver keys
+ Assert.assertEquals(set0, join.getKeysForInput1());
+ Assert.assertEquals(set0, join.getKeysForInput2());
+
+ // local strategies
+ Assert.assertEquals(LocalStrategy.NONE, join.getInput1().getLocalStrategy());
+ Assert.assertEquals(LocalStrategy.NONE, join.getInput2().getLocalStrategy());
+ Assert.assertEquals(LocalStrategy.COMBININGSORT, reducer.getInput().getLocalStrategy());
+
+ // local strategy keys
+ Assert.assertEquals(set01, reducer.getInput().getLocalStrategyKeys());
+ Assert.assertEquals(set01, reducer.getKeys(0));
+ Assert.assertTrue(Arrays.equals(reducer.getInput().getLocalStrategySortOrder(), reducer.getSortOrders(0)));
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ private boolean checkBroadcastMergeJoin(DualInputPlanNode join, SingleInputPlanNode reducer) {
+ if (DriverStrategy.MERGE == join.getDriverStrategy()) {
+ // driver keys
+ Assert.assertEquals(set0, join.getKeysForInput1());
+ Assert.assertEquals(set0, join.getKeysForInput2());
+
+ // local strategies
+ Assert.assertEquals(LocalStrategy.SORT, join.getInput1().getLocalStrategy());
+ Assert.assertEquals(LocalStrategy.SORT, join.getInput2().getLocalStrategy());
+ Assert.assertEquals(LocalStrategy.COMBININGSORT, reducer.getInput().getLocalStrategy());
+
+ // local strategy keys
+ Assert.assertEquals(set0, join.getInput1().getLocalStrategyKeys());
+ Assert.assertEquals(set0, join.getInput2().getLocalStrategyKeys());
+ Assert.assertTrue(Arrays.equals(join.getInput1().getLocalStrategySortOrder(), join.getInput2().getLocalStrategySortOrder()));
+ Assert.assertEquals(set01, reducer.getInput().getLocalStrategyKeys());
+ Assert.assertEquals(set01, reducer.getKeys(0));
+ Assert.assertTrue(Arrays.equals(reducer.getInput().getLocalStrategySortOrder(), reducer.getSortOrders(0)));
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ private boolean checkRepartitionMergeJoin(DualInputPlanNode join, SingleInputPlanNode reducer) {
+ if (DriverStrategy.MERGE == join.getDriverStrategy()) {
+ // driver keys
+ Assert.assertEquals(set0, join.getKeysForInput1());
+ Assert.assertEquals(set0, join.getKeysForInput2());
+
+ // local strategies
+ Assert.assertEquals(LocalStrategy.SORT, join.getInput1().getLocalStrategy());
+ Assert.assertEquals(LocalStrategy.SORT, join.getInput2().getLocalStrategy());
+ Assert.assertEquals(LocalStrategy.NONE, reducer.getInput().getLocalStrategy());
+
+ // local strategy keys
+ Assert.assertEquals(set01, join.getInput1().getLocalStrategyKeys());
+ Assert.assertEquals(set0, join.getInput2().getLocalStrategyKeys());
+ Assert.assertEquals(join.getInput1().getLocalStrategySortOrder()[0], join.getInput2().getLocalStrategySortOrder()[0]);
+ Assert.assertEquals(set01, reducer.getKeys(0));
+ Assert.assertTrue(Arrays.equals(join.getInput1().getLocalStrategySortOrder(), reducer.getSortOrders(0)));
+ return true;
+ } else {
+ return false;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/09fdfda7/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/WordCountCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/WordCountCompilerTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/WordCountCompilerTest.java
new file mode 100644
index 0000000..ce71383
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/WordCountCompilerTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.optimizer.examples;
+
+import java.util.Arrays;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.distributions.SimpleDistribution;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.java.record.io.CsvOutputFormat;
+import org.apache.flink.api.java.record.io.TextInputFormat;
+import org.apache.flink.api.java.record.operators.FileDataSink;
+import org.apache.flink.api.java.record.operators.FileDataSource;
+import org.apache.flink.api.java.record.operators.MapOperator;
+import org.apache.flink.api.java.record.operators.ReduceOperator;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.runtime.operators.util.LocalStrategy;
+import org.apache.flink.optimizer.util.CompilerTestBase;
+import org.apache.flink.test.recordJobs.wordcount.WordCount;
+import org.apache.flink.test.recordJobs.wordcount.WordCount.CountWords;
+import org.apache.flink.test.recordJobs.wordcount.WordCount.TokenizeLine;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.StringValue;
+import org.junit.Assert;
+import org.junit.Test;
+
+@SuppressWarnings("deprecation")
+public class WordCountCompilerTest extends CompilerTestBase {
+
+ /**
+ * Verifies the optimizer's plan for the plain word count program, once with and once without source statistics.
+ */
+ @Test
+ public void testWordCount() {
+ checkWordCount(true);
+ checkWordCount(false);
+ }
+
+ private void checkWordCount(boolean withStatistics) {
+ try {
+ WordCount job = new WordCount();
+ ExecutionConfig config = new ExecutionConfig();
+ Plan program = job.getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, OUT_FILE);
+ program.setExecutionConfig(config);
+
+ OptimizedPlan optimized;
+ if (withStatistics) {
+ FileDataSource input = getContractResolver(program).getNode("Input Lines");
+ setSourceStatistics(input, 1024*1024*1024*1024L, 24f);
+ optimized = compileWithStats(program);
+ } else {
+ optimized = compileNoStats(program);
+ }
+
+ // look up the optimized plan nodes by operator name
+ OptimizerPlanNodeResolver nodes = getOptimizerPlanNodeResolver(optimized);
+ SinkPlanNode sink = nodes.getNode("Word Counts");
+ SingleInputPlanNode reducer = nodes.getNode("Count Words");
+ SingleInputPlanNode mapper = nodes.getNode("Tokenize Lines");
+
+ // ship strategies: forward into mapper and sink, hash partitioning into the reducer
+ Assert.assertEquals(ShipStrategyType.FORWARD, mapper.getInput().getShipStrategy());
+ Assert.assertEquals(ShipStrategyType.PARTITION_HASH, reducer.getInput().getShipStrategy());
+ Assert.assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
+
+ Channel reduceInput = reducer.getInput();
+ Assert.assertEquals(LocalStrategy.COMBININGSORT, reduceInput.getLocalStrategy());
+ FieldList wordField = new FieldList(0);
+ Assert.assertEquals(wordField, reduceInput.getShipStrategyKeys());
+ Assert.assertEquals(wordField, reduceInput.getLocalStrategyKeys());
+ Assert.assertTrue(Arrays.equals(reduceInput.getLocalStrategySortOrder(), reducer.getSortOrders(0)));
+
+ // the reducer must be preceded by a sort-based combiner on the same key
+ SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getPredecessor();
+ Assert.assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combiner.getDriverStrategy());
+ Assert.assertEquals(wordField, combiner.getKeys(0));
+ Assert.assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ /**
+ * Verifies that for word count with a globally sorted (range-partitioned) sink, range partitioning moves to the reducer input.
+ */
+ @Test
+ public void testWordCountWithSortedSink() {
+ checkWordCountWithSortedSink(true);
+ checkWordCountWithSortedSink(false);
+ }
+
+ private void checkWordCountWithSortedSink(boolean withStatistics) {
+ try {
+ FileDataSource lines = new FileDataSource(new TextInputFormat(), IN_FILE, "Input Lines");
+ MapOperator tokenizer = MapOperator.builder(new TokenizeLine())
+ .input(lines)
+ .name("Tokenize Lines")
+ .build();
+ ReduceOperator counter = ReduceOperator.builder(new CountWords(), StringValue.class, 0)
+ .input(tokenizer)
+ .name("Count Words")
+ .build();
+ FileDataSink result = new FileDataSink(new CsvOutputFormat(), OUT_FILE, counter, "Word Counts");
+ CsvOutputFormat.configureRecordFormat(result)
+ .recordDelimiter('\n')
+ .fieldDelimiter(' ')
+ .lenient(true)
+ .field(StringValue.class, 0)
+ .field(IntValue.class, 1);
+
+ Ordering descendingByWord = new Ordering(0, StringValue.class, Order.DESCENDING);
+ result.setGlobalOrder(descendingByWord, new SimpleDistribution(new StringValue[] {new StringValue("N")}));
+
+ ExecutionConfig config = new ExecutionConfig();
+ Plan program = new Plan(result, "WordCount Example");
+ program.setDefaultParallelism(DEFAULT_PARALLELISM);
+ program.setExecutionConfig(config);
+
+ OptimizedPlan optimized;
+ if (withStatistics) {
+ setSourceStatistics(lines, 1024*1024*1024*1024L, 24f);
+ optimized = compileWithStats(program);
+ } else {
+ optimized = compileNoStats(program);
+ }
+
+ OptimizerPlanNodeResolver nodes = getOptimizerPlanNodeResolver(optimized);
+ SinkPlanNode sink = nodes.getNode("Word Counts");
+ SingleInputPlanNode reducer = nodes.getNode("Count Words");
+ SingleInputPlanNode mapper = nodes.getNode("Tokenize Lines");
+
+ Assert.assertEquals(ShipStrategyType.FORWARD, mapper.getInput().getShipStrategy());
+ Assert.assertEquals(ShipStrategyType.PARTITION_RANGE, reducer.getInput().getShipStrategy());
+ Assert.assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
+
+ Channel reduceInput = reducer.getInput();
+ Assert.assertEquals(LocalStrategy.COMBININGSORT, reduceInput.getLocalStrategy());
+ FieldList wordField = new FieldList(0);
+ Assert.assertEquals(wordField, reduceInput.getShipStrategyKeys());
+ Assert.assertEquals(wordField, reduceInput.getLocalStrategyKeys());
+
+ // both the range partitioning and the local sort must be descending
+ Assert.assertFalse(reduceInput.getShipStrategySortOrder()[0]);
+ Assert.assertFalse(reduceInput.getLocalStrategySortOrder()[0]);
+
+ // the reducer must be preceded by a sort-based combiner keyed on the word field
+ SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getPredecessor();
+ Assert.assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combiner.getDriverStrategy());
+ Assert.assertEquals(wordField, combiner.getKeys(0));
+ Assert.assertEquals(wordField, combiner.getKeys(1));
+ Assert.assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/09fdfda7/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java
new file mode 100644
index 0000000..de5fde0
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.optimizer.iterations;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.optimizer.dag.TempMode;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.plan.SourcePlanNode;
+import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
+import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.runtime.operators.util.LocalStrategy;
+import org.apache.flink.optimizer.util.CompilerTestBase;
+import org.apache.flink.test.recordJobs.graph.ConnectedComponentsWithCoGroup;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Checks the optimizer's plan for the record-API connected components job whose solution-set update is a CoGroup.
+ */
+public class ConnectedComponentsCoGroupTest extends CompilerTestBase {
+
+ private static final String VERTEX_SOURCE = "Vertices";
+
+ private static final String ITERATION_NAME = "Connected Components Iteration";
+
+ private static final String EDGES_SOURCE = "Edges";
+ private static final String JOIN_NEIGHBORS_MATCH = "Join Candidate Id With Neighbor";
+ private static final String MIN_ID_AND_UPDATE = "Min Id and Update";
+
+ private static final String SINK = "Result";
+ // set to true to dump the optimized plan as JSON to stdout while debugging
+ private static final boolean PRINT_PLAN = false;
+
+ private final FieldList set0 = new FieldList(0);
+
+ /** Verifies drivers, ship/local strategies and caching of the workset connected components plan. */
+ @Test
+ public void testWorksetConnectedComponents() {
+ ConnectedComponentsWithCoGroup cc = new ConnectedComponentsWithCoGroup();
+
+ Plan plan = cc.getPlan(String.valueOf(DEFAULT_PARALLELISM),
+ IN_FILE, IN_FILE, OUT_FILE, String.valueOf(100));
+ plan.setExecutionConfig(new ExecutionConfig());
+ OptimizedPlan optPlan = compileNoStats(plan);
+ OptimizerPlanNodeResolver or = getOptimizerPlanNodeResolver(optPlan);
+
+ if (PRINT_PLAN) {
+ PlanJSONDumpGenerator dumper = new PlanJSONDumpGenerator();
+ String json = dumper.getOptimizerPlanAsJSON(optPlan);
+ System.out.println(json);
+ }
+
+ SourcePlanNode vertexSource = or.getNode(VERTEX_SOURCE);
+ SourcePlanNode edgesSource = or.getNode(EDGES_SOURCE);
+ SinkPlanNode sink = or.getNode(SINK);
+ WorksetIterationPlanNode iter = or.getNode(ITERATION_NAME);
+
+ DualInputPlanNode neighborsJoin = or.getNode(JOIN_NEIGHBORS_MATCH);
+ DualInputPlanNode cogroup = or.getNode(MIN_ID_AND_UPDATE);
+
+ // --------------------------------------------------------------------
+ // Plan validation:
+ //
+ // We expect the plan to go with a sort-merge join, because the CoGroup
+ // sorts and the join in the successive iteration can re-exploit the sorting.
+ // --------------------------------------------------------------------
+
+ // test all drivers
+ Assert.assertEquals(DriverStrategy.NONE, sink.getDriverStrategy());
+ Assert.assertEquals(DriverStrategy.NONE, vertexSource.getDriverStrategy());
+ Assert.assertEquals(DriverStrategy.NONE, edgesSource.getDriverStrategy());
+
+ Assert.assertEquals(DriverStrategy.MERGE, neighborsJoin.getDriverStrategy());
+ Assert.assertEquals(set0, neighborsJoin.getKeysForInput1());
+ Assert.assertEquals(set0, neighborsJoin.getKeysForInput2());
+
+ Assert.assertEquals(DriverStrategy.CO_GROUP, cogroup.getDriverStrategy());
+ Assert.assertEquals(set0, cogroup.getKeysForInput1());
+ Assert.assertEquals(set0, cogroup.getKeysForInput2());
+
+ // test all the shipping strategies
+ Assert.assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
+ Assert.assertEquals(ShipStrategyType.PARTITION_HASH, iter.getInitialSolutionSetInput().getShipStrategy());
+ Assert.assertEquals(set0, iter.getInitialSolutionSetInput().getShipStrategyKeys());
+ Assert.assertEquals(ShipStrategyType.PARTITION_HASH, iter.getInitialWorksetInput().getShipStrategy());
+ Assert.assertEquals(set0, iter.getInitialWorksetInput().getShipStrategyKeys());
+
+ Assert.assertEquals(ShipStrategyType.FORWARD, neighborsJoin.getInput1().getShipStrategy()); // workset
+ Assert.assertEquals(ShipStrategyType.PARTITION_HASH, neighborsJoin.getInput2().getShipStrategy()); // edges
+ Assert.assertEquals(set0, neighborsJoin.getInput2().getShipStrategyKeys());
+ Assert.assertTrue(neighborsJoin.getInput2().getTempMode().isCached());
+
+ Assert.assertEquals(ShipStrategyType.PARTITION_HASH, cogroup.getInput1().getShipStrategy()); // min id
+ Assert.assertEquals(ShipStrategyType.FORWARD, cogroup.getInput2().getShipStrategy()); // solution set
+
+ // test all the local strategies
+ Assert.assertEquals(LocalStrategy.NONE, sink.getInput().getLocalStrategy());
+ Assert.assertEquals(LocalStrategy.NONE, iter.getInitialSolutionSetInput().getLocalStrategy());
+
+ // the sort for the neighbor join in the first iteration is pushed out of the loop
+ Assert.assertEquals(LocalStrategy.SORT, iter.getInitialWorksetInput().getLocalStrategy());
+ Assert.assertEquals(LocalStrategy.NONE, neighborsJoin.getInput1().getLocalStrategy()); // workset
+ Assert.assertEquals(LocalStrategy.SORT, neighborsJoin.getInput2().getLocalStrategy()); // edges
+
+ Assert.assertEquals(LocalStrategy.SORT, cogroup.getInput1().getLocalStrategy()); // min id
+ Assert.assertEquals(LocalStrategy.NONE, cogroup.getInput2().getLocalStrategy()); // solution set
+
+ // check the caches: the edges input of the neighbor join must use exactly TempMode.CACHED
+ Assert.assertEquals(TempMode.CACHED, neighborsJoin.getInput2().getTempMode());
+ // the optimized plan must also translate into a job graph without errors
+ JobGraphGenerator jgg = new JobGraphGenerator();
+ jgg.compileJobGraph(optPlan);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/09fdfda7/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/IterativeKMeansTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/IterativeKMeansTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/IterativeKMeansTest.java
new file mode 100644
index 0000000..3785270
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/IterativeKMeansTest.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.optimizer.iterations;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.java.record.operators.FileDataSource;
+import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.runtime.operators.util.LocalStrategy;
+import org.apache.flink.optimizer.util.CompilerTestBase;
+import org.apache.flink.optimizer.util.OperatorResolver;
+import org.apache.flink.test.recordJobs.kmeans.KMeansBroadcast;
+import org.junit.Test;
+
+/**
+ * Optimizer tests for the bulk-iteration K-Means program {@link KMeansBroadcast}. The plan is
+ * compiled once with and once without source statistics; both variants must pass the same checks.
+ */
+@SuppressWarnings("deprecation")
+public class IterativeKMeansTest extends CompilerTestBase {
+
+	private static final String DATAPOINTS = "Data Points";
+	private static final String CENTERS = "Centers";
+
+	private static final String MAPPER_NAME = "Find Nearest Centers";
+	private static final String REDUCER_NAME = "Recompute Center Positions";
+
+	private static final String ITERATION_NAME = "k-means loop";
+
+	private static final String SINK = "New Center Positions";
+
+	/** Number of iterations passed as the last program argument. */
+	private static final String NUM_ITERATIONS = "20";
+
+	private final FieldList set0 = new FieldList(0);
+
+	// --------------------------------------------------------------------------------------------
+	//  K-Means (Bulk Iteration)
+	// --------------------------------------------------------------------------------------------
+
+	@Test
+	public void testCompileKMeansSingleStepWithStats() {
+		Plan p = createKMeansPlan();
+
+		// attach size statistics: a very large point source and a small center source
+		OperatorResolver cr = getContractResolver(p);
+		FileDataSource pointsSource = cr.getNode(DATAPOINTS);
+		FileDataSource centersSource = cr.getNode(CENTERS);
+		setSourceStatistics(pointsSource, 100L * 1024 * 1024 * 1024, 32f);
+		setSourceStatistics(centersSource, 1024 * 1024, 32f);
+
+		OptimizedPlan plan = compileWithStats(p);
+		checkPlan(plan);
+
+		new JobGraphGenerator().compileJobGraph(plan);
+	}
+
+	@Test
+	public void testCompileKMeansSingleStepWithOutStats() {
+		Plan p = createKMeansPlan();
+
+		OptimizedPlan plan = compileNoStats(p);
+		checkPlan(plan);
+
+		new JobGraphGenerator().compileJobGraph(plan);
+	}
+
+	/**
+	 * Creates the {@link KMeansBroadcast} plan with the default test parallelism, {@code IN_FILE} for
+	 * both input paths and {@code OUT_FILE} as output path, and attaches a fresh execution config.
+	 */
+	private Plan createKMeansPlan() {
+		KMeansBroadcast kmi = new KMeansBroadcast();
+		Plan p = kmi.getPlan(String.valueOf(DEFAULT_PARALLELISM), IN_FILE, IN_FILE, OUT_FILE, NUM_ITERATIONS);
+		p.setExecutionConfig(new ExecutionConfig());
+		return p;
+	}
+
+	/**
+	 * Checks the ship, local and driver strategies chosen for the sink, the iteration and the
+	 * mapper / combiner / reducer inside the loop.
+	 */
+	private void checkPlan(OptimizedPlan plan) {
+
+		OptimizerPlanNodeResolver or = getOptimizerPlanNodeResolver(plan);
+
+		final SinkPlanNode sink = or.getNode(SINK);
+		final SingleInputPlanNode reducer = or.getNode(REDUCER_NAME);
+		final SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getPredecessor();
+		final SingleInputPlanNode mapper = or.getNode(MAPPER_NAME);
+
+		final BulkIterationPlanNode iter = or.getNode(ITERATION_NAME);
+
+		// -------------------- outside the loop -----------------------
+
+		// check the sink
+		assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
+		assertEquals(LocalStrategy.NONE, sink.getInput().getLocalStrategy());
+
+		// check the iteration
+		assertEquals(ShipStrategyType.FORWARD, iter.getInput().getShipStrategy());
+		assertEquals(LocalStrategy.NONE, iter.getInput().getLocalStrategy());
+
+		// -------------------- inside the loop -----------------------
+
+		// check the mapper: the regular input is forwarded, cached and not on the dynamic path,
+		// while the single broadcast input is on the dynamic path
+		assertEquals(1, mapper.getBroadcastInputs().size());
+		assertEquals(ShipStrategyType.FORWARD, mapper.getInput().getShipStrategy());
+		assertEquals(ShipStrategyType.BROADCAST, mapper.getBroadcastInputs().get(0).getShipStrategy());
+		assertFalse(mapper.getInput().isOnDynamicPath());
+		assertTrue(mapper.getBroadcastInputs().get(0).isOnDynamicPath());
+		assertTrue(mapper.getInput().getTempMode().isCached());
+
+		assertEquals(LocalStrategy.NONE, mapper.getInput().getLocalStrategy());
+		assertEquals(LocalStrategy.NONE, mapper.getBroadcastInputs().get(0).getLocalStrategy());
+
+		assertEquals(DriverStrategy.COLLECTOR_MAP, mapper.getDriverStrategy());
+
+		assertNull(mapper.getInput().getLocalStrategyKeys());
+		assertNull(mapper.getInput().getLocalStrategySortOrder());
+		assertNull(mapper.getBroadcastInputs().get(0).getLocalStrategyKeys());
+		assertNull(mapper.getBroadcastInputs().get(0).getLocalStrategySortOrder());
+
+		// check the combiner (the direct predecessor of the reducer)
+		assertNotNull(combiner);
+		assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
+		assertTrue(combiner.getInput().isOnDynamicPath());
+
+		assertEquals(LocalStrategy.NONE, combiner.getInput().getLocalStrategy());
+		assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combiner.getDriverStrategy());
+		assertNull(combiner.getInput().getLocalStrategyKeys());
+		assertNull(combiner.getInput().getLocalStrategySortOrder());
+		assertEquals(set0, combiner.getKeys(0));
+		assertEquals(set0, combiner.getKeys(1));
+
+		// check the reducer
+		assertEquals(ShipStrategyType.PARTITION_HASH, reducer.getInput().getShipStrategy());
+		assertTrue(reducer.getInput().isOnDynamicPath());
+		assertEquals(LocalStrategy.COMBININGSORT, reducer.getInput().getLocalStrategy());
+		assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reducer.getDriverStrategy());
+		assertEquals(set0, reducer.getKeys(0));
+		assertEquals(set0, reducer.getInput().getLocalStrategyKeys());
+		assertTrue(Arrays.equals(reducer.getInput().getLocalStrategySortOrder(), reducer.getSortOrders(0)));
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/09fdfda7/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/MultipleJoinsWithSolutionSetCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/MultipleJoinsWithSolutionSetCompilerTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/MultipleJoinsWithSolutionSetCompilerTest.java
new file mode 100644
index 0000000..f17b28a
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/MultipleJoinsWithSolutionSetCompilerTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.optimizer.iterations;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.aggregation.Aggregations;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RichJoinFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.optimizer.util.CompilerTestBase;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.SolutionSetPlanNode;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+
+/**
+ * Optimizer test for a delta iteration whose step function joins with the solution set twice.
+ */
+@SuppressWarnings("serial")
+public class MultipleJoinsWithSolutionSetCompilerTest extends CompilerTestBase {
+
+	private static final String JOIN_1 = "join1";
+	private static final String JOIN_2 = "join2";
+
+	/**
+	 * Checks that both solution-set joins are planned as hybrid-hash joins that build on the
+	 * solution-set side and hash-partition the other input, and that the plan (which branches
+	 * into two sinks after the iteration) can be translated into a job graph.
+	 */
+	@Test
+	public void testMultiSolutionSetJoinPlan() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		@SuppressWarnings("unchecked")
+		DataSet<Tuple2<Long, Double>> inputData = env.fromElements(new Tuple2<Long, Double>(1L, 1.0));
+		DataSet<Tuple2<Long, Double>> result = constructPlan(inputData, 10);
+
+		// add two sinks, to test the case of branching after an iteration
+		result.output(new DiscardingOutputFormat<Tuple2<Long, Double>>());
+		result.output(new DiscardingOutputFormat<Tuple2<Long, Double>>());
+
+		Plan p = env.createProgramPlan();
+		OptimizedPlan optPlan = compileNoStats(p);
+
+		OptimizerPlanNodeResolver or = getOptimizerPlanNodeResolver(optPlan);
+		DualInputPlanNode join1 = or.getNode(JOIN_1);
+		DualInputPlanNode join2 = or.getNode(JOIN_2);
+
+		// the solution set is the first input of join1 and the second input of join2,
+		assertEquals(SolutionSetPlanNode.class, join1.getInput1().getSource().getClass());
+		assertEquals(SolutionSetPlanNode.class, join2.getInput2().getSource().getClass());
+
+		// each join builds its hash table on the solution-set side,
+		assertEquals(DriverStrategy.HYBRIDHASH_BUILD_FIRST, join1.getDriverStrategy());
+		assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND, join2.getDriverStrategy());
+
+		// and hash-partitions its other input
+		assertEquals(ShipStrategyType.PARTITION_HASH, join1.getInput2().getShipStrategy());
+		assertEquals(ShipStrategyType.PARTITION_HASH, join2.getInput1().getShipStrategy());
+
+		new JobGraphGenerator().compileJobGraph(optPlan);
+	}
+
+	/**
+	 * Builds a delta iteration (key field 0) that uses {@code initialData} as both the initial solution
+	 * set and the initial workset. The step function joins the duplicated workset with the solution
+	 * set ({@code JOIN_1}), keeps the per-key minimum of field 1, expands the records and joins them
+	 * with the solution set again ({@code JOIN_2}). That result is the solution-set delta; its per-key
+	 * sum of field 1 becomes the next workset.
+	 *
+	 * @param initialData initial solution set and initial workset
+	 * @param numIterations maximum number of iterations
+	 * @return the result of the delta iteration
+	 */
+	public static DataSet<Tuple2<Long, Double>> constructPlan(DataSet<Tuple2<Long, Double>> initialData, int numIterations) {
+
+		DeltaIteration<Tuple2<Long, Double>, Tuple2<Long, Double>> iteration = initialData.iterateDelta(initialData, numIterations, 0);
+
+		DataSet<Tuple2<Long, Double>> delta = iteration.getSolutionSet()
+				.join(iteration.getWorkset().flatMap(new Duplicator())).where(0).equalTo(0).with(new SummingJoin()).name(JOIN_1)
+				.groupBy(0).aggregate(Aggregations.MIN, 1).map(new Expander())
+				.join(iteration.getSolutionSet()).where(0).equalTo(0).with(new SummingJoinProject()).name(JOIN_2);
+
+		DataSet<Tuple2<Long, Double>> changes = delta.groupBy(0).aggregate(Aggregations.SUM, 1);
+
+		DataSet<Tuple2<Long, Double>> result = iteration.closeWith(delta, changes);
+
+		return result;
+	}
+
+	/** Keeps the key of the first input and adds the second fields of both inputs. */
+	public static final class SummingJoin extends RichJoinFunction<Tuple2<Long, Double>, Tuple2<Long, Double>, Tuple2<Long, Double>> {
+
+		@Override
+		public Tuple2<Long, Double> join(Tuple2<Long, Double> first, Tuple2<Long, Double> second) {
+			return new Tuple2<Long, Double>(first.f0, first.f1 + second.f1);
+		}
+	}
+
+	/** Keeps the key of the first input and sums its two value fields with the second input's value. */
+	public static final class SummingJoinProject extends RichJoinFunction<Tuple3<Long, Double, Double>, Tuple2<Long, Double>, Tuple2<Long, Double>> {
+
+		@Override
+		public Tuple2<Long, Double> join(Tuple3<Long, Double, Double> first, Tuple2<Long, Double> second) {
+			return new Tuple2<Long, Double>(first.f0, first.f1 + first.f2 + second.f1);
+		}
+	}
+
+	/** Emits every input record twice. */
+	public static final class Duplicator extends RichFlatMapFunction<Tuple2<Long, Double>, Tuple2<Long, Double>> {
+
+		@Override
+		public void flatMap(Tuple2<Long, Double> value, Collector<Tuple2<Long, Double>> out) {
+			out.collect(value);
+			out.collect(value);
+		}
+	}
+
+	/** Appends a third field holding twice the value of the second field. */
+	public static final class Expander extends RichMapFunction<Tuple2<Long, Double>, Tuple3<Long, Double, Double>> {
+
+		@Override
+		public Tuple3<Long, Double, Double> map(Tuple2<Long, Double> value) {
+			return new Tuple3<Long, Double, Double>(value.f0, value.f1, value.f1 * 2);
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/09fdfda7/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/PageRankCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/PageRankCompilerTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/PageRankCompilerTest.java
new file mode 100644
index 0000000..9b17270
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/PageRankCompilerTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.optimizer.iterations;
+
+import static org.apache.flink.api.java.aggregation.Aggregations.SUM;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
+import org.apache.flink.optimizer.plan.BulkPartialSolutionPlanNode;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.examples.java.graph.PageRankBasic.BuildOutgoingEdgeList;
+import org.apache.flink.examples.java.graph.PageRankBasic.Dampener;
+import org.apache.flink.examples.java.graph.PageRankBasic.EpsilonFilter;
+import org.apache.flink.examples.java.graph.PageRankBasic.JoinVertexWithEdgesMatch;
+import org.apache.flink.examples.java.graph.PageRankBasic.RankAssigner;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.runtime.operators.util.LocalStrategy;
+import org.apache.flink.optimizer.util.CompilerTestBase;
+import org.junit.Assert;
+import org.junit.Test;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.IterativeDataSet;
+
+/**
+ * Optimizer test for a bulk-iteration PageRank program assembled from the {@code PageRankBasic}
+ * example functions.
+ */
+public class PageRankCompilerTest extends CompilerTestBase {
+
+	/**
+	 * Checks that the hash partitioning of the initial ranks is applied to the iteration input (outside
+	 * the loop) and that the first outgoing channel of the partial solution is a forward channel.
+	 */
+	@Test
+	public void testPageRank() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		// get input data
+		DataSet<Long> pagesInput = env.fromElements(1L);
+		@SuppressWarnings("unchecked")
+		DataSet<Tuple2<Long, Long>> linksInput = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
+
+		// assign initial rank to pages
+		DataSet<Tuple2<Long, Double>> pagesWithRanks = pagesInput
+				.map(new RankAssigner((1.0d / 10)));
+
+		// build adjacency list from link input
+		DataSet<Tuple2<Long, Long[]>> adjacencyListInput = linksInput
+				.groupBy(0).reduceGroup(new BuildOutgoingEdgeList());
+
+		// set iterative data set
+		IterativeDataSet<Tuple2<Long, Double>> iteration = pagesWithRanks.iterate(10);
+
+		// optimizer hint for the rank/edge join: build the hash table on the second input
+		Configuration cfg = new Configuration();
+		cfg.setString(Optimizer.HINT_LOCAL_STRATEGY, Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND);
+
+		DataSet<Tuple2<Long, Double>> newRanks = iteration
+				// join pages with outgoing edges and distribute rank
+				.join(adjacencyListInput).where(0).equalTo(0).withParameters(cfg)
+				.flatMap(new JoinVertexWithEdgesMatch())
+				// collect and sum ranks
+				.groupBy(0).aggregate(SUM, 1)
+				// apply dampening factor
+				.map(new Dampener(0.85, 10));
+
+		DataSet<Tuple2<Long, Double>> finalPageRanks = iteration.closeWith(
+				newRanks,
+				newRanks.join(iteration).where(0).equalTo(0)
+				// termination condition
+				.filter(new EpsilonFilter()));
+
+		finalPageRanks.output(new DiscardingOutputFormat<Tuple2<Long, Double>>());
+
+		// get the plan and compile it
+		Plan p = env.createProgramPlan();
+		OptimizedPlan op = compileNoStats(p);
+
+		SinkPlanNode sinkPlanNode = (SinkPlanNode) op.getDataSinks().iterator().next();
+		BulkIterationPlanNode iterPlanNode = (BulkIterationPlanNode) sinkPlanNode.getInput().getSource();
+
+		// check that the partitioning is pushed out of the loop
+		Assert.assertEquals(ShipStrategyType.PARTITION_HASH, iterPlanNode.getInput().getShipStrategy());
+		Assert.assertEquals(LocalStrategy.NONE, iterPlanNode.getInput().getLocalStrategy());
+
+		BulkPartialSolutionPlanNode partSolPlanNode = iterPlanNode.getPartialSolutionPlanNode();
+		Assert.assertEquals(ShipStrategyType.FORWARD, partSolPlanNode.getOutgoingChannels().get(0).getShipStrategy());
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/09fdfda7/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java
new file mode 100644
index 0000000..082532e
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.optimizer.jsonplan;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.client.program.Client.ProgramAbortException;
+import org.apache.flink.client.program.PackagedProgram.PreviewPlanEnvironment;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
+import org.apache.flink.examples.java.clustering.KMeans;
+import org.apache.flink.optimizer.util.CompilerTestBase;
+import org.apache.flink.test.recordJobs.graph.DeltaPageRankWithInitialDeltas;
+import org.apache.flink.test.recordJobs.kmeans.KMeansBroadcast;
+import org.apache.flink.test.recordJobs.kmeans.KMeansSingleStep;
+import org.apache.flink.test.recordJobs.relational.TPCHQuery3;
+import org.apache.flink.test.recordJobs.relational.WebLogAnalysis;
+import org.apache.flink.test.recordJobs.wordcount.WordCount;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.JsonParser;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * The tests in this class simply invoke the JSON dump code for the optimized plan of several
+ * programs and check that the produced JSON can be parsed completely.
+ */
+public class DumpCompiledPlanTest extends CompilerTestBase {
+
+	@Test
+	public void dumpWordCount() {
+		dump(new WordCount().getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, OUT_FILE));
+	}
+
+	@Test
+	public void dumpTPCH3() {
+		dump(new TPCHQuery3().getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, OUT_FILE));
+	}
+
+	@Test
+	public void dumpKMeans() {
+		dump(new KMeansSingleStep().getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, OUT_FILE));
+	}
+
+	@Test
+	public void dumpIterativeKMeans() {
+		// prepare the test environment
+		PreviewPlanEnvironment env = new PreviewPlanEnvironment();
+		env.setAsContext();
+		try {
+			// <points path> <centers path> <result path> <num iterations>
+			KMeans.main(new String[] {IN_FILE, IN_FILE, OUT_FILE, "123"});
+		} catch (ProgramAbortException pae) {
+			// expected (presumably raised by the preview environment once the plan is captured): all good.
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("KMeans failed with an exception: " + e.getMessage());
+		}
+		dump(env.getPlan());
+	}
+
+	@Test
+	public void dumpWebLogAnalysis() {
+		dump(new WebLogAnalysis().getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, IN_FILE, OUT_FILE));
+	}
+
+	@Test
+	public void dumpBulkIterationKMeans() {
+		// parallelism, two input paths, output path -- the same layout IterativeKMeansTest uses
+		dump(new KMeansBroadcast().getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, OUT_FILE));
+	}
+
+	@Test
+	public void dumpDeltaPageRank() {
+		dump(new DeltaPageRankWithInitialDeltas().getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, IN_FILE, OUT_FILE, "10"));
+	}
+
+	/**
+	 * Optimizes the given plan, dumps it as JSON and parses the complete JSON document, failing the
+	 * test if optimizing, dumping or parsing throws.
+	 */
+	private void dump(Plan p) {
+		p.setExecutionConfig(new ExecutionConfig());
+		try {
+			OptimizedPlan op = compileNoStats(p);
+			PlanJSONDumpGenerator dumper = new PlanJSONDumpGenerator();
+			String json = dumper.getOptimizerPlanAsJSON(op);
+			JsonParser parser = new JsonFactory().createJsonParser(json);
+			try {
+				// consume all tokens; malformed JSON surfaces as a JsonParseException
+				while (parser.nextToken() != null) {
+					// nothing to do per token
+				}
+			} finally {
+				parser.close();
+			}
+		} catch (JsonParseException e) {
+			e.printStackTrace();
+			Assert.fail("JSON Generator produced malformatted output: " + e.getMessage());
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("An error occurred in the test: " + e.getMessage());
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/09fdfda7/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java
new file mode 100644
index 0000000..49fe6d8
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.optimizer.jsonplan;
+
+import java.util.List;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.dag.DataSinkNode;
+import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
+import org.apache.flink.test.recordJobs.graph.DeltaPageRankWithInitialDeltas;
+import org.apache.flink.test.recordJobs.kmeans.KMeansBroadcast;
+import org.apache.flink.test.recordJobs.kmeans.KMeansSingleStep;
+import org.apache.flink.test.recordJobs.relational.TPCHQuery3;
+import org.apache.flink.test.recordJobs.relational.WebLogAnalysis;
+import org.apache.flink.test.recordJobs.wordcount.WordCount;
+import org.apache.flink.util.OperatingSystem;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.JsonParser;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * The tests in this class simply invoke the JSON dump code for the original (pre-optimized) plan
+ * of several programs, both with explicit arguments and with an empty argument list.
+ */
+public class PreviewPlanDumpTest {
+
+	protected static final String IN_FILE = OperatingSystem.isWindows() ? "file:/c:/test/file" : "file:///test/file";
+
+	protected static final String OUT_FILE = OperatingSystem.isWindows() ? "file:/c:/test/output" : "file:///test/output";
+
+	protected static final String[] NO_ARGS = new String[0];
+
+	@Test
+	public void dumpWordCount() {
+		dump(new WordCount().getPlan("4", IN_FILE, OUT_FILE));
+
+		// The web interface passes empty string-args to compute the preview of the
+		// job, so we should test this situation too
+		dump(new WordCount().getPlan(NO_ARGS));
+	}
+
+	@Test
+	public void dumpTPCH3() {
+		dump(new TPCHQuery3().getPlan("4", IN_FILE, IN_FILE, OUT_FILE));
+		dump(new TPCHQuery3().getPlan(NO_ARGS));
+	}
+
+	@Test
+	public void dumpKMeans() {
+		dump(new KMeansSingleStep().getPlan("4", IN_FILE, IN_FILE, OUT_FILE));
+		dump(new KMeansSingleStep().getPlan(NO_ARGS));
+	}
+
+	@Test
+	public void dumpWebLogAnalysis() {
+		dump(new WebLogAnalysis().getPlan("4", IN_FILE, IN_FILE, IN_FILE, OUT_FILE));
+		dump(new WebLogAnalysis().getPlan(NO_ARGS));
+	}
+
+	@Test
+	public void dumpBulkIterationKMeans() {
+		// parallelism, two input paths, output path -- the same layout IterativeKMeansTest uses
+		dump(new KMeansBroadcast().getPlan("4", IN_FILE, IN_FILE, OUT_FILE));
+		dump(new KMeansBroadcast().getPlan(NO_ARGS));
+	}
+
+	@Test
+	public void dumpDeltaPageRank() {
+		dump(new DeltaPageRankWithInitialDeltas().getPlan("4", IN_FILE, IN_FILE, IN_FILE, OUT_FILE, "10"));
+		dump(new DeltaPageRankWithInitialDeltas().getPlan(NO_ARGS));
+	}
+
+	/**
+	 * Builds the pre-optimized plan, dumps it as JSON and parses the complete JSON document, failing
+	 * the test if any of these steps throws.
+	 */
+	private void dump(Plan p) {
+		try {
+			List<DataSinkNode> sinks = Optimizer.createPreOptimizedPlan(p);
+			PlanJSONDumpGenerator dumper = new PlanJSONDumpGenerator();
+			String json = dumper.getPactPlanAsJSON(sinks);
+			JsonParser parser = new JsonFactory().createJsonParser(json);
+			try {
+				// consume all tokens; malformed JSON surfaces as a JsonParseException
+				while (parser.nextToken() != null) {
+					// nothing to do per token
+				}
+			} finally {
+				parser.close();
+			}
+		} catch (JsonParseException e) {
+			e.printStackTrace();
+			Assert.fail("JSON Generator produced malformatted output: " + e.getMessage());
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("An error occurred in the test: " + e.getMessage());
+		}
+	}
+}